chore: [comet-parquet-exec] Unit test fixes, default scan impl to native_comet #1265

Merged · 19 commits · Jan 13, 2025
7 changes: 6 additions & 1 deletion common/src/main/java/org/apache/comet/parquet/Native.java
@@ -249,7 +249,12 @@ public static native void setPageV2(
* @return a handle to the record batch reader, used in subsequent calls.
*/
public static native long initRecordBatchReader(
String filePath, long fileSize, long start, long length, byte[] requiredSchema);
String filePath,
long fileSize,
long start,
long length,
byte[] requiredSchema,
String sessionTimezone);

// arrow native version of read batch
/**
@@ -346,7 +346,7 @@ public void init() throws URISyntaxException, IOException {

this.handle =
Native.initRecordBatchReader(
filePath, fileSize, start, length, serializedRequestedArrowSchema);
filePath, fileSize, start, length, serializedRequestedArrowSchema, timeZoneId);
isInitialized = true;
}

26 changes: 14 additions & 12 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -77,24 +77,26 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val SCAN_NATIVE = "native"
val SCAN_NATIVE_FULL = "native_full"
val SCAN_NATIVE_RECORDBATCH = "native_recordbatch"
val SCAN_NATIVE_COMET = "native_comet"
val SCAN_NATIVE_DATAFUSION = "native_datafusion"
val SCAN_NATIVE_ICEBERG_COMPAT = "native_iceberg_compat"

val COMET_NATIVE_SCAN_IMPL: ConfigEntry[String] = conf("spark.comet.scan.impl")
.doc(
"The implementation of Comet Native Scan to use. Available modes are 'native'," +
"'native_full', and 'native_recordbatch'. " +
"'native' is for the original Comet native scan which uses a jvm based parquet file " +
"reader and native column decoding. Supports simple types only " +
"'native_full' is a fully native implementation of scan based on DataFusion" +
"'native_recordbatch' is a native implementation that exposes apis to read parquet " +
"columns natively.")
s"The implementation of Comet Native Scan to use. Available modes are '$SCAN_NATIVE_COMET'," +
s"'$SCAN_NATIVE_DATAFUSION', and '$SCAN_NATIVE_ICEBERG_COMPAT'. " +
s"'$SCAN_NATIVE_COMET' is for the original Comet native scan which uses a jvm based " +
"parquet file reader and native column decoding. Supports simple types only " +
s"'$SCAN_NATIVE_DATAFUSION' is a fully native implementation of scan based on DataFusion" +
s"'$SCAN_NATIVE_ICEBERG_COMPAT' is a native implementation that exposes apis to read " +
"parquet columns natively.")
.internal()
.stringConf
.transform(_.toLowerCase(Locale.ROOT))
.checkValues(Set(SCAN_NATIVE, SCAN_NATIVE_FULL, SCAN_NATIVE_RECORDBATCH))
.createWithDefault(sys.env.getOrElse("NATIVE_SCAN_IMPL", SCAN_NATIVE_FULL))
.checkValues(Set(SCAN_NATIVE_COMET, SCAN_NATIVE_DATAFUSION, SCAN_NATIVE_ICEBERG_COMPAT))
.createWithDefault(sys.env
.getOrElse("COMET_PARQUET_SCAN_IMPL", SCAN_NATIVE_COMET)
.toLowerCase(Locale.ROOT))

val COMET_PARQUET_PARALLEL_IO_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.parquet.read.parallel.io.enabled")
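A rough illustration, not part of this diff: the renamed scan implementation could be selected at runtime through the config key defined above. The Spark session and app name below are hypothetical; only the key and the three accepted values come from CometConf.

import org.apache.spark.sql.SparkSession

// Hypothetical session; the key "spark.comet.scan.impl" and the values
// native_comet, native_datafusion, and native_iceberg_compat come from
// CometConf above. The default is now native_comet, and the
// COMET_PARQUET_SCAN_IMPL environment variable overrides that default at startup.
val spark = SparkSession.builder().appName("comet-scan-impl-demo").getOrCreate()
spark.conf.set("spark.comet.scan.impl", "native_datafusion")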
2 changes: 0 additions & 2 deletions native/Cargo.lock

Some generated files are not rendered by default.

2 changes: 0 additions & 2 deletions native/Cargo.toml
@@ -37,9 +37,7 @@ arrow = { version = "53.2.0", features = ["prettyprint", "ffi", "chrono-tz"] }
arrow-array = { version = "53.2.0" }
arrow-buffer = { version = "53.2.0" }
arrow-data = { version = "53.2.0" }
arrow-ipc = { version = "53.2.0" }
arrow-schema = { version = "53.2.0" }
flatbuffers = { version = "24.3.25" }
parquet = { version = "53.2.0", default-features = false, features = ["experimental"] }
datafusion = { version = "43.0.0", default-features = false, features = ["unicode_expressions", "crypto_expressions", "parquet"] }
datafusion-common = { version = "43.0.0" }
2 changes: 0 additions & 2 deletions native/core/Cargo.toml
@@ -40,8 +40,6 @@ arrow-array = { workspace = true }
arrow-buffer = { workspace = true }
arrow-data = { workspace = true }
arrow-schema = { workspace = true }
arrow-ipc = { workspace = true }
flatbuffers = { workspace = true }
parquet = { workspace = true, default-features = false, features = ["experimental"] }
futures = { workspace = true }
mimalloc = { version = "*", default-features = false, optional = true }
7 changes: 5 additions & 2 deletions native/core/src/execution/planner.rs
@@ -1156,8 +1156,11 @@ impl PhysicalPlanner {
table_parquet_options.global.pushdown_filters = true;
table_parquet_options.global.reorder_filters = true;

let mut spark_parquet_options =
SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
let mut spark_parquet_options = SparkParquetOptions::new(
EvalMode::Legacy,
scan.session_timezone.as_str(),
false,
);
spark_parquet_options.allow_cast_unsigned_ints = true;

let mut builder = ParquetExecBuilder::new(file_scan_config)
15 changes: 8 additions & 7 deletions native/core/src/parquet/mod.rs
@@ -46,6 +46,7 @@ use self::util::jni::TypePromotionInfo;
use crate::execution::operators::ExecutionError;
use crate::execution::utils::SparkArrowConvert;
use crate::parquet::data_type::AsBytes;
use crate::parquet::parquet_support::SparkParquetOptions;
use crate::parquet::schema_adapter::SparkSchemaAdapterFactory;
use arrow::buffer::{Buffer, MutableBuffer};
use arrow_array::{Array, RecordBatch};
@@ -59,8 +60,6 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use futures::{poll, StreamExt};
use jni::objects::{JBooleanArray, JByteArray, JLongArray, JPrimitiveArray, JString, ReleaseMode};
use jni::sys::jstring;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use parquet_support::SparkParquetOptions;
use read::ColumnReader;
use util::jni::{convert_column_descriptor, convert_encoding, deserialize_schema, get_file_path};

@@ -608,7 +607,6 @@ enum ParquetReaderState {
struct BatchContext {
runtime: tokio::runtime::Runtime,
batch_stream: Option<SendableRecordBatchStream>,
batch_reader: Option<ParquetRecordBatchReader>,
current_batch: Option<RecordBatch>,
reader_state: ParquetReaderState,
}
@@ -640,14 +638,14 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
start: jlong,
length: jlong,
required_schema: jbyteArray,
session_timezone: jstring,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| unsafe {
let path: String = env
.get_string(&JString::from_raw(file_path))
.unwrap()
.into();
let batch_stream: Option<SendableRecordBatchStream>;
let batch_reader: Option<ParquetRecordBatchReader> = None;
// TODO: (ARROW NATIVE) Use the common global runtime
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
@@ -681,8 +679,13 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
// TODO: Maybe these are configs?
table_parquet_options.global.pushdown_filters = true;
table_parquet_options.global.reorder_filters = true;
let session_timezone: String = env
.get_string(&JString::from_raw(session_timezone))
.unwrap()
.into();

let mut spark_parquet_options = SparkParquetOptions::new(EvalMode::Legacy, "UTC", false);
let mut spark_parquet_options =
SparkParquetOptions::new(EvalMode::Legacy, session_timezone.as_str(), false);
spark_parquet_options.allow_cast_unsigned_ints = true;

let builder2 = ParquetExecBuilder::new(file_scan_config)
@@ -704,7 +707,6 @@ pub unsafe extern "system" fn Java_org_apache_comet_parquet_Native_initRecordBatchReader(
let ctx = BatchContext {
runtime,
batch_stream,
batch_reader,
current_batch: None,
reader_state: ParquetReaderState::Init,
};
@@ -725,7 +727,6 @@ pub extern "system" fn Java_org_apache_comet_parquet_Native_readNextRecordBatch(
let batch_stream = context.batch_stream.as_mut().unwrap();
let runtime = &context.runtime;

// let mut stream = batch_stream.as_mut();
loop {
let next_item = batch_stream.next();
let poll_batch: Poll<Option<datafusion_common::Result<RecordBatch>>> =
2 changes: 1 addition & 1 deletion native/core/src/parquet/parquet_support.rs
@@ -818,7 +818,7 @@ fn cast_struct_to_struct(
Ok(Arc::new(StructArray::new(
to_fields.clone(),
cast_fields,
array.nulls().map(|nulls| nulls.clone()),
array.nulls().cloned(),
)))
}
_ => unreachable!(),
1 change: 1 addition & 0 deletions native/proto/src/proto/operator.proto
@@ -90,6 +90,7 @@ message NativeScan {
repeated spark.spark_expression.Expr data_filters = 6;
repeated SparkFilePartition file_partitions = 7;
repeated int64 projection_vector = 8;
string session_timezone = 9;
}

message Projection {
2 changes: 1 addition & 1 deletion native/spark-expr/src/cast.rs
@@ -838,7 +838,7 @@ fn cast_struct_to_struct(
Ok(Arc::new(StructArray::new(
to_fields.clone(),
cast_fields,
array.nulls().map(|nulls| nulls.clone()),
array.nulls().cloned(),
)))
}
_ => unreachable!(),
@@ -206,7 +206,7 @@ class CometSparkSessionExtensions
// here and then it gets replaced with `CometNativeScanExec` in `CometExecRule`
// but that only happens if `COMET_EXEC_ENABLED` is enabled
&& COMET_EXEC_ENABLED.get()
&& COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_FULL =>
&& COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_DATAFUSION =>
logInfo("Comet extension enabled for v1 full native Scan")
CometScanExec(scanExec, session)

@@ -377,7 +377,7 @@ class CometSparkSessionExtensions
plan.transformUp {
// Fully native scan for V1
case scan: CometScanExec
if COMET_NATIVE_SCAN_IMPL.get.equals(CometConf.SCAN_NATIVE_FULL) =>
if COMET_NATIVE_SCAN_IMPL.get().equals(CometConf.SCAN_NATIVE_DATAFUSION) =>
val nativeOp = QueryPlanSerde.operator2Proto(scan).get
CometNativeScanExec(nativeOp, scan.wrapped, scan.session)

@@ -101,7 +101,7 @@ class CometParquetFileFormat extends ParquetFileFormat with MetricsSupport with
// Comet specific configurations
val capacity = CometConf.COMET_BATCH_SIZE.get(sqlConf)
val nativeRecordBatchReaderEnabled =
CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_RECORDBATCH)
CometConf.COMET_NATIVE_SCAN_IMPL.get(sqlConf).equals(CometConf.SCAN_NATIVE_ICEBERG_COMPAT)

(file: PartitionedFile) => {
val sharedConf = broadcastedHadoopConf.value.value
@@ -2517,7 +2517,7 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim

// Fully native scan for V1
case scan: CometScanExec
if CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_FULL =>
if CometConf.COMET_NATIVE_SCAN_IMPL.get(conf) == CometConf.SCAN_NATIVE_DATAFUSION =>
val nativeScanBuilder = OperatorOuterClass.NativeScan.newBuilder()
nativeScanBuilder.setSource(op.simpleStringWithNodeId())

@@ -2578,6 +2578,7 @@
nativeScanBuilder.addAllDataSchema(dataSchema.toIterable.asJava)
nativeScanBuilder.addAllRequiredSchema(requiredSchema.toIterable.asJava)
nativeScanBuilder.addAllPartitionSchema(partitionSchema.toIterable.asJava)
nativeScanBuilder.setSessionTimezone(conf.getConfString("spark.sql.session.timeZone"))

Some(result.setNativeScan(nativeScanBuilder).build())

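A rough illustration, not part of this diff: the only fact taken from the change above is that QueryPlanSerde now copies the value of "spark.sql.session.timeZone" into the NativeScan message, so the native DataFusion reader interprets timestamps in the same zone as the JVM session. The session and the Parquet path below are hypothetical.

import org.apache.spark.sql.SparkSession

// Hypothetical session and path; the standard Spark session timezone set here
// is what the new session_timezone field carries down to the native scan.
val spark = SparkSession.builder().appName("session-timezone-demo").getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
spark.read.parquet("/tmp/events").show()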
@@ -474,7 +474,7 @@ case class CometScanExec(
object CometScanExec extends DataTypeSupport {

override def isAdditionallySupported(dt: DataType): Boolean = {
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_RECORDBATCH) {
if (CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_ICEBERG_COMPAT) {
// TODO add array and map
dt match {
case s: StructType => s.fields.map(_.dataType).forall(isTypeSupported)