Skip to content
This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

Commit

Permalink
Merge pull request #37 from xuechendi/wip_sort_shuffle_3
Browse files Browse the repository at this point in the history
Wip sort shuffle 3
  • Loading branch information
xuechendi authored Apr 3, 2019
2 parents 8229d24 + 200ce65 commit 3a6ab31
Show file tree
Hide file tree
Showing 17 changed files with 852 additions and 556 deletions.
2 changes: 1 addition & 1 deletion spark.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ spark.shuffle.pmof.enable_rdma true
spark.shuffle.pmof.enable_pmem true

# for persistent memory
spark.shuffle.pmof.max_stage_num 100
spark.shuffle.pmof.pmem_list /dev/dax0.0,/dev/dax1.0
spark.shuffle.spill.pmof.MemoryThreshold 16777216

# for rdma
spark.shuffle.pmof.server_buffer_nums 32
Expand Down
143 changes: 143 additions & 0 deletions src/main/cpp/PersistentMemoryPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
#include "PersistentMemoryPool.h"
#include <string>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <fcntl.h>
#include <fstream>

PMPool::PMPool(const char* dev, int maxStage, int maxMap, long size):
maxStage(maxStage),
maxMap(maxMap),
stop(false),
dev(dev),
worker(&PMPool::process, this) {

const char *pool_layout_name = "pmem_spark_shuffle";
cout << "PMPOOL is " << dev << endl;
// if this is a fsdax device
// we need to create
// if this is a devdax device

pmpool = pmemobj_open(dev, pool_layout_name);
if (pmpool == NULL) {
pmpool = pmemobj_create(dev, pool_layout_name, size, S_IRUSR | S_IWUSR);
}
if (pmpool == NULL) {
cerr << "Failed to create pool, kill process, errmsg: " << pmemobj_errormsg() << endl;
exit(-1);
}

stageArrayRoot = POBJ_ROOT(pmpool, struct StageArrayRoot);
}

PMPool::~PMPool() {
while(request_queue.size() > 0) {
fprintf(stderr, "%s request queue size is %d\n", dev, request_queue.size());
sleep(1);
}
fprintf(stderr, "%s request queue size is %d\n", dev, request_queue.size());
stop = true;
worker.join();
pmemobj_close(pmpool);
}

long PMPool::getRootAddr() {
return (long)pmpool;
}

void PMPool::process() {
Request *cur_req;
while(!stop) {
cur_req = (Request*)request_queue.dequeue();
if (cur_req != nullptr) {
cur_req->exec();
}
}
}

long PMPool::setMapPartition(
int partitionNum,
int stageId,
int mapId,
int partitionId,
long size,
char* data,
bool clean,
int numMaps) {
WriteRequest write_request(this, maxStage, numMaps, partitionNum, stageId, 0, mapId, partitionId, size, data, clean);
request_queue.enqueue((void*)&write_request);
return write_request.getResult();
}

long PMPool::setReducePartition(
int partitionNum,
int stageId,
int partitionId,
long size,
char* data,
bool clean,
int numMaps) {
WriteRequest write_request(this, maxStage, 1, partitionNum, stageId, 1, 0, partitionId, size, data, clean);
request_queue.enqueue((void*)&write_request);

return write_request.getResult();
}

long PMPool::getMapPartition(
MemoryBlock* mb,
int stageId,
int mapId,
int partitionId ) {
ReadRequest read_request(this, mb, stageId, 0, mapId, partitionId);
read_request.exec();
return read_request.getResult();
}

long PMPool::getReducePartition(
MemoryBlock* mb,
int stageId,
int mapId,
int partitionId ) {
ReadRequest read_request(this, mb, stageId, 1, mapId, partitionId);
read_request.exec();
read_request.getResult();
}

long PMPool::getMapPartitionBlockInfo(BlockInfo *blockInfo, int stageId, int mapId, int partitionId) {
MetaRequest meta_request(this, blockInfo, stageId, 0, mapId, partitionId);
meta_request.exec();
return meta_request.getResult();
}

long PMPool::getReducePartitionBlockInfo(BlockInfo *blockInfo, int stageId, int mapId, int partitionId) {
MetaRequest meta_request(this, blockInfo, stageId, 1, mapId, partitionId);
meta_request.exec();
return meta_request.getResult();
}

long PMPool::getMapPartitionSize(int stageId, int mapId, int partitionId) {
SizeRequest size_request(this, stageId, 0, mapId, partitionId);
size_request.exec();
return size_request.getResult();
}

long PMPool::getReducePartitionSize(int stageId, int mapId, int partitionId) {
SizeRequest size_request(this, stageId, 1, mapId, partitionId);
size_request.exec();
return size_request.getResult();
}

long PMPool::deleteMapPartition(int stageId, int mapId, int partitionId) {
DeleteRequest delete_request(this, stageId, 0, mapId, partitionId);
request_queue.enqueue((void*)&delete_request);
return delete_request.getResult();
}

long PMPool::deleteReducePartition(int stageId, int mapId, int partitionId) {
DeleteRequest delete_request(this, stageId, 1, mapId, partitionId);
request_queue.enqueue((void*)&delete_request);
return delete_request.getResult();
}

Loading

0 comments on commit 3a6ab31

Please sign in to comment.