Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[PMEM-SHUFFLE-52] Sync RPMP to master branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Eugene-Mark committed Aug 29, 2021
1 parent c17d9c8 commit ff9078e
Show file tree
Hide file tree
Showing 27 changed files with 321 additions and 1,009 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
**/target/**
*.class
*.iml
*.sh

# logs
*.log
Expand Down
662 changes: 0 additions & 662 deletions core/rpmp-core.patch

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ private[spark] class UnCompressedMapStatus(
val step = 8

def this(loc: BlockManagerId, uncompressedSizes: Array[Long], mapTaskId: Long) = {
this(loc, uncompressedSizes.map(MapStatus.compressSize), mapTaskId)
this(loc, uncompressedSizes.flatMap(UnCompressedMapStatus.longToBytes), mapTaskId)
}

override def updateLocation(newLoc: BlockManagerId): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import org.apache.spark.storage.BlockManager
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import java.security.MessageDigest

private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffleBlockResolver,
metadataResolver: MetadataResolver,
blockManager: BlockManager,
Expand Down Expand Up @@ -128,22 +130,9 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
}

val shuffleServerId = blockManager.shuffleServerId
/**
if (pmofConf.enableRdma) {
val rkey = PmemBlockOutputStreamArray(0).getRkey()
metadataResolver.pushPmemBlockInfo(stageId, mapId, pmemBlockInfoMap, rkey)
val blockManagerId: BlockManagerId =
BlockManagerId(shuffleServerId.executorId, PmofTransferService.shuffleNodesMap(shuffleServerId.host),
PmofTransferService.getTransferServiceInstance(pmofConf, blockManager).port, shuffleServerId.topologyInfo)
mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
} else {
mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
}
**/

if (pmofConf.enableRemotePmem) {
mapStatus = new UnCompressedMapStatus(shuffleServerId, partitionLengths, mapId)
//mapStatus = MapStatus(shuffleServerId, partitionLengths)
} else if (!pmofConf.enableRdma) {
mapStatus = MapStatus(shuffleServerId, partitionLengths, mapId)
} else {
Expand All @@ -156,6 +145,12 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
shuffleServerId.topologyInfo)
mapStatus = MapStatus(blockManagerId, partitionLengths, mapId)
}
/**
For debug usage
logInfo(
s" shuffle_${stageId}_${mapId}_0 size is ${partitionLengths(0)}, decompressed size is ${mapStatus
.getSizeForBlock(0)}")
**/
}

/** Close this writer, passing along whether the map completed */
Expand All @@ -180,4 +175,11 @@ private[spark] class PmemShuffleWriter[K, V, C](shuffleBlockResolver: PmemShuffl
}
}
}

/**
 * For debug usage: computes the MD5 digest of a string.
 *
 * The string is encoded as UTF-8 before hashing so that the digest of a given
 * string is the same on every JVM, independent of the platform default charset.
 *
 * @param s the text to hash
 * @return the 16-byte MD5 digest of the UTF-8 encoding of `s`
 */
