Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL][1.2] Port 6432 6657 for Celeborn bug fix in branch 1.2 #7922

Open
wants to merge 3 commits into
base: branch-1.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.celeborn.client.ShuffleClient
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.protocol.ShuffleMode

import java.io.IOException
import java.util.Locale
Expand All @@ -55,68 +56,29 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](

private var splitResult: CHSplitResult = _

private val nativeBufferSize: Int = GlutenConfig.getConf.shuffleWriterBufferSize

@throws[IOException]
override def internalWrite(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
handleEmptyIterator()
return
}

if (nativeShuffleWriter == -1L) {
nativeShuffleWriter = jniWrapper.makeForRSS(
dep.nativePartitioning,
shuffleId,
mapId,
nativeBufferSize,
customizedCompressCodec,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
GlutenConfig.getConf.chColumnarThrowIfMemoryExceed,
GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict,
GlutenConfig.getConf.chColumnarForceExternalSortShuffle,
GlutenConfig.getConf.chColumnarForceMemorySortShuffle
)
CHNativeMemoryAllocators.createSpillable(
"CelebornShuffleWriter",
new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L
}
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException(
"Fatal: spill() called before a celeborn shuffle writer " +
"is created. This behavior should be" +
"optimized by moving memory " +
"allocations from make() to split()")
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
val spilled = jniWrapper.evict(nativeShuffleWriter)
logInfo(s"Gluten shuffle writer: Spilled $spilled / $size bytes of data")
spilled
}
}
)
}
while (records.hasNext) {
val cb = records.next()._2.asInstanceOf[ColumnarBatch]
if (cb.numRows == 0 || cb.numCols == 0) {
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
} else {
initShuffleWriter()
val col = cb.column(0).asInstanceOf[CHColumnVector]
val block = col.getBlockAddress
jniWrapper
.split(nativeShuffleWriter, block)
jniWrapper.split(nativeShuffleWriter, col.getBlockAddress)
dep.metrics("numInputRows").add(cb.numRows)
dep.metrics("inputBatches").add(1)
// This metric is important, AQE use it to decide if EliminateLimit
writeMetrics.incRecordsWritten(cb.numRows())
}
}

// If every ColumnarBatch was skipped as empty, no writer was created and nativeShuffleWriter is still -1
if (nativeShuffleWriter == -1L) {
handleEmptyIterator()
return
}

splitResult = jniWrapper.stop(nativeShuffleWriter)

dep.metrics("splitTime").add(splitResult.getSplitTime)
Expand All @@ -135,6 +97,43 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V](
mapStatus = MapStatus(blockManager.shuffleServerId, splitResult.getRawPartitionLengths, mapId)
}

/**
 * Creates the native shuffle writer via `jniWrapper.makeForRSS`, stores the returned handle in
 * `nativeShuffleWriter`, and registers a spiller named "CelebornShuffleWriter" that evicts the
 * writer's data when a spill-only phase is requested.
 *
 * Memory-sort shuffle is requested when either the Gluten flag is set or the Celeborn shuffle
 * writer type is SORT (compared case-insensitively).
 *
 * @param columnarBatch
 *   not read by this implementation
 */
override def createShuffleWriter(columnarBatch: ColumnarBatch): Unit = {
  nativeShuffleWriter = jniWrapper.makeForRSS(
    dep.nativePartitioning,
    shuffleId,
    mapId,
    nativeBufferSize,
    customizedCompressCodec,
    GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
    CHBackendSettings.shuffleHashAlgorithm,
    celebornPartitionPusher,
    GlutenConfig.getConf.chColumnarThrowIfMemoryExceed,
    GlutenConfig.getConf.chColumnarFlushBlockBufferBeforeEvict,
    GlutenConfig.getConf.chColumnarForceExternalSortShuffle,
    GlutenConfig.getConf.chColumnarForceMemorySortShuffle ||
      ShuffleMode.SORT.name.equalsIgnoreCase(shuffleWriterType)
  )

  val evictingSpiller = new Spiller() {
    override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
      if (Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
        // The spiller must never run against a writer handle that was not created.
        if (nativeShuffleWriter == -1L) {
          throw new IllegalStateException(
            "Fatal: spill() called before a celeborn shuffle writer is created. " +
              "This behavior should be optimized by moving memory allocations from make() to split()")
        }
        logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
        val evictedBytes = jniWrapper.evict(nativeShuffleWriter)
        logInfo(s"Gluten shuffle writer: Spilled $evictedBytes / $size bytes of data")
        evictedBytes
      } else {
        // Phases outside the spill-only set release nothing.
        0L
      }
    }
  }
  CHNativeMemoryAllocators.createSpillable("CelebornShuffleWriter", evictingSpiller)
}

