This repository has been archived by the owner on Apr 17, 2024. It is now read-only.

Rebase current code to Spark 2.3.2 #35

Open · wants to merge 2 commits into master
pom.xml: 4 changes (2 additions & 2 deletions)
@@ -12,13 +12,13 @@
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_2.11</artifactId>
-      <version>2.3.0</version>
+      <version>2.3.2</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-network-common_2.11</artifactId>
-      <version>2.3.0</version>
+      <version>2.3.2</version>
     </dependency>
     <dependency>
       <groupId>com.intel.hpnl</groupId>
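
Note (a sketch, not part of this diff): both Spark artifacts are provided-scope, so the version that actually runs comes from the cluster, not from this POM. Assuming only that org.apache.spark.SPARK_VERSION is on the classpath, a small startup check can confirm the runtime lines up with the 2.3.2 these entries now compile against:

// Sketch only: fail fast if the provided Spark runtime is not on the 2.3 line
// that the bumped compile-time dependencies (2.3.2) target.
object SparkVersionCheck {
  def main(args: Array[String]): Unit = {
    val runtimeVersion = org.apache.spark.SPARK_VERSION
    require(runtimeVersion.startsWith("2.3."),
      s"Expected a Spark 2.3.x runtime, found $runtimeVersion")
    println(s"Spark runtime version: $runtimeVersion")
  }
}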
RdmaTransferService.scala
@@ -5,7 +5,7 @@ import java.util.Random
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

 import org.apache.spark.network.BlockDataManager
-import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, DownloadFileManager}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.pmof.{MetadataResolver, PmofShuffleManager}
 import org.apache.spark.storage.{BlockId, BlockManager, ShuffleBlockId}
@@ -30,7 +30,7 @@ class RdmaTransferService(conf: SparkConf, val shuffleManager: PmofShuffleManager
                  executId: String,
                  blockIds: Array[String],
                  blockFetchingListener: BlockFetchingListener,
-                 tempFileManager: TempFileManager): Unit = {}
+                 tempFileManager: DownloadFileManager): Unit = {}

  def fetchBlock(reqHost: String, reqPort: Int, rmaAddress: Long, rmaLength: Int,
                 rmaRkey: Long, localAddress: Int, shuffleBuffer: ShuffleBuffer,
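
For readers following the rename (a sketch, not part of this diff): the tempFileManager parameter is now typed against Spark's DownloadFileManager from org.apache.spark.network.shuffle. The real definition is a Java interface; rendered approximately as Scala traits, using the method names and arities exercised by the overrides later in this PR (everything else is an assumption), it looks roughly like this:

import org.apache.spark.network.util.TransportConf

// Illustration only: approximate shape of the interface the parameter is now
// typed against. createTempFile/registerTempFileToClean mirror the overrides
// added later in this PR; the DownloadFile stand-in below is partly assumed.
trait DownloadFileManagerShape {
  // Create a temp file that a fetched block can be spilled into.
  def createTempFile(transportConf: TransportConf): DownloadFileShape
  // Register a temp file for cleanup; returning false signals the caller to
  // delete the file itself (assumed semantics).
  def registerTempFileToClean(file: DownloadFileShape): Boolean
}

// Stand-in for org.apache.spark.network.shuffle.DownloadFile: delete() is
// exercised in this PR's cleanup path, path() is assumed (it replaces
// java.io.File.getAbsolutePath for diagnostics).
trait DownloadFileShape {
  def delete(): Boolean
  def path(): String
}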
RdmaShuffleBlockFetcherIterator.scala
@@ -23,11 +23,12 @@ import java.util.concurrent.LinkedBlockingQueue
 import javax.annotation.concurrent.GuardedBy
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.shuffle.{ShuffleClient, TempFileManager}
+import org.apache.spark.network.shuffle.{ShuffleClient, DownloadFileManager, DownloadFile, SimpleDownloadFile}
 import org.apache.spark.network.pmof._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage._
 import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.network.util.TransportConf

 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
@@ -70,7 +71,7 @@ final class RdmaShuffleBlockFetcherIterator(
     maxBlocksInFlightPerAddress: Int,
     maxReqSizeShuffleToMem: Long,
     detectCorrupt: Boolean)
-  extends Iterator[(BlockId, InputStream)] with TempFileManager with Logging {
+  extends Iterator[(BlockId, InputStream)] with DownloadFileManager with Logging {

  import RdmaShuffleBlockFetcherIterator._

@@ -126,7 +127,7 @@ final class RdmaShuffleBlockFetcherIterator(
   * deleted when cleanup. This is a layer of defensiveness against disk file leaks.
   */
  @GuardedBy("this")
-  private[this] val shuffleFilesSet = mutable.HashSet[File]()
+  private[this] val shuffleFilesSet = mutable.HashSet[DownloadFile]()

  private[this] val remoteRdmaRequestQueue = new LinkedBlockingQueue[RdmaRequest]()

@@ -256,11 +257,13 @@ final class RdmaShuffleBlockFetcherIterator(
    currentResult = null
  }

-  override def createTempFile(): File = {
-    blockManager.diskBlockManager.createTempLocalBlock()._2
+  //override def createTempFile(): DownloadFile = {
+  override def createTempFile(transportConf: TransportConf): DownloadFile = {
+    new SimpleDownloadFile(
+      blockManager.diskBlockManager.createTempLocalBlock()._2, transportConf)
  }

-  override def registerTempFileToClean(file: File): Boolean = synchronized {
+  override def registerTempFileToClean(file: DownloadFile): Boolean = synchronized {
    if (isZombie) {
      false
    } else {
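
A stand-alone sketch (not part of this diff) of the same pattern the new overrides use: wrap a plain java.io.File in SimpleDownloadFile so it satisfies DownloadFile, just as the override above does with the block manager's temp local block. TempDirDownloadFileManager and its temp-file naming are made up for illustration; the Spark types are the ones this file now imports.

import java.io.File
import org.apache.spark.network.shuffle.{DownloadFile, DownloadFileManager, SimpleDownloadFile}
import org.apache.spark.network.util.TransportConf

// Hypothetical helper, for illustration only.
class TempDirDownloadFileManager(tempDir: File) extends DownloadFileManager {

  // Called when a fetched block has to be written to disk: hand back a
  // DownloadFile backed by an ordinary temp file.
  override def createTempFile(transportConf: TransportConf): DownloadFile =
    new SimpleDownloadFile(File.createTempFile("fetch", ".tmp", tempDir), transportConf)

  // A real implementation would track the file and delete it during cleanup;
  // returning true tells the caller this manager takes responsibility for it.
  override def registerTempFileToClean(file: DownloadFile): Boolean = true
}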
@@ -296,7 +299,7 @@
   }
   shuffleFilesSet.foreach { file =>
     if (!file.delete()) {
-      logWarning("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath)
+      //logWarning("Failed to cleanup shuffle fetch temp file " + file.getAbsolutePath)
     }
   }
 }
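
On the commented-out warning above (a sketch, not part of this diff): getAbsolutePath no longer compiles because the set now holds DownloadFile rather than java.io.File. Assuming DownloadFile exposes path() for diagnostics, as upstream Spark's fetcher iterator does, the warning could be kept instead of dropped, roughly like this:

import org.apache.spark.network.shuffle.DownloadFile
import scala.collection.mutable

object ShuffleTempFileCleanup {
  // Delete every registered temp file and report any that could not be removed.
  // file.path() is assumed here as the DownloadFile counterpart of getAbsolutePath.
  def cleanup(shuffleFilesSet: mutable.HashSet[DownloadFile]): Unit = {
    shuffleFilesSet.foreach { file =>
      if (!file.delete()) {
        System.err.println("Failed to cleanup shuffle fetch temp file " + file.path())
      }
    }
  }
}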