def md5(s: String): Array[Byte] = {
MessageDigest.getInstance("MD5").digest(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,6 @@ object NettyByteBufferPool extends Logging {
}
}

def allocateNewBuffer(): ByteBuf = synchronized {
allocator.directBuffer()
}

def allocateFlexibleNewBuffer(bufSize: Int): ByteBuf = synchronized {
val byteBuf = allocator.directBuffer(65536, bufSize * 2)
bufferMap += (byteBuf -> bufSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ private[spark] class PmemBlockOutputStream(
if ((pmofConf.spill_throttle != -1 && pmemOutputStream.bufferRemainingSize >= pmofConf.spill_throttle) || force == true) {
val start = System.nanoTime()
flush()
//pmemOutputStream.doFlush()
pmemOutputStream.doFlush()
val bufSize = pmemOutputStream.flushedSize
mapStatus += ((pmemOutputStream.flushed_block_id, bufSize, recordsPerBlock))
if (bufSize > 0) {
recordsArray += recordsPerBlock
recordsPerBlock = 0
size += bufSize
size = bufSize

if (blockId.isShuffle == true) {
val writeMetrics = taskMetrics.shuffleWriteMetrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class PmemOutputStream(
var is_closed = false
var key_id = 0

val length: Int = 1024 * 1024 * 6
val length: Int = bufferSize
var bufferFlushedSize: Int = 0
var bufferRemainingSize: Int = 0
val buf: ByteBuf = NettyByteBufferPool.allocateFlexibleNewBuffer(length)
Expand All @@ -37,6 +37,9 @@ class PmemOutputStream(
}

override def flush(): Unit = {
}

def doFlush(): Unit = {
if (bufferRemainingSize > 0) {
if (remotePersistentMemoryPool != null) {
logDebug(s" [PUT Started]${cur_block_id}-${bufferRemainingSize}")
Expand All @@ -55,6 +58,7 @@ class PmemOutputStream(
key_id += 1
flushed_block_id = cur_block_id
cur_block_id = s"${blockId}_${key_id}"
logDebug(s" [PUT Completed]${blockId}-${bufferRemainingSize}, ${NettyByteBufferPool.dump(byteBuffer, bufferRemainingSize)}")
} else {
val byteBuffer: ByteBuffer = buf.nioBuffer()
persistentMemoryWriter.setPartition(
Expand All @@ -73,10 +77,6 @@ class PmemOutputStream(
}
}

def doFlush(): Unit = {

}

def flushedSize(): Int = {
bufferFlushedSize
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,18 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
private[this] var address: BlockManagerId = _
private[this] var blockInfos: Seq[(BlockId, Long, Int)] = _
private[this] var iterator: Iterator[(BlockId, Long, Int)] = _
private[this] var blocksByAddressSeq: Seq[(BlockManagerId, Seq[(BlockId, Long, Int)])] = _

initialize()

def initialize(): Unit = {
context.addTaskCompletionListener[Unit](_ => cleanup())
blocksByAddressSize = blocksByAddress.size

blocksByAddressSeq = blocksByAddress.toSeq
blocksByAddressSize = blocksByAddressSeq.size

if (blocksByAddressCurrentId < blocksByAddressSize) {
val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
val res = blocksByAddressSeq(blocksByAddressCurrentId)
address = res._1
blockInfos = res._2
iterator = blockInfos.iterator
Expand Down Expand Up @@ -228,7 +232,7 @@ private[spark] final class RpmpShuffleBlockFetcherIterator(
has_next = false
return has_next
}
val res = blocksByAddress.toSeq(blocksByAddressCurrentId)
val res = blocksByAddressSeq(blocksByAddressCurrentId)
address = res._1
blockInfos = res._2
iterator = blockInfos.iterator
Expand Down
10 changes: 4 additions & 6 deletions rpmp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,11 @@ file(COPY bin/stop-rpmp.sh DESTINATION ${CMAKE_RUNTIME_OUTPUT_DIRECTORY})

add_executable(data-server main.cc)
add_executable(chashtest test/chash_test.cc)
add_executable(rocksdbtest test/rocksdb_test.cc)
add_executable(proxy-server ProxyMain.cc)
add_executable(client Client.cc)
target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(data-server pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(client pmpool spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(rocksdbtest spdlog hiredis jsoncpp rocksdb redis++)
target_link_libraries(chashtest pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(data-server pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(proxy-server pmpool spdlog hiredis jsoncpp redis++)
target_link_libraries(client pmpool spdlog hiredis jsoncpp redis++)

file(COPY ${PROJECT_SOURCE_DIR}/config DESTINATION .)
26 changes: 23 additions & 3 deletions rpmp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ Hardware:
Software:
- [HPNL](https://github.com/Intel-bigdata/HPNL)
- [hiredis](https://github.com/redis/hiredis)
- [redis-plus-plus](https://github.com/sewenew/redis-plus-plus.git)
- [jsoncpp](https://github.com/open-source-parsers/jsoncpp)
- [PMDK](https://github.com/pmem/pmdk.git)
- [boost](https://www.boost.org/)

## <a id="hardware-enabling"></a>Hardware Enabling

Expand Down Expand Up @@ -76,16 +78,31 @@ make && make install
```

hiredis:
```
git clone https://github.com/redis/hiredis
cd hiredis
mkdir build; cd build
cmake ..
make && make install
```

redis-plus-plus:
```
git clone https://github.com/sewenew/redis-plus-plus.git
cd redis-plus-plus
mkdir build
cd build
cmake ..
make
make install
```

jsoncpp:
```
git clone https://github.com/open-source-parsers/jsoncpp.git
cd jsoncpp
mkdir build; cd build
cmake ..
make && make install
```

Expand All @@ -111,13 +128,16 @@ make
make install
```

boost:
```
yum install boost-devel
```

### Build for C/C++
```
git clone https://github.com/Intel-bigdata/Spark-PMoF.git
git clone https://github.com/oap-project/pmem-shuffle.git
cd pmem-shuffle
git submodule update --init --recursive
git submodule add -b master https://github.com/redis/hiredis.git rpmp/include/hiredis
git submodule add -b master https://github.com/open-source-parsers/jsoncpp.git rpmp/include/jsoncpp
cd rpmp
mkdir build
cd build
Expand Down
72 changes: 72 additions & 0 deletions rpmp/bin/start-rpmp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env bash
#
# Starts the RPMP proxy server(s) and data server(s) over ssh.
#
# The target hosts are read from $RPMP_HOME/config/rpmp.conf, from the keys
# rpmp.network.proxy.address and rpmp.network.server.address; each value is a
# comma-separated list of addresses. On every host the PID of the started
# process is written to a file under /tmp.

# Resolve the directory that holds this script, following one level of symlink.
SCRIPT_PATH="${BASH_SOURCE-$0}"
if [ -L "${SCRIPT_PATH}" ]; then
  FWDIR=$(dirname "$(readlink "${SCRIPT_PATH}")")
else
  FWDIR=$(dirname "${SCRIPT_PATH}")
fi

# Default RPMP_HOME to the parent of the script directory unless the caller set it.
if [[ -z "${RPMP_HOME}" ]]; then
  export RPMP_HOME=$(cd "${FWDIR}/.."; pwd)
fi
export BIN_HOME=$RPMP_HOME/bin
export CONFIG_HOME=$RPMP_HOME/config
export LOG_HOME=$RPMP_HOME/log
PROXY_SERVER_LOG_PATH=$LOG_HOME/proxy-server.log
DATA_SERVER_LOG_PATH=$LOG_HOME/data-server.log
PROXY_SERVER_PID_PATH=/tmp/rpmp-proxy-server.pid
DATA_SERVER_PID_PATH=/tmp/rpmp-data-server.pid

#Keep this arg for future use.
USER_VARGS=
while [ $# != 0 ]; do
  case "$1" in
    "--help" | "-h")
      echo "--help -h Show this usage information"
      exit
      ;;
    *)
      USER_VARGS+=" $1"
      shift
      ;;
  esac
done

CONFIG_FILE="$CONFIG_HOME/rpmp.conf"
PROXY_ADDR_KEY="rpmp.network.proxy.address"
SERVER_ADDR_KEY="rpmp.network.server.address"
PROXY_ADDR=
SERVER_ADDR=

# Fail early with a clear message instead of a redirection error from the loop below.
if [ ! -r "${CONFIG_FILE}" ]; then
  echo "Cannot read RPMP config file: ${CONFIG_FILE}" >&2
  exit 1
fi

# Each config line is "<key> <value>" separated by whitespace.
# "|| [ -n ... ]" keeps the last line when the file has no trailing newline.
while IFS= read -r line || [ -n "${line}" ]; do
  # read splits on whitespace without the pathname expansion an unquoted array assignment would do.
  read -r key value _ <<< "${line}"
  if [ "${key}" = "$PROXY_ADDR_KEY" ]; then
    PROXY_ADDR=${value}
  elif [ "${key}" = "$SERVER_ADDR_KEY" ]; then
    SERVER_ADDR=${value}
  fi
done < "${CONFIG_FILE}"

#TODO: check if current process is running

#Separate address by ','. IFS is changed only for these two reads, not globally.
IFS=',' read -r -a PROXY_ADDRS <<< "${PROXY_ADDR}"
IFS=',' read -r -a SERVER_ADDRS <<< "${SERVER_ADDR}"

#Start RPMP proxy
for addr in "${PROXY_ADDRS[@]}"; do
  # Skip empty entries such as the one produced by "a,,b".
  [ -n "${addr}" ] || continue
  echo "Starting RPMP proxy on $addr.."
  #Pass addr to RPMP proxy
  # stderr goes to the log together with stdout so startup errors are kept and the
  # background process does not hold the ssh session's stderr open.
  ssh "${addr}" "cd ${BIN_HOME}; mkdir -p ${LOG_HOME}; \
    ./proxy-server --current_proxy_addr ${addr} --log ${PROXY_SERVER_LOG_PATH} >> ${PROXY_SERVER_LOG_PATH} 2>&1 & \
    echo \$! > ${PROXY_SERVER_PID_PATH}"
done

# TODO: parse addr in main.cc
#Start RPMP server
for addr in "${SERVER_ADDRS[@]}"; do
  [ -n "${addr}" ] || continue
  echo "Starting RPMP server on $addr.."
  #Pass addr to RPMP server
  ssh "${addr}" "cd ${BIN_HOME}; mkdir -p ${LOG_HOME}; \
    ./data-server --address ${addr} --log ${DATA_SERVER_LOG_PATH} >> ${DATA_SERVER_LOG_PATH} 2>&1 & \
    echo \$! > ${DATA_SERVER_PID_PATH}"
done
48 changes: 48 additions & 0 deletions rpmp/bin/stop-rpmp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env bash
#
# Stops the RPMP data server(s) and proxy server(s) over ssh.
#
# The target hosts are read from $RPMP_HOME/config/rpmp.conf, from the keys
# rpmp.network.server.address and rpmp.network.proxy.address; each value is a
# comma-separated list of addresses. On every host the process whose PID is
# stored in the PID file is killed with SIGKILL.

# Resolve the directory that holds this script, following one level of symlink.
SCRIPT_PATH="${BASH_SOURCE-$0}"
if [ -L "${SCRIPT_PATH}" ]; then
  FWDIR=$(dirname "$(readlink "${SCRIPT_PATH}")")
else
  FWDIR=$(dirname "${SCRIPT_PATH}")
fi

# Default RPMP_HOME to the parent of the script directory unless the caller set it.
if [[ -z "${RPMP_HOME}" ]]; then
  export RPMP_HOME=$(cd "${FWDIR}/.."; pwd)
fi
export BIN_HOME=$RPMP_HOME/bin
export CONFIG_HOME=$RPMP_HOME/config
# These must be the same files start-rpmp.sh writes (PROXY_SERVER_PID_PATH and
# DATA_SERVER_PID_PATH there); otherwise no PID is found and nothing is stopped.
PROXY_PID_FILE_PATH=/tmp/rpmp-proxy-server.pid
SERVER_PID_FILE_PATH=/tmp/rpmp-data-server.pid

CONFIG_FILE="$CONFIG_HOME/rpmp.conf"
PROXY_ADDR_KEY="rpmp.network.proxy.address"
SERVER_ADDR_KEY="rpmp.network.server.address"
PROXY_ADDR=
SERVER_ADDR=

# Fail early with a clear message instead of a redirection error from the loop below.
if [ ! -r "${CONFIG_FILE}" ]; then
  echo "Cannot read RPMP config file: ${CONFIG_FILE}" >&2
  exit 1
fi

# Each config line is "<key> <value>" separated by whitespace.
# "|| [ -n ... ]" keeps the last line when the file has no trailing newline.
while IFS= read -r line || [ -n "${line}" ]; do
  # read splits on whitespace without the pathname expansion an unquoted array assignment would do.
  read -r key value _ <<< "${line}"
  if [ "${key}" = "$PROXY_ADDR_KEY" ]; then
    PROXY_ADDR=${value}
  elif [ "${key}" = "$SERVER_ADDR_KEY" ]; then
    SERVER_ADDR=${value}
  fi
done < "${CONFIG_FILE}"

# NOTE(review): common.sh is not visible here; confirm it exists next to this
# script and does not redefine the variables set above.
. "$BIN_HOME/common.sh"

#Separate address by ','. IFS is changed only for these two reads, not globally.
IFS=',' read -r -a SERVER_ADDRS <<< "${SERVER_ADDR}"
IFS=',' read -r -a PROXY_ADDRS <<< "${PROXY_ADDR}"

#Stop RPMP server
for addr in "${SERVER_ADDRS[@]}"; do
  # Skip empty entries such as the one produced by "a,,b".
  [ -n "${addr}" ] || continue
  echo "Stopping RPMP server on $addr.."
  # \$PID and \$(...) are escaped so they are evaluated on the remote host;
  # the PID file path is expanded locally.
  ssh "${addr}" "PID=\$(cat ${SERVER_PID_FILE_PATH} 2>/dev/null); kill -9 \$PID > /dev/null 2>&1"
done

#Stop RPMP proxy
for addr in "${PROXY_ADDRS[@]}"; do
  [ -n "${addr}" ] || continue
  echo "Stopping RPMP proxy on $addr.."
  #Pass addr to RPMP proxy
  ssh "${addr}" "PID=\$(cat ${PROXY_PID_FILE_PATH} 2>/dev/null); kill -9 \$PID > /dev/null 2>&1"
done
2 changes: 1 addition & 1 deletion rpmp/pmpool/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ add_library(pmpool_client_jni SHARED Event.cc ProxyEvent.cc client/PmPoolClient.
target_link_libraries(pmpool_client_jni LINK_PUBLIC ${Boost_LIBRARIES} hpnl)
set_target_properties(pmpool_client_jni PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")

add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc proxy/metastore/rocksdb/Rocks.cc proxy/metastore/rocksdb/RocksConnection.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc)
add_library(pmpool SHARED DataServer.cc Protocol.cc Event.cc ProxyEvent.cc NetworkServer.cc proxy/metastore/MetastoreFacade.cc proxy/metastore/ConnectionFacade.cc proxy/metastore/redis/Redis.cc HeartbeatClient.cc HeartbeatEvent.cc hash/xxhash.cc client/PmPoolClient.cc client/NetworkClient.cc client/ProxyClient.cc proxy/clientService/ClientService.cc Proxy.cc proxy/replicaService/ReplicaService.cc DataService/DataServerService.cc proxy/NodeManager.cc proxy/tracker/Tracker.cc)

target_link_libraries(pmpool LINK_PUBLIC ${Boost_LIBRARIES} hpnl pmemobj)
set_target_properties(pmpool PROPERTIES LIBRARY_OUTPUT_DIRECTORY "${CMAKE_BINARY_DIR}/lib")
Expand Down
8 changes: 8 additions & 0 deletions rpmp/pmpool/proxy/PhysicalNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ class PhysicalNode {

string getPort() { return port; }

// Replaces the stored ip with the given value.
void setIp(string ip) { this->ip = ip; }

// Replaces the stored port with the given value.
void setPort(string port) { this->port = port; }

private:
string ip;
string port;
Expand Down
Loading

0 comments on commit ff9078e

Please sign in to comment.