/** Closes the native shuffle writer identified by `nativeShuffleWriter` through the JNI wrapper. */
override def closeShuffleWriter(): Unit = {
jniWrapper.close(nativeShuffleWriter)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SHUFFLE_COMPRESS
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.celeborn.CelebornShuffleHandle
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.storage.BlockManager

import org.apache.celeborn.client.ShuffleClient
Expand Down Expand Up @@ -52,12 +53,23 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](

protected val mapId: Int = context.partitionId()

/**
 * Buffer size handed to the native shuffle writer: the configured shuffle writer buffer size,
 * capped at the configured max batch size. An info message is logged when the cap applies.
 */
protected lazy val nativeBufferSize: Int = {
  val requested = GlutenConfig.getConf.shuffleWriterBufferSize
  val batchCap = GlutenConfig.getConf.maxBatchSize
  if (requested <= batchCap) {
    requested
  } else {
    logInfo(
      s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($requested) exceeds max " +
        s" batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($batchCap).")
    batchCap
  }
}

// Value of celebornConf.clientPushBufferMaxSize, read once when the writer is constructed.
protected val clientPushBufferMaxSize: Int = celebornConf.clientPushBufferMaxSize

// Value of celebornConf.clientPushSortMemoryThreshold, read once when the writer is constructed.
protected val clientPushSortMemoryThreshold: Long = celebornConf.clientPushSortMemoryThreshold

// NOTE(review): initialised from the same getter as clientPushSortMemoryThreshold above, so both
// fields always hold the same value — confirm whether a separate setting was intended.
protected val clientSortMemoryMaxSize: Long = celebornConf.clientPushSortMemoryThreshold

// Name of the configured Celeborn shuffle writer mode, lower-cased with Locale.ROOT.
protected val shuffleWriterType: String =
celebornConf.shuffleWriterMode.name.toLowerCase(Locale.ROOT)

Expand Down Expand Up @@ -96,6 +108,10 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](

/**
 * Entry point for writing the records of this map task. An empty iterator is routed to
 * `handleEmptyIterator()`; otherwise the records are delegated to `internalWrite`.
 *
 * @param records
 *   the records to shuffle
 */
@throws[IOException]
final override def write(records: Iterator[Product2[K, V]]): Unit = {
  if (records.hasNext) {
    internalWrite(records)
  } else {
    handleEmptyIterator()
  }
}

Expand All @@ -122,10 +138,18 @@ abstract class CelebornHashBasedColumnarShuffleWriter[K, V](
}
}

/**
 * Hook for creating the native shuffle writer; this default implementation does nothing.
 * Invoked by `initShuffleWriter` while `nativeShuffleWriter` is still -1L.
 */
def createShuffleWriter(columnarBatch: ColumnarBatch): Unit = {}

/** Hook for closing the native shuffle writer; this default implementation does nothing. */
def closeShuffleWriter(): Unit = {}

/** Returns the current value of the `partitionLengths` field. */
def getPartitionLengths: Array[Long] = partitionLengths

/**
 * Creates the native shuffle writer on demand: `createShuffleWriter` is invoked only while
 * `nativeShuffleWriter` still holds -1L.
 *
 * @param columnarBatch
 *   forwarded unchanged to `createShuffleWriter`
 */
def initShuffleWriter(columnarBatch: ColumnarBatch): Unit =
  if (nativeShuffleWriter == -1L) createShuffleWriter(columnarBatch)

