
Commit

Revert view fs support
JkSelf committed Nov 11, 2024
1 parent be0435a commit 0d2ae6a
Showing 9 changed files with 21 additions and 47 deletions.
@@ -133,8 +133,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+ properties: Map[String, String]): SplitInfo = {
partition match {
case p: GlutenMergeTreePartition =>
ExtensionTableBuilder
@@ -771,7 +771,7 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
- assertResult(1)(plans.head.getSplitInfos(null).size)
+ assertResult(1)(plans.head.getSplitInfos.size)
}
}
}
@@ -1803,7 +1803,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assertResult(1)(plans.size)
- assertResult(conf._2)(plans.head.getSplitInfos(null).size)
+ assertResult(conf._2)(plans.head.getSplitInfos.size)
}
}
})
@@ -1935,7 +1935,7 @@ class GlutenClickHouseMergeTreeWriteSuite
case f: BasicScanExecTransformer => f
}
assertResult(2)(scanExec.size)
- assertResult(conf._2)(scanExec(1).getSplitInfos(null).size)
+ assertResult(conf._2)(scanExec(1).getSplitInfos.size)
}
}
})
@@ -39,9 +39,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types._
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
- import org.apache.spark.util.{ExecutorManager, SerializableConfiguration, SparkDirectoryUtil}
-
- import org.apache.hadoop.fs.{FileSystem, Path}
+ import org.apache.spark.util.{ExecutorManager, SparkDirectoryUtil}

import java.lang.{Long => JLong}
import java.nio.charset.StandardCharsets
@@ -57,8 +55,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo = {
+ properties: Map[String, String]): SplitInfo = {
partition match {
case f: FilePartition =>
val (
@@ -69,7 +66,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
modificationTimes,
partitionColumns,
metadataColumns) =
- constructSplitInfo(partitionSchema, f.files, metadataColumnNames, serializableHadoopConf)
+ constructSplitInfo(partitionSchema, f.files, metadataColumnNames)
val preferredLocations =
SoftAffinity.getFilePartitionLocations(f)
LocalFilesBuilder.makeLocalFiles(
@@ -112,8 +109,7 @@ class VeloxIteratorApi extends IteratorApi with Logging {
private def constructSplitInfo(
schema: StructType,
files: Array[PartitionedFile],
- metadataColumnNames: Seq[String],
- serializableHadoopConf: SerializableConfiguration) = {
+ metadataColumnNames: Seq[String]) = {
val paths = new JArrayList[String]()
val starts = new JArrayList[JLong]
val lengths = new JArrayList[JLong]()
@@ -125,15 +121,9 @@ class VeloxIteratorApi extends IteratorApi with Logging {
file =>
// The "file.filePath" in PartitionedFile is not the original encoded path, so the decoded
// path is incorrect in some cases and here fix the case of ' ' by using GlutenURLDecoder
- var filePath = file.filePath.toString
- if (filePath.startsWith("viewfs")) {
-   val viewPath = new Path(filePath)
-   val viewFileSystem = FileSystem.get(viewPath.toUri, serializableHadoopConf.value)
-   filePath = viewFileSystem.resolvePath(viewPath).toString
- }
paths.add(
GlutenURLDecoder
- .decode(filePath, StandardCharsets.UTF_8.name()))
+ .decode(file.filePath.toString, StandardCharsets.UTF_8.name()))
starts.add(JLong.valueOf(file.start))
lengths.add(JLong.valueOf(file.length))
val (fileSize, modificationTime) =
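
Note on the reverted logic: the block deleted above resolved ViewFS (viewfs://) paths to their backing file system before the path was handed to the native reader, using the Hadoop configuration wrapped in the now-removed SerializableConfiguration parameter. A minimal standalone sketch of that approach, based only on the public Hadoop FileSystem API (the helper name resolveViewFsPath and its hadoopConf parameter are illustrative, not part of this commit):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // Resolve a viewfs:// path to the file system it is mounted on; paths with
  // other schemes are returned unchanged. hadoopConf must carry the
  // fs.viewfs.mounttable.* entries that describe the mount points.
  def resolveViewFsPath(rawPath: String, hadoopConf: Configuration): String = {
    if (rawPath.startsWith("viewfs")) {
      val viewPath = new Path(rawPath)
      val viewFs = FileSystem.get(viewPath.toUri, hadoopConf)
      // resolvePath follows the mount table and returns the target path, e.g. hdfs://...
      viewFs.resolvePath(viewPath).toString
    } else {
      rawPath
    }
  }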
@@ -27,7 +27,6 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType
- import org.apache.spark.util.SerializableConfiguration

import org.apache.iceberg.spark.source.GlutenIcebergSourceUtil

@@ -60,8 +59,7 @@ case class IcebergScanTransformer(
override lazy val fileFormat: ReadFileFormat = GlutenIcebergSourceUtil.getFileFormat(scan)

override def getSplitInfosFromPartitions(
- partitions: Seq[InputPartition],
- serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+ partitions: Seq[InputPartition]): Seq[SplitInfo] = {
val groupedPartitions = SparkShimLoader.getSparkShims.orderPartitions(
scan,
keyGroupedPartitioning,
@@ -128,7 +128,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
- assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
+ assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -208,7 +208,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
- assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 3)
+ assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
case _ => // do nothing
}
checkLengthAndPlan(df, 7)
@@ -289,7 +289,7 @@ class VeloxIcebergSuite extends WholeStageTransformerSuite {
case plan if plan.isInstanceOf[IcebergScanTransformer] =>
assert(
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
- assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos(null).length == 1)
+ assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
case _ => // do nothing
}
checkLengthAndPlan(df, 5)
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
- import org.apache.spark.util.SerializableConfiguration

trait IteratorApi {

@@ -38,8 +37,7 @@ trait IteratorApi {
partitionSchema: StructType,
fileFormat: ReadFileFormat,
metadataColumnNames: Seq[String],
- properties: Map[String, String],
- serializableHadoopConf: SerializableConfiguration): SplitInfo
+ properties: Map[String, String]): SplitInfo

/** Generate native row partition. */
def genPartitions(
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
- import org.apache.spark.util.SerializableConfiguration

import com.google.protobuf.StringValue
import io.substrait.proto.NamedStruct
@@ -63,22 +62,19 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
def getProperties: Map[String, String] = Map.empty

/** Returns the split infos that will be processed by the underlying native engine. */
- def getSplitInfos(serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
-   getSplitInfosFromPartitions(getPartitions, serializableHadoopConf)
+ def getSplitInfos(): Seq[SplitInfo] = {
+   getSplitInfosFromPartitions(getPartitions)
}

- def getSplitInfosFromPartitions(
-   partitions: Seq[InputPartition],
-   serializableHadoopConf: SerializableConfiguration): Seq[SplitInfo] = {
+ def getSplitInfosFromPartitions(partitions: Seq[InputPartition]): Seq[SplitInfo] = {
partitions.map(
BackendsApiManager.getIteratorApiInstance
.genSplitInfo(
_,
getPartitionSchema,
fileFormat,
getMetadataColumns.map(_.name),
- getProperties,
- serializableHadoopConf))
+ getProperties))
}

override protected def doValidateInternal(): ValidationResult = {
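
With the revert, getSplitInfos is parameterless again and simply maps every input partition through the backend's IteratorApi.genSplitInfo, as the hunk above shows. A hedged usage sketch in the spirit of the suite assertions earlier in this diff (executedPlan is assumed to be the executed SparkPlan of a Gluten query; this is not the commit's own code):

  // Collect the scan transformers from an executed plan and count the split
  // infos each one produces, mirroring the updated test assertions.
  val scans = executedPlan.collect { case s: BasicScanExecTransformer => s }
  val splitCounts = scans.map(_.getSplitInfos.size)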
@@ -40,7 +40,6 @@ import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.utils.SparkInputMetricsUtil.InputMetricsWrapper
import org.apache.spark.sql.vectorized.ColumnarBatch
- import org.apache.spark.util.SerializableConfiguration

import com.google.common.collect.Lists

@@ -127,8 +126,6 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
BackendsApiManager.getMetricsApiInstance.genWholeStageTransformerMetrics(sparkContext)

val sparkConf: SparkConf = sparkContext.getConf
- val serializableHadoopConf: SerializableConfiguration = new SerializableConfiguration(
-   sparkContext.hadoopConfiguration)
val numaBindingInfo: GlutenNumaBindingInfo = GlutenConfig.getConf.numaBindingInfo

@transient
@@ -289,10 +286,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
*/
val allScanPartitions = basicScanExecTransformers.map(_.getPartitions)
val allScanSplitInfos =
- getSplitInfosFromPartitions(
-   basicScanExecTransformers,
-   allScanPartitions,
-   serializableHadoopConf)
+ getSplitInfosFromPartitions(basicScanExecTransformers, allScanPartitions)
val inputPartitions =
BackendsApiManager.getIteratorApiInstance.genPartitions(
wsCtx,
@@ -384,8 +378,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f

private def getSplitInfosFromPartitions(
basicScanExecTransformers: Seq[BasicScanExecTransformer],
- allScanPartitions: Seq[Seq[InputPartition]],
- serializableHadoopConf: SerializableConfiguration): Seq[Seq[SplitInfo]] = {
+ allScanPartitions: Seq[Seq[InputPartition]]): Seq[Seq[SplitInfo]] = {
// If these are two scan transformers, they must have same partitions,
// otherwise, exchange will be inserted. We should combine the two scan
// transformers' partitions with same index, and set them together in
@@ -404,7 +397,7 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f
val allScanSplitInfos =
allScanPartitions.zip(basicScanExecTransformers).map {
case (partition, transformer) =>
- transformer.getSplitInfosFromPartitions(partition, serializableHadoopConf)
+ transformer.getSplitInfosFromPartitions(partition)
}
val partitionLength = allScanSplitInfos.head.size
if (allScanSplitInfos.exists(_.size != partitionLength)) {
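
The comments kept in the last two hunks state the invariant behind this method: when a whole-stage pipeline contains several scan transformers, each one must produce the same number of partitions, and split infos that share a partition index are combined into one native partition. A generic sketch of that grouping step (illustrative only, not the code in WholeStageTransformer):

  // Group per-scan split info lists by partition index. Each inner Seq of the
  // input comes from one scan transformer; each inner Seq of the output holds
  // the entries for one partition index across all scans.
  def groupByPartitionIndex[A](perScan: Seq[Seq[A]]): Seq[Seq[A]] = {
    require(perScan.nonEmpty, "At least one scan transformer is expected.")
    val partitionLength = perScan.head.size
    require(
      perScan.forall(_.size == partitionLength),
      "All scan transformers should produce the same number of partitions.")
    perScan.transpose
  }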
