Add config to control whether viewfs is supported
JkSelf committed Nov 15, 2024
1 parent d1b6570 commit bc9e259
Showing 4 changed files with 28 additions and 20 deletions.
@@ -43,7 +43,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration

import java.lang.{Long => JLong}
import java.net.URI
@@ -58,8 +58,7 @@ case class IcebergScanTransformer(

override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)

-  override def getSplitInfosFromPartitions(
-      partitions: Seq[InputPartition]): Seq[SplitInfo] = {
+  override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
scan,
keyGroupedPartitioning,
@@ -294,24 +294,25 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
val allScanSplitInfos =
getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)

-    allScanSplitInfos.foreach {
-      splitInfos =>
-        splitInfos.foreach {
-          case splitInfo: LocalFilesNode =>
-            val paths = splitInfo.getPaths.asScala
-            if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
-              // Convert the viewfs path into hdfs
-              val newPaths = paths.map {
-                viewfsPath =>
-                  val viewPath = new Path(viewfsPath)
-                  val viewFileSystem =
-                    FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
-                  viewFileSystem.resolvePath(viewPath).toString
-              }
-              splitInfo.setPaths(newPaths.asJava)
-            }
-        }
-    }
+    if (GlutenConfig.getConf.enableHdfsViewfs) {
+      allScanSplitInfos.foreach {
+        splitInfos =>
+          splitInfos.foreach {
+            case splitInfo: LocalFilesNode =>
+              val paths = splitInfo.getPaths.asScala
+              if (paths.nonEmpty && paths.head.startsWith("viewfs")) {
+                // Convert the viewfs path into hdfs
+                val newPaths = paths.map {
+                  viewfsPath =>
+                    val viewPath = new Path(viewfsPath)
+                    val viewFileSystem =
+                      FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
+                    viewFileSystem.resolvePath(viewPath).toString
+                }
+                splitInfo.setPaths(newPaths.asJava)
+              }
+          }
+      }
+    }

val inputPartitions =
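For context on the change above: the conversion relies on Hadoop's FileSystem.resolvePath, which for a viewfs:// URI follows the ViewFs mount table to the backing hdfs:// location. Below is a minimal standalone sketch of that resolution step; the mount-table entry and file path are hypothetical, not taken from the commit, and resolution contacts the backing cluster, so the path must actually exist.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object ViewfsResolveSketch {
  // Resolve a viewfs:// path to the concrete path (e.g. hdfs://) backing it,
  // mirroring the per-path logic in the WholeStageTransformer change above.
  def resolveViewfsPath(rawPath: String, hadoopConf: Configuration): String = {
    val viewPath = new Path(rawPath)
    // FileSystem.get returns a ViewFileSystem for the viewfs scheme;
    // resolvePath then follows the mount table to the underlying cluster path.
    val viewFileSystem = FileSystem.get(viewPath.toUri, hadoopConf)
    viewFileSystem.resolvePath(viewPath).toString
  }

  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    // Hypothetical mount table entry: viewfs://cluster/data -> hdfs://nameservice1/data
    conf.set("fs.viewfs.mounttable.cluster.link./data", "hdfs://nameservice1/data")
    println(resolveViewfsPath("viewfs://cluster/data/part-00000.parquet", conf))
  }
}

The committed code applies this same pattern to each path of a LocalFilesNode, using the Hadoop configuration carried in serializableHadoopConf.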
@@ -478,6 +478,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def enableHiveFileFormatWriter: Boolean = conf.getConf(NATIVE_HIVEFILEFORMAT_WRITER_ENABLED)

  def enableCelebornFallback: Boolean = conf.getConf(CELEBORN_FALLBACK_ENABLED)
+
+  def enableHdfsViewfs: Boolean = conf.getConf(HDFS_VIEWFS_ENABLED)
}

object GlutenConfig {
@@ -2193,4 +2195,11 @@
"Otherwise, do nothing.")
.booleanConf
      .createWithDefault(false)
+
+  val HDFS_VIEWFS_ENABLED =
+    buildStaticConf("spark.gluten.storage.hdfs.viewfs.enabled")
+      .internal()
+      .doc("If enabled, gluten will convert the viewfs path to hdfs path on the Scala side.")
+      .booleanConf
+      .createWithDefault(false)
}
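For reference, the new entry is a static, internal conf defaulting to false, so it has to be switched on when the Spark application is configured rather than per query. A minimal sketch of enabling it while building a session follows; the plugin class name, app name, and viewfs path are illustrative assumptions, not part of this commit.

import org.apache.spark.sql.SparkSession

object EnableViewfsConversionSketch {
  def main(args: Array[String]): Unit = {
    // Static conf: must be set before the session (and the Gluten driver plugin) starts.
    val spark = SparkSession.builder()
      .appName("viewfs-to-hdfs-demo") // illustrative
      .config("spark.plugins", "org.apache.gluten.GlutenPlugin") // adjust to your Gluten version
      .config("spark.gluten.storage.hdfs.viewfs.enabled", "true")
      .getOrCreate()

    // With the flag on, scans over viewfs:// paths have their split paths rewritten to hdfs://.
    spark.read.parquet("viewfs://cluster/data").show() // hypothetical viewfs path
  }
}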