def pushMergedDataToCeleborn(): Unit = {
val pushMergedDataTime = System.nanoTime
client.prepareForMergeData(shuffleId, mapId, context.attemptNumber())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,6 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](

private var splitResult: SplitResult = _

private lazy val nativeBufferSize = {
val bufferSize = GlutenConfig.getConf.shuffleWriterBufferSize
val maxBatchSize = GlutenConfig.getConf.maxBatchSize
if (bufferSize > maxBatchSize) {
logInfo(
s"${GlutenConfig.SHUFFLE_WRITER_BUFFER_SIZE.key} ($bufferSize) exceeds max " +
s" batch size. Limited to ${GlutenConfig.COLUMNAR_MAX_BATCH_SIZE.key} ($maxBatchSize).")
maxBatchSize
} else {
bufferSize
}
}

private val memoryLimit: Long = if ("sort".equals(shuffleWriterType)) {
Math.min(clientSortMemoryMaxSize, clientPushBufferMaxSize * numPartitions)
} else {
availableOffHeapPerTask()
}

private def availableOffHeapPerTask(): Long = {
val perTask =
SparkMemoryUtil.getCurrentAvailableOffHeapMemory / SparkResourceUtil.getTaskSlots(conf)
Expand All @@ -82,49 +63,13 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](

@throws[IOException]
override def internalWrite(records: Iterator[Product2[K, V]]): Unit = {
if (!records.hasNext) {
handleEmptyIterator()
return
}

while (records.hasNext) {
val cb = records.next()._2.asInstanceOf[ColumnarBatch]
if (cb.numRows == 0 || cb.numCols == 0) {
logInfo(s"Skip ColumnarBatch of ${cb.numRows} rows, ${cb.numCols} cols")
} else {
initShuffleWriter(cb)
val handle = ColumnarBatches.getNativeHandle(cb)
if (nativeShuffleWriter == -1L) {
nativeShuffleWriter = jniWrapper.makeForRSS(
dep.nativePartitioning,
nativeBufferSize,
customizedCompressionCodec,
compressionLevel,
bufferCompressThreshold,
GlutenConfig.getConf.columnarShuffleCompressionMode,
clientPushBufferMaxSize,
clientPushSortMemoryThreshold,
celebornPartitionPusher,
handle,
context.taskAttemptId(),
GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
"celeborn",
shuffleWriterType,
GlutenConfig.getConf.columnarShuffleReallocThreshold
)
runtime.addSpiller(new Spiller() {
override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
if (!Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
return 0L
}
logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
// fixme pass true when being called by self
val pushed =
jniWrapper.nativeEvict(nativeShuffleWriter, size, false)
logInfo(s"Gluten shuffle writer: Pushed $pushed / $size bytes of data")
pushed
}
})
}
val startTime = System.nanoTime()
jniWrapper.write(nativeShuffleWriter, cb.numRows, handle, availableOffHeapPerTask())
dep.metrics("splitTime").add(System.nanoTime() - startTime)
Expand All @@ -135,8 +80,13 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
}
}

// If every ColumnarBatch was skipped as empty, no writer was created and nativeShuffleWriter is still -1
if (nativeShuffleWriter == -1L) {
handleEmptyIterator()
return
}

val startTime = System.nanoTime()
assert(nativeShuffleWriter != -1L)
splitResult = jniWrapper.stop(nativeShuffleWriter)

dep
Expand All @@ -155,6 +105,38 @@ class VeloxCelebornHashBasedColumnarShuffleWriter[K, V](
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

/**
 * Creates the native shuffle writer via `jniWrapper.makeForRSS`, stores the returned handle in
 * `nativeShuffleWriter`, and adds a spiller to `runtime` that asks the native writer to evict
 * data when a spill-only phase is requested.
 *
 * @param columnarBatch
 *   batch whose native handle is passed to `makeForRSS`
 */
override def createShuffleWriter(columnarBatch: ColumnarBatch): Unit = {
  nativeShuffleWriter = jniWrapper.makeForRSS(
    dep.nativePartitioning,
    nativeBufferSize,
    customizedCompressionCodec,
    compressionLevel,
    bufferCompressThreshold,
    GlutenConfig.getConf.columnarShuffleCompressionMode,
    clientPushBufferMaxSize,
    clientPushSortMemoryThreshold,
    celebornPartitionPusher,
    ColumnarBatches.getNativeHandle(columnarBatch),
    context.taskAttemptId(),
    GlutenShuffleUtils.getStartPartitionId(dep.nativePartitioning, context.partitionId),
    "celeborn",
    shuffleWriterType,
    GlutenConfig.getConf.columnarShuffleReallocThreshold
  )

  runtime.addSpiller(new Spiller() {
    override def spill(self: MemoryTarget, phase: Spiller.Phase, size: Long): Long = {
      if (Spillers.PHASE_SET_SPILL_ONLY.contains(phase)) {
        logInfo(s"Gluten shuffle writer: Trying to push $size bytes of data")
        // fixme pass true when being called by self
        val evictedBytes = jniWrapper.nativeEvict(nativeShuffleWriter, size, false)
        logInfo(s"Gluten shuffle writer: Pushed $evictedBytes / $size bytes of data")
        evictedBytes
      } else {
        // Phases outside the spill-only set release nothing.
        0L
      }
    }
  })
}

/** Closes the native shuffle writer identified by `nativeShuffleWriter` through the JNI wrapper. */
override def closeShuffleWriter(): Unit = {
jniWrapper.close(nativeShuffleWriter)
}
Expand Down
Loading