From ff9078ebcf8dd97b6a792e08ac77f9688a6b513f Mon Sep 17 00:00:00 2001 From: Eugene-Mark Date: Sun, 29 Aug 2021 21:21:03 +0800 Subject: [PATCH] [PMEM-SHUFFLE-52] Sync RPMP to master branch --- .gitignore | 1 - core/rpmp-core.patch | 662 ------------------ .../pmof/UnCompressedMapStatus.scala | 2 +- .../shuffle/pmof/PmemShuffleWriter.scala | 28 +- .../storage/pmof/NettyByteBufferPool.scala | 4 - .../storage/pmof/PmemBlockOutputStream.scala | 4 +- .../spark/storage/pmof/PmemOutputStream.scala | 10 +- .../RpmpShuffleBlockFetcherIterator.scala | 10 +- rpmp/CMakeLists.txt | 10 +- rpmp/README.md | 26 +- rpmp/bin/start-rpmp.sh | 72 ++ rpmp/bin/stop-rpmp.sh | 48 ++ rpmp/pmpool/CMakeLists.txt | 2 +- rpmp/pmpool/proxy/PhysicalNode.h | 8 + .../proxy/clientService/ClientService.cc | 70 +- .../proxy/clientService/ClientService.h | 4 +- .../proxy/metastore/ConnectionFacade.cc | 63 +- .../pmpool/proxy/metastore/ConnectionFacade.h | 10 +- .../pmpool/proxy/metastore/MetastoreFacade.cc | 8 +- rpmp/pmpool/proxy/metastore/MetastoreFacade.h | 3 +- rpmp/pmpool/proxy/metastore/redis/Redis.cc | 11 +- rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc | 35 - rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h | 28 - .../metastore/rocksdb/RocksConnection.cc | 60 -- .../proxy/metastore/rocksdb/RocksConnection.h | 27 - .../proxy/replicaService/ReplicaService.cc | 80 ++- rpmp/test/rocksdb_test.cc | 44 -- 27 files changed, 321 insertions(+), 1009 deletions(-) delete mode 100644 core/rpmp-core.patch create mode 100755 rpmp/bin/start-rpmp.sh create mode 100755 rpmp/bin/stop-rpmp.sh delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc delete mode 100644 rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h delete mode 100644 rpmp/test/rocksdb_test.cc diff --git a/.gitignore b/.gitignore index d9de765f..58c6c9d4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,6 @@ **/target/** *.class *.iml -*.sh # logs *.log diff --git a/core/rpmp-core.patch b/core/rpmp-core.patch deleted file mode 100644 index 7bfdf46b..00000000 --- a/core/rpmp-core.patch +++ /dev/null @@ -1,662 +0,0 @@ -diff --git a/core/pom.xml b/core/pom.xml -index 983f724a..37a5a17e 100644 ---- a/core/pom.xml -+++ b/core/pom.xml -@@ -43,6 +43,11 @@ - hpnl - 0.5 - -+ -+ com.intel.rpmp -+ rpmp -+ 0.1 -+ - - org.xerial - sqlite-jdbc -diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala -index c417bba0..8399abbf 100644 ---- a/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala -+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/BaseShuffleReader.scala -@@ -131,7 +131,7 @@ private[spark] class BaseShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C], - assert(pmofConf.enablePmem) - // Create an ExternalSorter to sort the data. 
- val sorter = -- new PmemExternalSorter[K, C, C](context, handle, pmofConf, ordering = Some(keyOrd), serializer = dep.serializer) -+ new PmemExternalSorter[K, C, C](context, handle, pmofConf, blockManager, ordering = Some(keyOrd), serializer = dep.serializer) - sorter.insertAllAndUpdateMetrics(aggregatedIter) - case None => - aggregatedIter -diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/MetadataResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/MetadataResolver.scala -index 7b2984b4..50d96a14 100644 ---- a/core/src/main/scala/org/apache/spark/shuffle/pmof/MetadataResolver.scala -+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/MetadataResolver.scala -@@ -44,7 +44,7 @@ class MetadataResolver(pmofConf: PmofConf) { - * @param rkey - */ - def pushPmemBlockInfo(shuffleId: Int, mapId: Long, dataAddressMap: mutable.HashMap[Int, Array[(Long, Int)]], rkey: Long): Unit = { -- val buffer: Array[Byte] = new Array[Byte](pmofConf.reduce_serializer_buffer_size.toInt) -+ val buffer: Array[Byte] = new Array[Byte](pmofConf.map_serializer_buffer_size.toInt) - var output = new Output(buffer) - val bufferArray = new ArrayBuffer[ByteBuffer]() - -@@ -60,7 +60,7 @@ class MetadataResolver(pmofConf: PmofConf) { - blockBuffer.flip() - bufferArray += blockBuffer - output.close() -- val new_buffer = new Array[Byte](pmofConf.reduce_serializer_buffer_size.toInt) -+ val new_buffer = new Array[Byte](pmofConf.map_serializer_buffer_size.toInt) - output = new Output(new_buffer) - } - } -diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala -index 147ac27c..20727e34 100644 ---- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala -+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala -@@ -21,6 +21,7 @@ import org.apache.spark._ - import org.apache.spark.internal.Logging - import org.apache.spark.network.pmof.PmofTransferService - import org.apache.spark.scheduler.MapStatus -+import org.apache.spark.scheduler.pmof.UnCompressedMapStatus - import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter} - import org.apache.spark.storage._ - import org.apache.spark.util.collection.pmof.PmemExternalSorter -@@ -76,7 +77,7 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl - - if (dep.mapSideCombine) { // do aggregation - if (dep.aggregator.isDefined) { -- sorter = new PmemExternalSorter[K, V, C](context, handle, pmofConf, dep.aggregator, Some(dep.partitioner), -+ sorter = new PmemExternalSorter[K, V, C](context, handle, pmofConf, blockManager, dep.aggregator, Some(dep.partitioner), - dep.keyOrdering, dep.serializer) - sorter.setPartitionByteBufferArray(PmemBlockOutputStreamArray) - sorter.insertAll(records) -@@ -107,12 +108,19 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl - val pmemBlockInfoMap = mutable.HashMap.empty[Int, Array[(Long, Int)]] - var output_str : String = "" - -+ var rKey: Int = 0 - for (i <- spillPartitionArray) { -- if (pmofConf.enableRdma) { -- pmemBlockInfoMap(i) = PmemBlockOutputStreamArray(i).getPartitionMeta().map { info => (info._1, info._2) } -+ if (pmofConf.enableRdma && !pmofConf.enableRemotePmem) { -+ pmemBlockInfoMap(i) = PmemBlockOutputStreamArray(i).getPartitionMeta().map (info => { -+ if (rKey == 0) { -+ rKey = info._3 -+ } -+ //logInfo(s"${ShuffleBlockId(stageId, mapId, i)} [${rKey}]${info._1}:${info._2}") -+ (info._1, info._2) -+ }) 
- } - partitionLengths(i) = PmemBlockOutputStreamArray(i).size -- output_str += "\tPartition " + i + ": " + partitionLengths(i) + ", records: " + PmemBlockOutputStreamArray(i).records + "\n" -+ output_str += s"\t${ShuffleBlockId(stageId, mapId, i)}: ${partitionLengths(i)}, records: ${PmemBlockOutputStreamArray(i).records}\n" - } - - for (i <- 0 until numPartitions) { -@@ -120,6 +128,7 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl - } - - val shuffleServerId = blockManager.shuffleServerId -+ /** - if (pmofConf.enableRdma) { - val rkey = PmemBlockOutputStreamArray(0).getRkey() - metadataResolver.pushPmemBlockInfo(stageId, mapId, pmemBlockInfoMap, rkey) -@@ -130,6 +139,23 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl - } else { - mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId) - } -+ **/ -+ -+ if (pmofConf.enableRemotePmem) { -+ mapStatus = new UnCompressedMapStatus(shuffleServerId, partitionLengths, mapId) -+ //mapStatus = MapStatus(shuffleServerId, partitionLengths) -+ } else if (!pmofConf.enableRdma) { -+ mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId) -+ } else { -+ metadataResolver.pushPmemBlockInfo(stageId, mapId, pmemBlockInfoMap, rKey) -+ val blockManagerId: BlockManagerId = -+ BlockManagerId( -+ shuffleServerId.executorId, -+ PmofTransferService.shuffleNodesMap(shuffleServerId.host), -+ PmofTransferService.getTransferServiceInstance(pmofConf, blockManager).port, -+ shuffleServerId.topologyInfo) -+ mapStatus = MapStatus(blockManagerId, partitionLengths, mapId) -+ } - } - - /** Close this writer, passing along whether the map completed */ -diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala -index 63b8241b..51a6b3bc 100644 ---- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala -+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmofShuffleManager.scala -@@ -21,7 +21,7 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager - */ - private[this] val taskIdMapsForShuffle = new ConcurrentHashMap[Int, OpenHashSet[Long]]() - -- private[this] val pmofConf = new PmofConf(conf) -+ private[this] val pmofConf = PmofConf.getConf(conf) - var metadataResolver: MetadataResolver = _ - - override def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle = { -@@ -62,9 +62,27 @@ private[spark] class PmofShuffleManager(conf: SparkConf) extends ShuffleManager - override def getReader[K, C](handle: _root_.org.apache.spark.shuffle.ShuffleHandle, startMapIndex: Int, endMapIndex: Int, startPartition: Int, endPartition: Int, context: _root_.org.apache.spark.TaskContext, readMetrics: ShuffleReadMetricsReporter): _root_.org.apache.spark.shuffle.ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId( - handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) -- if (pmofConf.enableRdma) { -- new RdmaShuffleReader(handle.asInstanceOf[BaseShuffleHandle[K, _, C]], -- startMapIndex, endMapIndex, startPartition, endPartition, context, pmofConf) -+ val env: SparkEnv = SparkEnv.get -+ if (pmofConf.enableRemotePmem) { -+ new RpmpShuffleReader( -+ handle.asInstanceOf[BaseShuffleHandle[K, _, C]], -+ startMapIndex, -+ endMapIndex, -+ startPartition, -+ endPartition, -+ context, -+ pmofConf) -+ } else if (pmofConf.enableRdma) { -+ 
metadataResolver = MetadataResolver.getMetadataResolver(pmofConf) -+ PmofTransferService.getTransferServiceInstance(pmofConf, env.blockManager, this) -+ new RdmaShuffleReader( -+ handle.asInstanceOf[BaseShuffleHandle[K, _, C]], -+ startMapIndex, -+ endMapIndex, -+ startPartition, -+ endPartition, -+ context, -+ pmofConf) - } else { - new BaseShuffleReader( - handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, startPartition, endPartition, context, readMetrics, pmofConf) -diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala -index c2270592..8a37a57c 100644 ---- a/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala -+++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/RdmaShuffleReader.scala -@@ -31,6 +31,7 @@ private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C], - private[this] val dep = handle.dependency - private[this] val serializerInstance: SerializerInstance = dep.serializer.newInstance() - private[this] val enable_pmem: Boolean = SparkEnv.get.conf.getBoolean("spark.shuffle.pmof.enable_pmem", defaultValue = true) -+ private[this] val enable_rpmp: Boolean = SparkEnv.get.conf.getBoolean("spark.shuffle.pmof.enable_remote_pmem", defaultValue = true) - - /** Read the combined key-values for this reduce task */ - override def read(): Iterator[Product2[K, C]] = { -@@ -88,8 +89,8 @@ private[spark] class RdmaShuffleReader[K, C](handle: BaseShuffleHandle[K, _, C], - // Sort the output if there is a sort ordering defined. - dep.keyOrdering match { - case Some(keyOrd: Ordering[K]) => -- if (enable_pmem) { -- val sorter = new PmemExternalSorter[K, C, C](context, handle, pmofConf, ordering = Some(keyOrd), serializer = dep.serializer) -+ if (enable_pmem && !enable_rpmp) { -+ val sorter = new PmemExternalSorter[K, C, C](context, handle, pmofConf, blockManager, ordering = Some(keyOrd), serializer = dep.serializer) - sorter.insertAll(aggregatedIter) - CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop()) - } else { -diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala b/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala -index 5ad49d44..e7ebb84e 100644 ---- a/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala -+++ b/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala -@@ -40,6 +40,10 @@ object NettyByteBufferPool extends Logging { - } - } - -+ def allocateNewBuffer(): ByteBuf = synchronized { -+ allocator.directBuffer() -+ } -+ - def allocateFlexibleNewBuffer(bufSize: Int): ByteBuf = synchronized { - val byteBuf = allocator.directBuffer(65536, bufSize * 2) - bufferMap += (byteBuf -> bufSize) -diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala -index bde6ad44..ff92982c 100644 ---- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala -+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockInputStream.scala -@@ -1,36 +1,40 @@ - package org.apache.spark.storage.pmof - -+import java.io.InputStream - import com.esotericsoftware.kryo.KryoException -+import java.io.IOException - import org.apache.spark.SparkEnv - import org.apache.spark.serializer.{DeserializationStream, Serializer, SerializerInstance, SerializerManager} - import 
org.apache.spark.storage.BlockId -+import org.apache.spark.util.configuration.pmof.PmofConf -+import org.apache.spark.internal.Logging - --class PmemBlockInputStream[K, C](pmemBlockOutputStream: PmemBlockOutputStream, serializer: Serializer) { -- val blockId: BlockId = pmemBlockOutputStream.getBlockId() -+trait PmemBlockInputStream[K, C] { -+ def readNextItem(): (K, C) -+} -+ -+class LocalPmemBlockInputStream[K, C]( -+ blockId: BlockId, -+ total_records: Long, -+ serializer: Serializer) -+ extends PmemBlockInputStream[K, C] { - val serializerManager: SerializerManager = SparkEnv.get.serializerManager - val serInstance: SerializerInstance = serializer.newInstance() -- val persistentMemoryWriter: PersistentMemoryHandler = PersistentMemoryHandler.getPersistentMemoryHandler -+ val persistentMemoryWriter: PersistentMemoryHandler = -+ PersistentMemoryHandler.getPersistentMemoryHandler - var pmemInputStream: PmemInputStream = new PmemInputStream(persistentMemoryWriter, blockId.name) - val wrappedStream = serializerManager.wrapStream(blockId, pmemInputStream) - var inObjStream: DeserializationStream = serInstance.deserializeStream(wrappedStream) - -- var total_records: Long = 0 - var indexInBatch: Int = 0 - var closing: Boolean = false - -- loadStream() -- -- def loadStream(): Unit = { -- total_records = pmemBlockOutputStream.getTotalRecords() -- indexInBatch = 0 -- } -- - def readNextItem(): (K, C) = { - if (closing == true) { - close() - return null - } -- try{ -+ try { - val k = inObjStream.readObject().asInstanceOf[K] - val c = inObjStream.readObject().asInstanceOf[C] - indexInBatch += 1 -@@ -39,8 +43,7 @@ class PmemBlockInputStream[K, C](pmemBlockOutputStream: PmemBlockOutputStream, s - } - (k, c) - } catch { -- case ex: KryoException => { -- } -+ case ex: KryoException => {} - sys.exit(0) - } - } -@@ -51,3 +54,76 @@ class PmemBlockInputStream[K, C](pmemBlockOutputStream: PmemBlockOutputStream, s - inObjStream = null - } - } -+ -+class RemotePmemBlockInputStream[K, C]( -+ blockId: BlockId, -+ mapStatus: Seq[(String, Long, Int)], -+ serializer: Serializer, -+ pmofConf: PmofConf) -+ extends PmemBlockInputStream[K, C] -+ with Logging { -+ val serializerManager: SerializerManager = SparkEnv.get.serializerManager -+ val serInstance: SerializerInstance = serializer.newInstance() -+ val remotePersistentMemoryPool = -+ RemotePersistentMemoryPool.getInstance(pmofConf.rpmpHost, pmofConf.rpmpPort) -+ -+ var map_index: Int = 0 -+ var num_items: Int = 0 -+ var cur_num_items: Int = 0 -+ var inObjStream: DeserializationStream = _ -+ var buf: NioManagedBuffer = _ -+ var input: InputStream = _ -+ -+ def loadStream(): Unit = { -+ if (buf != null) { -+ inObjStream.close() -+ input.close() -+ buf.release() -+ } -+ if (map_index == mapStatus.size) { -+ inObjStream = null -+ } else { -+ num_items = mapStatus(map_index)._3 -+ buf = new NioManagedBuffer(mapStatus(map_index)._2.toInt) -+ logDebug(s"[GET started] ${mapStatus(map_index)._1}-${mapStatus(map_index)._2}") -+ var retry = 0 -+ while (remotePersistentMemoryPool.get( -+ mapStatus(map_index)._1, -+ mapStatus(map_index)._2, -+ buf.nioByteBuffer) == -1) { -+ logWarning( -+ s"${mapStatus(map_index)._1}-${mapStatus(map_index)._2} RPMem get failed due to time out, try again") -+ retry += 1 -+ if (retry == 4) { -+ throw new IOException( -+ s"${mapStatus(map_index)._1}-${mapStatus(map_index)._2} RPMem get failed due to time out") -+ } -+ } -+ logDebug(s"[GET Completed] ${mapStatus(map_index)._1}-${mapStatus(map_index)._2}") -+ val in = buf.createInputStream() -+ 
input = serializerManager.wrapStream(blockId, in) -+ inObjStream = serInstance.deserializeStream(input) -+ map_index += 1 -+ } -+ } -+ -+ def readNextItem(): (K, C) = { -+ try { -+ if (buf == null || cur_num_items >= num_items) { -+ loadStream() -+ cur_num_items = 0 -+ } -+ if (inObjStream == null) { -+ return null -+ } -+ val k = inObjStream.readObject().asInstanceOf[K] -+ val c = inObjStream.readObject().asInstanceOf[C] -+ cur_num_items += 1 -+ (k, c) -+ } catch { -+ case ex: KryoException => {} -+ sys.exit(0) -+ } -+ } -+ -+} -\ No newline at end of file -diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala -index cc70c718..eab50776 100644 ---- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala -+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala -@@ -12,17 +12,19 @@ import org.apache.spark.util.configuration.pmof.PmofConf - - import scala.collection.mutable.ArrayBuffer - --class PmemBlockId (stageId: Int, tmpId: Int) extends ShuffleBlockId(stageId, 0, tmpId) { -- override def name: String = "reduce_spill_" + stageId + "_" + tmpId -+class PmemBlockId (executorId: String, stageId: Int, tmpName: String, tmpId: Int) extends ShuffleBlockId(stageId, 0, tmpId) { -+ override def name: String = s"reduce_spill_${executorId}_${tmpName}_${tmpId}" - override def isShuffle: Boolean = false - } - - object PmemBlockId { - private var tempId: Int = 0 -- def getTempBlockId(stageId: Int): PmemBlockId = synchronized { -- val cur_tempId = tempId -- tempId += 1 -- new PmemBlockId (stageId, cur_tempId) -+ def getTempBlockId(executorId: String, tmpName: String, stageId: Int): PmemBlockId = synchronized { -+ synchronized{ -+ val cur_tempId = tempId -+ tempId += 1 -+ new PmemBlockId (executorId, stageId, tmpName, cur_tempId) -+ } - } - } - -@@ -45,14 +47,22 @@ private[spark] class PmemBlockOutputStream( - var partitionMeta: Array[(Long, Int, Int)] = _ - val root_dir = Utils.getConfiguredLocalDirs(conf).toList.sortWith(_ < _)(0) - -- val persistentMemoryWriter: PersistentMemoryHandler = PersistentMemoryHandler.getPersistentMemoryHandler(pmofConf, -- root_dir, pmofConf.path_list, blockId.name, pmofConf.maxPoolSize) -+ var persistentMemoryWriter: PersistentMemoryHandler = _ -+ var remotePersistentMemoryPool: RemotePersistentMemoryPool = _ -+ val mapStatus: ArrayBuffer[(String, Long, Int)] = ArrayBuffer[(String, Long, Int)]() -+ -+ if (!pmofConf.enableRemotePmem) { -+ persistentMemoryWriter = PersistentMemoryHandler.getPersistentMemoryHandler(pmofConf, -+ root_dir, pmofConf.path_list, blockId.name, pmofConf.maxPoolSize) -+ } else { -+ remotePersistentMemoryPool = RemotePersistentMemoryPool.getInstance(pmofConf.rpmpHost, pmofConf.rpmpPort) -+ } - - //disable metadata updating by default - //persistentMemoryWriter.updateShuffleMeta(blockId.name) - - val pmemOutputStream: PmemOutputStream = new PmemOutputStream( -- persistentMemoryWriter, numPartitions, blockId.name, numMaps, (pmofConf.spill_throttle.toInt + 1024)) -+ persistentMemoryWriter, remotePersistentMemoryPool, numPartitions, blockId.name, numMaps, (pmofConf.spill_throttle.toInt + 1024)) - val serInstance = serializer.newInstance() - val bs = serializerManager.wrapStream(blockId, pmemOutputStream) - var objStream: SerializationStream = serInstance.serializeStream(bs) -@@ -82,15 +92,16 @@ private[spark] class PmemBlockOutputStream( - } - - def maybeSpill(force: Boolean = false): Unit = 
{ -- if ((pmofConf.spill_throttle != -1 && pmemOutputStream.remainingSize >= pmofConf.spill_throttle) || force == true) { -+ if ((pmofConf.spill_throttle != -1 && pmemOutputStream.bufferRemainingSize >= pmofConf.spill_throttle) || force == true) { - val start = System.nanoTime() - flush() -- pmemOutputStream.doFlush() -+ //pmemOutputStream.doFlush() - val bufSize = pmemOutputStream.flushedSize -+ mapStatus += ((pmemOutputStream.flushed_block_id, bufSize, recordsPerBlock)) - if (bufSize > 0) { - recordsArray += recordsPerBlock - recordsPerBlock = 0 -- size += bufSize -+ size = bufSize - - if (blockId.isShuffle == true) { - val writeMetrics = taskMetrics.shuffleWriteMetrics -@@ -109,10 +120,29 @@ private[spark] class PmemBlockOutputStream( - spilled - } - -+ def getPartitionBlockInfo(res_array: Array[Long]): Array[(Long, Int, Int)] = { -+ var i = -3 -+ var blockInfo = Array.ofDim[(Long, Int)]((res_array.length)/3) -+ blockInfo.map{ -+ x => i += 3; -+ (res_array(i), res_array(i+1).toInt, res_array(i+2).toInt) -+ } -+ } -+ - def getPartitionMeta(): Array[(Long, Int, Int)] = { - if (partitionMeta == null) { - var i = -1 -- partitionMeta = persistentMemoryWriter.getPartitionBlockInfo(blockId.name).map{ x=> i+=1; (x._1, x._2, recordsArray(i))} -+ partitionMeta = if (!pmofConf.enableRemotePmem) { -+ persistentMemoryWriter.getPartitionBlockInfo(blockId.name).map ( x => { -+ i += 1 -+ (x._1, x._2, getRkey().toInt) -+ }) -+ } else { -+ getPartitionBlockInfo(remotePersistentMemoryPool.getMeta(blockId.name)).map( x => { -+ i += 1 -+ (x._1, x._2, x._3) -+ }) -+ } - } - partitionMeta - } -diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala -index 08c632e9..d4345ce3 100644 ---- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala -+++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala -@@ -2,12 +2,14 @@ package org.apache.spark.storage.pmof - - import java.io.OutputStream - import java.nio.ByteBuffer -+import java.io.IOException - - import io.netty.buffer.{ByteBuf, PooledByteBufAllocator} - import org.apache.spark.internal.Logging - - class PmemOutputStream( - persistentMemoryWriter: PersistentMemoryHandler, -+ remotePersistentMemoryPool: RemotePersistentMemoryPool, - numPartitions: Int, - blockId: String, - numMaps: Int, -@@ -15,11 +17,14 @@ class PmemOutputStream( - ) extends OutputStream with Logging { - var set_clean = true - var is_closed = false -+ var key_id = 0 - -- val length: Int = bufferSize -+ val length: Int = 1024 * 1024 * 6 - var bufferFlushedSize: Int = 0 - var bufferRemainingSize: Int = 0 - val buf: ByteBuf = NettyByteBufferPool.allocateFlexibleNewBuffer(length) -+ var flushed_block_id: String = _ -+ var cur_block_id: String = blockId - - override def write(bytes: Array[Byte], off: Int, len: Int): Unit = { - buf.writeBytes(bytes, off, len) -@@ -32,13 +37,33 @@ class PmemOutputStream( - } - - override def flush(): Unit = { -- } -- -- def doFlush(): Unit = { - if (bufferRemainingSize > 0) { -- val byteBuffer: ByteBuffer = buf.nioBuffer() -- persistentMemoryWriter.setPartition(numPartitions, blockId, byteBuffer, bufferRemainingSize, set_clean) -- logDebug(s"flush ${blockId} size ${bufferRemainingSize}") -+ if (remotePersistentMemoryPool != null) { -+ logDebug(s" [PUT Started]${cur_block_id}-${bufferRemainingSize}") -+ val byteBuffer: ByteBuffer = buf.nioBuffer() -+ var retry = 0 -+ while (remotePersistentMemoryPool.put(cur_block_id, 
byteBuffer, bufferRemainingSize) == -1) { -+ logWarning( -+ s"${cur_block_id}-${bufferRemainingSize} RPMem put failed due to time out, try again") -+ retry += 1 -+ if (retry == 4) { -+ throw new IOException( -+ s"${cur_block_id}-${bufferRemainingSize} RPMem put failed due to time out.") -+ } -+ } -+ logDebug(s" [PUT Completed]${cur_block_id}-${bufferRemainingSize}") -+ key_id += 1 -+ flushed_block_id = cur_block_id -+ cur_block_id = s"${blockId}_${key_id}" -+ } else { -+ val byteBuffer: ByteBuffer = buf.nioBuffer() -+ persistentMemoryWriter.setPartition( -+ numPartitions, -+ blockId, -+ byteBuffer, -+ bufferRemainingSize, -+ set_clean) -+ } - bufferFlushedSize += bufferRemainingSize - bufferRemainingSize = 0 - } -@@ -47,6 +72,10 @@ class PmemOutputStream( - } - } - -+ def doFlush(): Unit = { -+ -+ } -+ - def flushedSize(): Int = { - bufferFlushedSize - } -diff --git a/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala -index 1b462542..3c4e47d0 100644 ---- a/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala -+++ b/core/src/main/scala/org/apache/spark/util/collection/pmof/PmemExternalSorter.scala -@@ -1,6 +1,7 @@ - package org.apache.spark.util.collection.pmof - - import java.util.Comparator -+import java.util.UUID - - import scala.collection.mutable - import scala.collection.mutable.ArrayBuffer -@@ -12,11 +13,13 @@ import org.apache.spark.util.collection._ - import org.apache.spark.storage.pmof._ - import org.apache.spark.util.configuration.pmof.PmofConf - import org.apache.spark.util.{CompletionIterator, Utils => TryUtils} -+import org.apache.spark.storage.BlockManager - - private[spark] class PmemExternalSorter[K, V, C]( - context: TaskContext, - handle: BaseShuffleHandle[K, _, C], - pmofConf: PmofConf, -+ blockManager: BlockManager, - aggregator: Option[Aggregator[K, V, C]] = None, - partitioner: Option[Partitioner] = None, - ordering: Option[Ordering[K]] = None, -@@ -59,9 +62,10 @@ private[spark] class PmemExternalSorter[K, V, C]( - if (mapStage) { - pmemBlockOutputStreamArray(partitionId) - } else { -+ val tmpBlockName = s"${UUID.randomUUID()}" - pmemBlockOutputStreamArray += new PmemBlockOutputStream( - context.taskMetrics(), -- PmemBlockId.getTempBlockId(stageId), -+ PmemBlockId.getTempBlockId(blockManager.blockManagerId.executorId, tmpBlockName, stageId), - SparkEnv.get.serializerManager, - serializer, - SparkEnv.get.conf, -@@ -331,7 +335,18 @@ private[spark] class PmemExternalSorter[K, V, C]( - class SpillReader(pmemBlockOutputStream: PmemBlockOutputStream) { - // Each spill reader is relate to one partition - // which is different from spark original codes (relate to one spill file) -- val pmemBlockInputStream = new PmemBlockInputStream[K, C](pmemBlockOutputStream, serializer) -+ val pmemBlockInputStream = if (!pmofConf.enableRemotePmem) { -+ new LocalPmemBlockInputStream[K, C]( -+ pmemBlockOutputStream.getBlockId, -+ pmemBlockOutputStream.getTotalRecords, -+ serializer) -+ } else { -+ new RemotePmemBlockInputStream[K, C]( -+ pmemBlockOutputStream.getBlockId, -+ pmemBlockOutputStream.mapStatus, -+ serializer, -+ pmofConf) -+ } - var nextItem: (K, C) = _ - - def readPartitionIter(partitionId: Int): Iterator[Product2[K, C]] = new Iterator[Product2[K, C]] { -diff --git a/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala b/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala -index 
fe8c0453..e39af6f1 100644 ---- a/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala -+++ b/core/src/main/scala/org/apache/spark/util/configuration/pmof/PmofConf.scala -@@ -29,4 +29,25 @@ class PmofConf(conf: SparkConf) { - val pmemCoreMap = conf.get("spark.shuffle.pmof.dev_core_set", defaultValue = "/dev/dax0.0:0-17,36-53").split(";").map(_.trim).map(_.split(":")).map(arr => arr(0) -> arr(1)).toMap - val fileEmptyTimeout: Int = conf.getInt("spark.shuffle.pmof.file_empty_timeout", defaultValue = 30) - val fileEmptyInterval: Int = conf.getInt("spark.shuffle.pmof.file_empty_interval", defaultValue = 5) -+ val enableRemotePmem: Boolean = conf.getBoolean("spark.shuffle.pmof.enable_remote_pmem", defaultValue = false); -+ val enableRemotePmemSort: Boolean = conf.getBoolean("spark.shuffle.pmof.enable_remote_pmem_sort", defaultValue = false); -+ val rpmpHost: String = conf.get("spark.rpmp.rhost", defaultValue = "172.168.0.40") -+ val rpmpPort: String = conf.get("spark.rpmp.rport", defaultValue = "61010") - } -+ -+object PmofConf { -+ var ins: PmofConf = null -+ def getConf(conf: SparkConf): PmofConf = -+ if (ins == null) { -+ ins = new PmofConf(conf) -+ ins -+ } else { -+ ins -+ } -+ def getConf: PmofConf = -+ if (ins == null) { -+ throw new IllegalStateException("PmofConf is not initialized yet") -+ } else { -+ ins -+ } -+} -\ No newline at end of file -diff --git a/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterSuite.scala -index 09740132..0bc8e16a 100644 ---- a/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterSuite.scala -+++ b/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterSuite.scala -@@ -50,7 +50,7 @@ class PmemShuffleWriterSuite extends SparkFunSuite with SharedSparkContext with - conf.set("spark.shuffle.pmof.pmem_list", "/dev/dax0.0") - shuffleBlockResolver = new PmemShuffleBlockResolver(conf) - serializer = new JavaSerializer(conf) -- pmofConf = new PmofConf(conf) -+ pmofConf = PmofConf.getConf(conf) - taskMetrics = new TaskMetrics() - serializerManager = new SerializerManager(serializer, conf) - -diff --git a/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterWithSortSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterWithSortSuite.scala -index 15048d4f..5ec88ada 100644 ---- a/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterWithSortSuite.scala -+++ b/core/src/test/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriterWithSortSuite.scala -@@ -55,7 +55,7 @@ class PmemShuffleWriterWithSortSuite extends SparkFunSuite with SharedSparkConte - conf.set("spark.shuffle.pmof.pmem_list", "/dev/dax0.0") - shuffleBlockResolver = new PmemShuffleBlockResolver(conf) - serializer = new JavaSerializer(conf) -- pmofConf = new PmofConf(conf) -+ pmofConf = PmofConf.getConf(conf) - taskMetrics = new TaskMetrics() - serializerManager = new SerializerManager(serializer, conf) - diff --git a/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala index da368c0e..98391624 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/pmof/UnCompressedMapStatus.scala @@ -27,7 +27,7 @@ private[spark] class UnCompressedMapStatus( val step = 8 def this(loc: BlockManagerId, uncompressedSizes: Array[Long], 
mapTaskId: Long) = { - this(loc, uncompressedSizes.map(MapStatus.compressSize), mapTaskId) + this(loc, uncompressedSizes.flatMap(UnCompressedMapStatus.longToBytes), mapTaskId) } override def updateLocation(newLoc: BlockManagerId): Unit = { diff --git a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala index 20727e34..7453d6f6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/pmof/PmemShuffleWriter.scala @@ -33,6 +33,8 @@ import org.apache.spark.storage.BlockManager import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import java.security.MessageDigest + private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffleBlockResolver, metadataResolver: MetadataResolver, blockManager: BlockManager, @@ -128,22 +130,9 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl } val shuffleServerId = blockManager.shuffleServerId - /** - if (pmofConf.enableRdma) { - val rkey = PmemBlockOutputStreamArray(0).getRkey() - metadataResolver.pushPmemBlockInfo(stageId, mapId, pmemBlockInfoMap, rkey) - val blockManagerId: BlockManagerId = - BlockManagerId(shuffleServerId.executorId, PmofTransferService.shuffleNodesMap(shuffleServerId.host), - PmofTransferService.getTransferServiceInstance(pmofConf, blockManager).port, shuffleServerId.topologyInfo) - mapStatus = MapStatus(blockManagerId, partitionLengths, mapId) - } else { - mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId) - } - **/ if (pmofConf.enableRemotePmem) { mapStatus = new UnCompressedMapStatus(shuffleServerId, partitionLengths, mapId) - //mapStatus = MapStatus(shuffleServerId, partitionLengths) } else if (!pmofConf.enableRdma) { mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId) } else { @@ -156,6 +145,12 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl shuffleServerId.topologyInfo) mapStatus = MapStatus(blockManagerId, partitionLengths, mapId) } + /** + For debug usage + logInfo( + s" shuffle_${stageId}_${mapId}_0 size is ${partitionLengths(0)}, decompressed size is ${mapStatus + .getSizeForBlock(0)}") + **/ } /** Close this writer, passing along whether the map completed */ @@ -180,4 +175,11 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl } } } + + /** + * For debug usage + **/ + def md5(s: String) = { + MessageDigest.getInstance("MD5").digest(s.getBytes) + } } diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala b/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala index e7ebb84e..5ad49d44 100644 --- a/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala +++ b/core/src/main/scala/org/apache/spark/storage/pmof/NettyByteBufferPool.scala @@ -40,10 +40,6 @@ object NettyByteBufferPool extends Logging { } } - def allocateNewBuffer(): ByteBuf = synchronized { - allocator.directBuffer() - } - def allocateFlexibleNewBuffer(bufSize: Int): ByteBuf = synchronized { val byteBuf = allocator.directBuffer(65536, bufSize * 2) bufferMap += (byteBuf -> bufSize) diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala index 55430bf0..976f15e7 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala +++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemBlockOutputStream.scala @@ -95,13 +95,13 @@ private[spark] class PmemBlockOutputStream( if ((pmofConf.spill_throttle != -1 && pmemOutputStream.bufferRemainingSize >= pmofConf.spill_throttle) || force == true) { val start = System.nanoTime() flush() - //pmemOutputStream.doFlush() + pmemOutputStream.doFlush() val bufSize = pmemOutputStream.flushedSize mapStatus += ((pmemOutputStream.flushed_block_id, bufSize, recordsPerBlock)) if (bufSize > 0) { recordsArray += recordsPerBlock recordsPerBlock = 0 - size += bufSize + size = bufSize if (blockId.isShuffle == true) { val writeMetrics = taskMetrics.shuffleWriteMetrics diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala index 98efe403..5be7d9f2 100644 --- a/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala +++ b/core/src/main/scala/org/apache/spark/storage/pmof/PmemOutputStream.scala @@ -19,7 +19,7 @@ class PmemOutputStream( var is_closed = false var key_id = 0 - val length: Int = 1024 * 1024 * 6 + val length: Int = bufferSize var bufferFlushedSize: Int = 0 var bufferRemainingSize: Int = 0 val buf: ByteBuf = NettyByteBufferPool.allocateFlexibleNewBuffer(length) @@ -37,6 +37,9 @@ class PmemOutputStream( } override def flush(): Unit = { + } + + def doFlush(): Unit = { if (bufferRemainingSize > 0) { if (remotePersistentMemoryPool != null) { logDebug(s" [PUT Started]${cur_block_id}-${bufferRemainingSize}") @@ -55,6 +58,7 @@ class PmemOutputStream( key_id += 1 flushed_block_id = cur_block_id cur_block_id = s"${blockId}_${key_id}" + logDebug(s" [PUT Completed]${blockId}-${bufferRemainingSize}, ${NettyByteBufferPool.dump(byteBuffer, bufferRemainingSize)}") } else { val byteBuffer: ByteBuffer = buf.nioBuffer() persistentMemoryWriter.setPartition( @@ -73,10 +77,6 @@ class PmemOutputStream( } } - def doFlush(): Unit = { - - } - def flushedSize(): Int = { bufferFlushedSize } diff --git a/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala index c26f6d9f..4ff9bc9e 100644 --- a/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/pmof/RpmpShuffleBlockFetcherIterator.scala @@ -140,14 +140,18 @@ private[spark] final class RpmpShuffleBlockFetcherIterator( private[this] var address: BlockManagerId = _ private[this] var blockInfos: Seq[(BlockId, Long, Int)] = _ private[this] var iterator: Iterator[(BlockId, Long, Int)] = _ + private[this] var blocksByAddressSeq: Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = _ initialize() def initialize(): Unit = { context.addTaskCompletionListener[Unit](_ => cleanup()) - blocksByAddressSize = blocksByAddress.size + + blocksByAddressSeq = blocksByAddress.toSeq + blocksByAddressSize = blocksByAddressSeq.size + if (blocksByAddressCurrentId < blocksByAddressSize) { - val res = blocksByAddress.toSeq(blocksByAddressCurrentId) + val res = blocksByAddressSeq(blocksByAddressCurrentId) address = res._1 blockInfos = res._2 iterator = blockInfos.iterator @@ -228,7 +232,7 @@ private[spark] final class RpmpShuffleBlockFetcherIterator( has_next = false return has_next } - val res = blocksByAddress.toSeq(blocksByAddressCurrentId) + val res = 
blocksByAddressSeq(blocksByAddressCurrentId) address = res._1 blockInfos = res._2 iterator = blockInfos.iterator diff --git a/rpmp/CMakeLists.txt b/rpmp/CMakeLists.txt index 6dbd049c..1784b913 100644 --- a/rpmp/CMakeLists.txt +++ b/rpmp/CMakeLists.txt @@ -48,13 +48,11 @@ file(COPY bin/stop-rpmp.sh DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}) add_executable(data-server main.cc) add_executable(chashtest test/chash_test.cc) -add_executable(rocksdbtest test/rocksdb_test.cc) add_executable(proxy-server ProxyMain.cc) add_executable(client Client.cc) -target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(data-server pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(client pmpool spdlog hiredis jsoncpp rocksdb redis++) -target_link_libraries(rocksdbtest spdlog hiredis jsoncpp rocksdb redis++) +target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp redis++) +target_link_libraries(data-server pmpool spdlog hiredis jsoncpp redis++) +target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp redis++) +target_link_libraries(client pmpool spdlog hiredis jsoncpp redis++) file(COPY ${PROJECT_SOURCE_DIR}/config DESTINATION .) diff --git a/rpmp/README.md b/rpmp/README.md index e2bf3bb4..94283fa0 100644 --- a/rpmp/README.md +++ b/rpmp/README.md @@ -17,8 +17,10 @@ Hardware: Software: - [HPNL](https://github.com/Intel-bigdata/HPNL) - [hiredis](https://github.com/redis/hiredis) + - [redis-plus-plus](https://github.com/sewenew/redis-plus-plus.git) - [jsoncpp](https://github.com/open-source-parsers/jsoncpp) - [PMDK](https://github.com/pmem/pmdk.git) + - [boost](https://www.boost.org/) ## Hardware Enabling @@ -76,16 +78,31 @@ make && make install ``` hiredis: +``` git clone https://github.com/redis/hiredis cd hiredis mkdir build; cd build +cmake .. make && make install +``` + +Redis-plus-plus: +``` +git clone https://github.com/sewenew/redis-plus-plus.git +cd redis-plus-plus +mkdir build +cd build +cmake .. +make +make install +``` jsoncpp: ``` git clone https://github.com/open-source-parsers/jsoncpp.git cd jsoncpp mkdir build; cd build +cmake .. make && make install ``` @@ -111,13 +128,16 @@ make make install ``` +boost: +``` +yum install boost-devel +``` ### Build for C/C++ ``` -git clone https://github.com/Intel-bigdata/Spark-PMoF.git +git clone https://github.com/oap-project/pmem-shuffle.git +cd pmem-shuffle git submodule update --init --recursive -git submodule add -b master https://github.com/redis/hiredis.git rpmp/include/hiredis -git submodule add -b master https://github.com/open-source-parsers/jsoncpp.git rpmp/include/jsoncpp cd rpmp mkdir build cd build diff --git a/rpmp/bin/start-rpmp.sh b/rpmp/bin/start-rpmp.sh new file mode 100755 index 00000000..9c45c5f5 --- /dev/null +++ b/rpmp/bin/start-rpmp.sh @@ -0,0 +1,72 @@ +#!/usr/bin/env bash + +if [ -L ${BASH_SOURCE-$0} ]; then + FWDIR=$(dirname $(readlink "${BASH_SOURCE-$0}")) +else + FWDIR=$(dirname "${BASH_SOURCE-$0}") +fi + +if [[ -z "${RPMP_HOME}" ]]; then + export RPMP_HOME=$(cd "${FWDIR}/.."; pwd) +fi +export BIN_HOME=$RPMP_HOME/bin +export CONFIG_HOME=$RPMP_HOME/config +export LOG_HOME=$RPMP_HOME/log +PROXY_SERVER_LOG_PATH=$LOG_HOME/proxy-server.log +DATA_SERVER_LOG_PATH=$LOG_HOME/data-server.log +PROXY_SERVER_PID_PATH=/tmp/rpmp-proxy-server.pid +DATA_SERVER_PID_PATH=/tmp/rpmp-data-server.pid + +#Keep this arg for future use. 
+USER_VARGS= +while [ $# != 0 ]; do + case "$1" in + "--help" | "-h") + echo "--help -h Show this usage information" + exit + shift + ;; + *) + USER_VARGS+=" $1" + shift + ;; + esac +done + +CONFIG_FILE="$CONFIG_HOME/rpmp.conf" +PROXY_ADDR_KEY="rpmp.network.proxy.address" +SERVER_ADDR_KEY="rpmp.network.server.address" +PROXY_ADDR= +SERVER_ADDR= + +while IFS= read -r line; do + tokens=( $line ) + if [ "${tokens[0]}" = "$PROXY_ADDR_KEY" ]; then + PROXY_ADDR=${tokens[1]} + elif [ "${tokens[0]}" = "$SERVER_ADDR_KEY" ]; then + SERVER_ADDR=${tokens[1]} + fi +done < $CONFIG_FILE + +#TODO: check if current process is running + +#Separate address by ','. +IFS=',' +#Start RPMP proxy +for addr in $PROXY_ADDR; do + echo "Starting RPMP proxy on $addr.." + #Pass addr to RPMP proxy + ssh $addr "cd ${BIN_HOME}; mkdir -p ${LOG_HOME}; \ + ./proxy-server --current_proxy_addr $addr --log ${PROXY_SERVER_LOG_PATH} >> ${PROXY_SERVER_LOG_PATH} & \ + echo \$! > ${PROXY_SERVER_PID_PATH}" +done + +# TODO: parse addr in main.cc +#Start RPMP server +for addr in $SERVER_ADDR; do + echo "Starting RPMP server on $addr.." + #Pass addr to RPMP server + ssh $addr "cd ${BIN_HOME}; mkdir -p ${LOG_HOME}; \ + ./data-server --address $addr --log ${DATA_SERVER_LOG_PATH} >> ${DATA_SERVER_LOG_PATH} & \ + echo \$! > ${DATA_SERVER_PID_PATH}" +done \ No newline at end of file diff --git a/rpmp/bin/stop-rpmp.sh b/rpmp/bin/stop-rpmp.sh new file mode 100755 index 00000000..b74b5cdd --- /dev/null +++ b/rpmp/bin/stop-rpmp.sh @@ -0,0 +1,48 @@ +#!/usr/bin/env bash + +if [ -L ${BASH_SOURCE-$0} ]; then + FWDIR=$(dirname $(readlink "${BASH_SOURCE-$0}")) +else + FWDIR=$(dirname "${BASH_SOURCE-$0}") +fi + +if [[ -z "${RPMP_HOME}" ]]; then + export RPMP_HOME=$(cd "${FWDIR}/.."; pwd) +fi +export BIN_HOME=$RPMP_HOME/bin +export CONFIG_HOME=$RPMP_HOME/config +PROXY_PID_FILE_PATH=/tmp/rpmp-proxy.pid +SERVER_PID_FILE_PATH=/tmp/rpmp-server.pid + +CONFIG_FILE="$CONFIG_HOME/rpmp.conf" +PROXY_ADDR_KEY="rpmp.network.proxy.address" +SERVER_ADDR_KEY="rpmp.network.server.address" +PROXY_ADDR= +SERVER_ADDR= + +while IFS= read -r line; do + tokens=( $line ) + if [ "${tokens[0]}" = "$PROXY_ADDR_KEY" ]; then + PROXY_ADDR=${tokens[1]} + elif [ "${tokens[0]}" = "$SERVER_ADDR_KEY" ]; then + SERVER_ADDR=${tokens[1]} + fi +done < $CONFIG_FILE + +. "$BIN_HOME/common.sh" + +#Separate address by ','. +IFS=',' + +#Stop RPMP server +for addr in $SERVER_ADDR; do + echo "Stopping RPMP server on $addr.." + ssh $addr "PID=\$(cat SERVER_PID_FILE_PATH 2>/dev/null); kill -9 \$PID > /dev/null 2>&1" +done + +#Stop RPMP proxy +for addr in $PROXY_ADDR; do + echo "Stopping RPMP proxy on $addr.." + #Pass addr to RPMP proxy + ssh $addr "PID=\$(cat $PROXY_PID_FILE_PATH 2>/dev/null); kill -9 \$PID > /dev/null 2>&1" +done diff --git a/rpmp/pmpool/CMakeLists.txt b/rpmp/pmpool/CMakeLists.txt index ce8c8194..23cd94ec 100644 --- a/rpmp/pmpool/CMakeLists.txt +++ b/rpmp/pmpool/CMakeLists.txt @@ -2,7 +2,7 @@ add_library(pmpool_client_jni SHARED Event.cc ProxyEvent.cc client/PmPoolClient. 
target_link_libraries(pmpool_client_jni LINK_PUBLIC ${Boost_LIBRARIES} hpnl) set_target_properties(pmpool_client_jni PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib") -add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc proxy/metastore/rocksdb/Rocks.cc proxy/metastore/rocksdb/RocksConnection.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc) +add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc) target_link_libraries(pmpool LINK_PUBLIC ${Boost_LIBRARIES} hpnl pmemobj) set_target_properties(pmpool PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib") diff --git a/rpmp/pmpool/proxy/PhysicalNode.h b/rpmp/pmpool/proxy/PhysicalNode.h index f55c197f..7b7d315b 100644 --- a/rpmp/pmpool/proxy/PhysicalNode.h +++ b/rpmp/pmpool/proxy/PhysicalNode.h @@ -25,6 +25,14 @@ class PhysicalNode { string getPort() { return port; } + void setIp(string ip){ + this->ip = ip; + } + + void setPort(string port){ + this->port = port; + } + private: string ip; string port; diff --git a/rpmp/pmpool/proxy/clientService/ClientService.cc b/rpmp/pmpool/proxy/clientService/ClientService.cc index 4eaac9ee..f584ca49 100644 --- a/rpmp/pmpool/proxy/clientService/ClientService.cc +++ b/rpmp/pmpool/proxy/clientService/ClientService.cc @@ -105,6 +105,65 @@ void ClientService::addRecords(uint64_t key, unordered_setset(to_string(key), json_str); } +/** +* Get available nodes from metastore +**/ +std::unordered_set ClientService::getNodes(uint64_t key){ + std::unordered_set nodes; + + int retry = 10; + int timeout = 1; + while(retry > 0){ + std::string key_str = to_string(key); + auto rawJson = metastore_->get(key_str); + + const auto rawJsonLength = static_cast(rawJson.length()); + JSONCPP_STRING err; + Json::Value root; + + Json::CharReaderBuilder builder; + const std::unique_ptr reader(builder.newCharReader()); + if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root, + &err)) { + #ifdef DEBUG + std::cout << "key: " << key <setIp(ip); + node->setPort("12346"); + nodes.insert(*node); + } + } + } + + if(retry == 0){ + std::cout << "ClientService::Error occurred in getNodes with multiples retrys for key: " << key << std::endl; + } + + return nodes; +} + + void ClientService::enqueue_recv_msg(std::shared_ptr request) { worker_->addTask(request); // ProxyRequestContext rc = request->get_rc(); @@ -144,7 +203,16 @@ void ClientService::handle_recv_msg(std::shared_ptr request) { break; } case GET_REPLICA: { - auto nodes = proxyServer_->getReplica(rc.key); + auto nodes = getNodes(rc.key); + /** Debug usage + cout<<"/////////start//////"<{ * NODE: * STATUS: * SIZE: - * + * } **/ const string JOB_STATUS = "JOB_STATUS"; const string NODES = "NODES"; @@ -113,9 +113,9 @@ class ClientService : public 
std::enable_shared_from_this{ const string SIZE = "SIZE"; void enqueue_finalize_msg(std::shared_ptr reply); void handle_finalize_msg(std::shared_ptr reply); - // std::vector> workers_; void constructJobStatus(Json::Value record, uint64_t key); void addRecords(uint64_t key, unordered_set nodes); + std::unordered_set getNodes(uint64_t key); std::shared_ptr worker_; std::shared_ptr chunkMgr_; diff --git a/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc b/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc index 537ddeaf..503cad9b 100644 --- a/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc +++ b/rpmp/pmpool/proxy/metastore/ConnectionFacade.cc @@ -4,14 +4,9 @@ #include #include -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - #include "redis/Redis.h" using namespace std; -using namespace ROCKSDB_NAMESPACE; ConnectionFacade::ConnectionFacade(std::shared_ptr config, std::shared_ptr log,string type){ config_ = config; @@ -19,72 +14,24 @@ ConnectionFacade::ConnectionFacade(std::shared_ptr config, std::shared_p type_ = type; } -//RocksDB -int ConnectionFacade::connect(string DBPath){ - Options options; - // Optimize RocksDB. This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - - // open DB - Status s = DB::Open(options, DBPath, &db_); - - if (s.ok() == true){ - setConnected(true); - return 0; - } - setConnected(false); - return -1; -} - string ConnectionFacade::put(string key, string value){ - if(type_ == ROCKS){ - Status s = db_->Put(WriteOptions(), key, value); - return s.ToString(); - }else{ - return redis_->set(key, value); - } + return redis_->set(key, value); } string ConnectionFacade::get(string key){ - if(type_ == ROCKS){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - return value; - }else{ - return redis_->get(key); - } + return redis_->get(key); } int ConnectionFacade::exists(string key){ - if(type_ == ROCKS){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - if(s.ok()){ - return 1; - } - return 0; - }else{ - return redis_->exists(key); - } + return redis_->exists(key); } std::unordered_set ConnectionFacade::scanAll(){ - if(type_ == ROCKS){ - //Do nothing - }else{ - return redis_->scanAll(); - } + return redis_->scanAll(); } std::unordered_set ConnectionFacade::scan(std::string pattern){ - if(type_ == ROCKS){ - //Do nothing - }else{ - return redis_->scan(pattern); - } + return redis_->scan(pattern); } //Redis diff --git a/rpmp/pmpool/proxy/metastore/ConnectionFacade.h b/rpmp/pmpool/proxy/metastore/ConnectionFacade.h index 7da576f8..d5d20d10 100644 --- a/rpmp/pmpool/proxy/metastore/ConnectionFacade.h +++ b/rpmp/pmpool/proxy/metastore/ConnectionFacade.h @@ -4,24 +4,19 @@ #include #include -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" #include "redis/Redis.h" #include "pmpool/Config.h" #include "pmpool/RLog.h" using namespace std; -using namespace ROCKSDB_NAMESPACE; /** - * Facade for connection to Redis and RocksDB + * Facade for connection to Redis * **/ class ConnectionFacade: public std::enable_shared_from_this{ public: - // RocksDB ConnectionFacade(std::shared_ptr config, std::shared_ptr log, string type); // Redis int connect(); @@ -40,10 +35,7 @@ class ConnectionFacade: public std::enable_shared_from_this{ bool connected_; int setConnected(bool connected); string type_; - string ROCKS = "ROCKS"; string REDIS = 
"REDIS"; - // RocksDB - DB *db_; // Redis shared_ptr redis_; }; diff --git a/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc b/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc index 01884835..354cc548 100644 --- a/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc +++ b/rpmp/pmpool/proxy/metastore/MetastoreFacade.cc @@ -2,7 +2,6 @@ #include "../../Config.h" #include "MetastoreFacade.h" -#include "rocksdb/Rocks.h" #include "redis/Redis.h" #include "json/json.h" @@ -17,12 +16,7 @@ MetastoreFacade::MetastoreFacade(std::shared_ptr config, std::shared_ptr bool MetastoreFacade::connect() { int res = 0; - if(type_ == ROCKS){ - string DBPath = "/tmp/rocksdb_simple_example"; - res = connection_->connect(DBPath); - }else if(type_ == REDIS){ - res = connection_->connect(); - } + res = connection_->connect(); if (res == 0) { log_->get_console_log()->info("Successfully connected to metastore database"); return true; diff --git a/rpmp/pmpool/proxy/metastore/MetastoreFacade.h b/rpmp/pmpool/proxy/metastore/MetastoreFacade.h index 3bcb1388..dae5ad0f 100644 --- a/rpmp/pmpool/proxy/metastore/MetastoreFacade.h +++ b/rpmp/pmpool/proxy/metastore/MetastoreFacade.h @@ -10,7 +10,7 @@ class Config; /** - * Facade for metastore, either Redis or RocksDB + * Facade for metastore * **/ class MetastoreFacade: public std::enable_shared_from_this{ @@ -31,7 +31,6 @@ class MetastoreFacade: public std::enable_shared_from_this{ std::string port_; std::string type_; std::string REDIS = "REDIS"; - std::string ROCKS = "ROCKS"; }; #endif \ No newline at end of file diff --git a/rpmp/pmpool/proxy/metastore/redis/Redis.cc b/rpmp/pmpool/proxy/metastore/redis/Redis.cc index 88e16092..83f27a4d 100644 --- a/rpmp/pmpool/proxy/metastore/redis/Redis.cc +++ b/rpmp/pmpool/proxy/metastore/redis/Redis.cc @@ -21,8 +21,15 @@ Redis::Redis(std::shared_ptr config, std::shared_ptr log){ **/ bool Redis::connect() { // Create an Redis object, which is movable but NOT copyable. - string connection_str = "tcp://" + address_ + ":" + port_; - redis_ = new sw::redis::Redis(connection_str); + sw::redis::ConnectionOptions connection_options; + connection_options.host = address_; // Required. + connection_options.port = stoi(port_); // Optional. The default port is 6379. + + sw::redis::ConnectionPoolOptions pool_options; + pool_options.size = 5; // Pool size, i.e. max number of connections. 
+ + //string connection_str = "tcp://" + address_ + ":" + port_; + redis_ = new sw::redis::Redis(connection_options, pool_options); return true; } diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc b/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc deleted file mode 100644 index 247c5c79..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.cc +++ /dev/null @@ -1,35 +0,0 @@ -#include -#include "../../../Config.h" - -#include "Rocks.h" -#include "json/json.h" - -Rocks::Rocks(std::shared_ptr config, std::shared_ptr log){ - log_ = log; - config_ = config; - rocksConnection_ = std::make_shared(); -} - -bool Rocks::connect(string DBPath) { - int res = rocksConnection_->connect(DBPath); - if (res == 0) { - log_->get_console_log()->info("Successfully connected to rocksdb database"); - return true; - } - log_->get_console_log()->error("Failed to connect to rocksdb database"); - return false; -} - -string Rocks::set(string key, string value){ - rocksConnection_->put(key, value); - return ""; -} - -string Rocks::get(string key){ - string value = rocksConnection_->get(key); - return value; -}; - -int Rocks::exists(string key){ - return rocksConnection_->exists(key); -} \ No newline at end of file diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h b/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h deleted file mode 100644 index 98512f02..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/Rocks.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef SPARK_PMOF_ROCKS_H -#define SPARK_PMOF_ROCKS_H - -#include - -#include "pmpool/Config.h" -#include "pmpool/RLog.h" -#include "RocksConnection.h" - -class Config; - -class Rocks: public std::enable_shared_from_this{ -public: - Rocks(std::shared_ptr configs, std::shared_ptr log); - bool connect(string DBPath); - string set(string key, string value); - string get(string key); - int exists(string key); - -private: - std::shared_ptr rocksConnection_; - std::shared_ptr config_; - std::shared_ptr log_; - std::string address_; - std::string port_; -}; - -#endif \ No newline at end of file diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc b/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc deleted file mode 100644 index 17bad80e..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.cc +++ /dev/null @@ -1,60 +0,0 @@ -#include "RocksConnection.h" - -#include -#include -#include - -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - -using namespace std; -using namespace ROCKSDB_NAMESPACE; - -int RocksConnection::connect(string DBPath){ - Options options; - // Optimize RocksDB. 
This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - - // open DB - Status s = DB::Open(options, DBPath, &db_); - - if (s.ok() == true){ - setConnected(true); - return 0; - } - setConnected(false); - return -1; -} - -bool RocksConnection::isConnected(){ - return connected_; -} - -int RocksConnection::setConnected(bool connected){ - connected_ = connected; - return 0; -} - -Status RocksConnection::put(string key, string value){ - Status s = db_->Put(WriteOptions(), key, value); - return s; -} - -string RocksConnection::get(string key){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - return value; -} - -int RocksConnection::exists(string key){ - string value; - Status s = db_->Get(ReadOptions(), key, &value); - if(s.ok()){ - return 1; - } - return 0; -} diff --git a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h b/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h deleted file mode 100644 index 4cf3061f..00000000 --- a/rpmp/pmpool/proxy/metastore/rocksdb/RocksConnection.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef SPARK_PMOF_ROCKSCONNECTION_H -#define SPARK_PMOF_ROCKSCONNECTION_H - -#include -#include - -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - -using namespace std; -using namespace ROCKSDB_NAMESPACE; - -class RocksConnection: public std::enable_shared_from_this{ -public: - int connect(string DBPath); - bool isConnected(); - Status put(string key, string value); - string get(string key); - int exists(string key); -private: - bool connected_; - int setConnected(bool connected); - DB *db_; -}; - -#endif \ No newline at end of file diff --git a/rpmp/pmpool/proxy/replicaService/ReplicaService.cc b/rpmp/pmpool/proxy/replicaService/ReplicaService.cc index 7c8875cf..1b220181 100644 --- a/rpmp/pmpool/proxy/replicaService/ReplicaService.cc +++ b/rpmp/pmpool/proxy/replicaService/ReplicaService.cc @@ -69,43 +69,57 @@ void ReplicaService::enqueue_recv_msg(std::shared_ptr request) { * Update data status once it's been put to the node successfully **/ void ReplicaService::updateRecord(uint64_t key, PhysicalNode node, uint64_t size){ - string rawJson = metastore_->get(to_string(key)); - #ifdef DEBUG - cout<(rawJson.length()); - JSONCPP_STRING err; - Json::Value root; - - Json::CharReaderBuilder builder; - const std::unique_ptr reader(builder.newCharReader()); - if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root, - &err)) { - #ifndef DEBUG - std::cout << "key: " << key < 0){ + std::string key_str = to_string(key); + auto rawJson = metastore_->get(key_str); + #ifdef DEBUG + cout<(rawJson.length()); + JSONCPP_STRING err; + Json::Value root; + + Json::CharReaderBuilder builder; + const std::unique_ptr reader(builder.newCharReader()); + if (!reader->parse(rawJson.c_str(), rawJson.c_str() + rawJsonLength, &root, + &err)) { + #ifdef DEBUG + std::cout << "key: " << key <set(to_string(key), json_str); + root["data"] = data; + string json_str = rootToString(root); + metastore_->set(to_string(key), json_str); + } + if(retry == 0){ + std::cout << "key: " << key < -#include - -#include "rocksdb/db.h" -#include "rocksdb/slice.h" -#include "rocksdb/options.h" - -using namespace ROCKSDB_NAMESPACE; -using namespace std; - -#if defined(OS_WIN) -std::string kDBPath = "C:\\Windows\\TEMP\\rocksdb_simple_example"; -#else -std::string kDBPath = "/tmp/rocksdb_simple_example"; 
-#endif - -int put_and_get(){ - DB* db; - Options options; - // Optimize RocksDB. This is the easiest way to get RocksDB to perform well - options.IncreaseParallelism(); - options.OptimizeLevelStyleCompaction(); - // create the DB if it's not already present - options.create_if_missing = true; - // open DB - Status s = DB::Open(options, kDBPath, &db); - assert(s.ok()); - cout<Put(WriteOptions(), "key1", "value1"); - assert(s.ok()); - std::string value; - // get value - s = db->Get(ReadOptions(), "key1", &value); - assert(s.ok()); - assert(value == "value"); - cout<<"value: "<
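
Usage sketch (not part of the patch itself): the Spark-side switches introduced above in `PmofConf` — `spark.shuffle.pmof.enable_remote_pmem`, `spark.rpmp.rhost`, `spark.rpmp.rport` — are what route the PMoF shuffle writer/reader onto the RPMP path instead of local PMem/RDMA. A minimal Scala sketch follows, assuming the shuffle manager is plugged in through Spark's standard `spark.shuffle.manager` setting; the proxy host value is a placeholder, not taken from the patch.

```scala
import org.apache.spark.SparkConf

// Sketch only: the pmof/rpmp keys and their defaults come from PmofConf in this patch.
// The proxy host below is a placeholder and should match rpmp.network.proxy.address
// in rpmp/config/rpmp.conf (the same key read by bin/start-rpmp.sh).
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.pmof.PmofShuffleManager")
  .set("spark.shuffle.pmof.enable_remote_pmem", "true") // selects RpmpShuffleReader / remote pool writes
  .set("spark.rpmp.rhost", "<rpmp-proxy-host>")         // default in PmofConf: 172.168.0.40
  .set("spark.rpmp.rport", "61010")                     // default in PmofConf: 61010
```

With `enable_remote_pmem` set, the writer publishes an `UnCompressedMapStatus` and the RDMA metadata push is skipped, matching the `enableRdma && !enableRemotePmem` guards added in `PmemShuffleWriter` and `PmofShuffleManager`.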