diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
index dd5a736e7571..28c8d7cb82ec 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHIteratorApi.scala
@@ -133,8 +133,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case p: GlutenMergeTreePartition =>
         ExtensionTableBuilder
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
index 571dc4ba9258..c0f509c68cda 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteOnS3Suite.scala
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
           case scanExec: BasicScanExecTransformer => scanExec
         }
         assertResult(1)(plans.size)
-        assertResult(1)(plans.head.getSplitInfos(null).size)
+        assertResult(1)(plans.head.getSplitInfos.size)
       }
     }
   }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
index b33a2065f665..43bc6339a43d 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1803,7 +1803,7 @@ class GlutenClickHouseMergeTreeWriteSuite
           case scanExec: BasicScanExecTransformer => scanExec
         }
         assertResult(1)(plans.size)
-        assertResult(conf._2)(plans.head.getSplitInfos(null).size)
+        assertResult(conf._2)(plans.head.getSplitInfos.size)
       }
     }
   })
@@ -1935,7 +1935,7 @@ class GlutenClickHouseMergeTreeWriteSuite
           case f: BasicScanExecTransformer => f
         }
         assertResult(2)(scanExec.size)
-        assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
+        assertResult(conf._2)(scanExec(1).getSplitInfos.size)
       }
     }
   })
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
index 320d1f366c23..061daaac0fad 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxIteratorApi.scala
@@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, SparkDirectoryUtil}
-
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}
 
 import java.lang.{Long => JLong}
 import java.nio.charset.StandardCharsets
@@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+      properties: Map[String, String]): SplitInfo = {
     partition match {
       case f: FilePartition =>
         val (
@@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
           modificationTimes,
           partitionColumns,
           metadataColumns) =
-          constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
+          constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
         val preferredLocations = SoftAffinity.getFilePartitionLocations(f)
         LocalFilesBuilder.makeLocalFiles(
@@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
   private def constructSplitInfo(
       schema: StructType,
       files: Array[PartitionedFile],
-      metadataColumnNames: Seq[String],
-      serializableHadoopConf: SerializableConfiguration) = {
+      metadataColumnNames: Seq[String]) = {
     val paths = new JArrayList[String]()
     val starts = new JArrayList[JLong]
     val lengths = new JArrayList[JLong]()
@@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
       file =>
         // The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded
        // path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder
-        var filePath = file.filePath.toString
-        if (filePath.startsWith("viewfs")) {
-          val viewPath = new Path(filePath)
-          val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
-          filePath = viewFileSystem.resolvePath(viewPath).toString
-        }
         paths.add(
           GlutenURLDecoder
-            .decode(filePath, StandardCharsets.UTF_8.name()))
+            .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
         starts.add(JLong.valueOf(file.start))
         lengths.add(JLong.valueOf(file.length))
         val (fileSize, modificationTime) =
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index 10d24c317cc1..d453e90cb0c3 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil
 
@@ -60,8 +59,7 @@ case class IcebergScanTransformer(
   override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)
 
   override def getSplitInfosFromPartitions(
-      partitions: Seq[InputPartition],
-      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+      partitions: Seq[InputPartition]): Seq[SplitInfo] = {
     val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
       scan,
       keyGroupedPartitioning,
diff --git a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
index 7f399ce629cf..de71d341db69 100644
--- a/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
+++ b/gluten-iceberg/src/test/scala/org/apache/gluten/execution/VeloxIcebergSuite.scala
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
         case plan if plan.isInstanceOf[IcebergScanTransformer] =>
           assert(
             plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-          assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
+          assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
         case _ => // do nothing
       }
       checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
         case plan if plan.isInstanceOf[IcebergScanTransformer] =>
           assert(
             plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-          assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
+          assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
         case _ => // do nothing
       }
       checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
         case plan if plan.isInstanceOf[IcebergScanTransformer] =>
           assert(
             plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-          assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1)
+          assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
         case _ => // do nothing
       }
       checkLengthAndPlan(df, 5)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
index 69c9d37334de..11211bd0da91 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/IteratorApi.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
 
 trait IteratorApi {
 
@@ -38,8 +37,7 @@ trait IteratorApi {
       partitionSchema: StructType,
       fileFormat: ReadFileFormat,
       metadataColumnNames: Seq[String],
-      properties: Map[String, String],
-      serializableHadoopConf: SerializableConfiguration): SplitInfo
+      properties: Map[String, String]): SplitInfo
 
   /** Generate native row partition. */
   def genPartitions(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index d768ac2c5936..73ed35e7190b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
-import org.apache.spark.util.SerializableConfiguration
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.NamedStruct
@@ -63,13 +62,11 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   def getProperties: Map[String, String] = Map.empty
 
   /** Returns the split infos that will be processed by the underlying native engine. */
-  def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
-    getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
+  def getSplitInfos(): Seq[SplitInfo] = {
+    getSplitInfosFromPartitions(getPartitions)
   }
 
-  def getSplitInfosFromPartitions(
-      partitions: Seq[InputPartition],
-      serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+  def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
     partitions.map(
       BackendsApiManager.getIteratorApiInstance
         .genSplitInfo(
@@ -77,8 +74,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
           getPartitionSchema,
           fileFormat,
           getMetadataColumns.map(_.name),
-          getProperties,
-          serializableHadoopConf))
+          getProperties))
   }
 
   override protected def doValidateInternal(): ValidationResult = {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 70839ffc2eba..20b31aed1bd3 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SerializableConfiguration
 
 import com.google.common.collect.Lists
 
@@ -127,8 +126,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)
 
   val sparkConf: SparkConf = sparkContext.getConf
-  val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
-    sparkContext.hadoopConfiguration)
   val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo
 
   @transient
@@ -289,10 +286,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
        */
       val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
       val allScanSplitInfos =
-        getSplitInfosFromPartitions(
-          basicScanExecTransformers,
-          allScanPartitions,
-          serializableHadoopConf)
+        getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
       val inputPartitions =
         BackendsApiManager.getIteratorApiInstance.genPartitions(
           wsCtx,
@@ -384,8 +378,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
 
   private def getSplitInfosFromPartitions(
       basicScanExecTransformers: Seq[BasicScanExecTransformer],
-      allScanPartitions: Seq[Seq[InputPartition]],
-      serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = {
+      allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
     // If these are two scan transformers, they must have same partitions,
     // otherwise, exchange will be inserted. We should combine the two scan
     // transformers' partitions with same index, and set them together in
@@ -404,7 +397,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
     val allScanSplitInfos =
       allScanPartitions.zip(basicScanExecTransformers).map {
         case (partition, transformer) =>
-          transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf)
+          transformer.getSplitInfosFromPartitions(partition)
       }
     val partitionLength = allScanSplitInfos.head.size
     if (allScanSplitInfos.exists(_.size != partitionLength)) {
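
A minimal sketch of the call-site change this patch implies, for downstream
code that inspects scans: the only difference is that getSplitInfos no longer
takes a SerializableConfiguration. The helper name `splitCount` is
illustrative, and the SplitInfo import path follows Gluten's package layout
to the extent it is not shown in the diff itself:

  import org.apache.gluten.execution.BasicScanExecTransformer
  import org.apache.gluten.substrait.rel.SplitInfo

  // `scan` is assumed to be a BasicScanExecTransformer collected from an
  // executed plan, as the test suites in this patch do.
  def splitCount(scan: BasicScanExecTransformer): Int = {
    // Before this change, callers wrote scan.getSplitInfos(null) in tests,
    // or threaded a broadcast-friendly Hadoop conf through in real code.
    val splits: Seq[SplitInfo] = scan.getSplitInfos()
    splits.size
  }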