Skip to content

Commit

Permalink
chore: merge comet-parquet-exec branch into main (#1318)
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove authored Jan 21, 2025
1 parent 517c255 commit c3a552f
Show file tree
Hide file tree
Showing 1,324 changed files with 239,269 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.slf4j.LoggerFactory;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.TimestampNTZType$;

Expand All @@ -36,6 +37,9 @@ public abstract class AbstractColumnReader implements AutoCloseable {
/** The Spark data type. */
protected final DataType type;

/** The Parquet field type. */
protected final Type fieldType;

/** Parquet column descriptor. */
protected final ColumnDescriptor descriptor;

Expand All @@ -61,13 +65,23 @@ public abstract class AbstractColumnReader implements AutoCloseable {

/**
 * Creates a reader for a single Parquet column.
 *
 * @param type the Spark data type the column values are interpreted as
 * @param fieldType the Parquet schema {@link Type} of the field; may be {@code null} when the
 *     caller does not have it (see the convenience constructor that omits it)
 * @param descriptor the Parquet column descriptor for the column being read
 * @param useDecimal128 whether decimals are represented as 128-bit values -- NOTE(review):
 *     semantics inferred from the flag name; confirm against subclass usage
 * @param useLegacyDateTimestamp whether legacy date/timestamp rebase behavior is used --
 *     NOTE(review): semantics inferred from the flag name; confirm against subclass usage
 */
public AbstractColumnReader(
    DataType type,
    Type fieldType,
    ColumnDescriptor descriptor,
    boolean useDecimal128,
    boolean useLegacyDateTimestamp) {
  // Field assignments are independent of each other; grouped by column identity first,
  // then reader behavior flags.
  this.descriptor = descriptor;
  this.fieldType = fieldType;
  this.type = type;
  this.useLegacyDateTimestamp = useLegacyDateTimestamp;
  this.useDecimal128 = useDecimal128;
}

/**
 * Convenience constructor for readers that do not carry the Parquet schema {@link Type}; the
 * {@code fieldType} field is left {@code null}. Unlike the primary constructor, this one also
 * validates the Parquet column against the requested Spark type via
 * {@code TypeUtil.checkParquetType} -- NOTE(review): presumably throws on a mismatch; confirm
 * against TypeUtil.
 *
 * @param type the Spark data type the column values are interpreted as
 * @param descriptor the Parquet column descriptor for the column being read
 * @param useDecimal128 whether decimals are represented as 128-bit values -- NOTE(review):
 *     semantics inferred from the flag name; confirm against subclass usage
 * @param useLegacyDateTimestamp whether legacy date/timestamp rebase behavior is used --
 *     NOTE(review): semantics inferred from the flag name; confirm against subclass usage
 */
public AbstractColumnReader(
    DataType type,
    ColumnDescriptor descriptor,
    boolean useDecimal128,
    boolean useLegacyDateTimestamp) {
  this(type, null, descriptor, useDecimal128, useLegacyDateTimestamp);
  TypeUtil.checkParquetType(descriptor, type);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void init() throws URISyntaxException, IOException {
requestedSchema =
CometParquetReadSupport.clipParquetSchema(
requestedSchema, sparkSchema, isCaseSensitive, useFieldId, ignoreMissingIds);
if (requestedSchema.getColumns().size() != sparkSchema.size()) {
if (requestedSchema.getFieldCount() != sparkSchema.size()) {
throw new IllegalArgumentException(
String.format(
"Spark schema has %d columns while " + "Parquet schema has %d columns",
Expand Down
52 changes: 52 additions & 0 deletions common/src/main/java/org/apache/comet/parquet/Native.java
Original file line number Diff line number Diff line change
Expand Up @@ -234,4 +234,56 @@ public static native void setPageV2(
* @param handle the handle to the native Parquet column reader
*/
public static native void closeColumnReader(long handle);

///////////// Arrow Native Parquet Reader APIs
// TODO: Add partitionValues(?), improve requiredColumns to use a projection mask that corresponds
// to arrow.
// Add batch size, datetimeRebaseModeSpec, metrics(how?)...

/**
 * Initialize a record batch reader for a PartitionedFile.
 *
 * @param filePath path of the Parquet file to read
 * @param fileSize total size of the file in bytes
 * @param start byte offset at which this split of the file starts
 * @param length number of bytes in this split
 * @param requiredSchema serialized required read schema -- NOTE(review): serialization format
 *     (presumably Arrow IPC) is not visible here; confirm against the native implementation
 * @param sessionTimezone session-local timezone id -- NOTE(review): presumably used for
 *     timestamp conversion on the native side; confirm
 * @return a handle to the record batch reader, used in subsequent calls.
 */
public static native long initRecordBatchReader(
    String filePath,
    long fileSize,
    long start,
    long length,
    byte[] requiredSchema,
    String sessionTimezone);

// Arrow-native version of read batch.
/**
 * Read the next batch of data into memory on the native side.
 *
 * @param handle the handle to the native record batch reader, as returned by
 *     {@link #initRecordBatchReader}
 * @return the number of rows read
 */
public static native int readNextRecordBatch(long handle);

// Arrow-native equivalent of currentBatch. 'columnNum' is the number of the column in the record
// batch.
/**
 * Load the column corresponding to columnNum in the currently loaded record batch into the JVM.
 *
 * @param handle the handle to the native record batch reader, as returned by
 *     {@link #initRecordBatchReader}
 * @param columnNum number of the column within the record batch -- NOTE(review): presumably
 *     zero-based; confirm against the native implementation
 * @param arrayAddr memory address to export the column data to -- NOTE(review): presumably an
 *     Arrow C Data Interface ArrowArray struct; confirm
 * @param schemaAddr memory address to export the column schema to -- NOTE(review): presumably an
 *     Arrow C Data Interface ArrowSchema struct; confirm
 */
public static native void currentColumnBatch(
    long handle, int columnNum, long arrayAddr, long schemaAddr);

// Arrow-native version to close the record batch reader.

/**
 * Close the record batch reader and free its native resources.
 *
 * @param handle the handle to the native record batch reader, as returned by
 *     {@link #initRecordBatchReader}
 */
public static native void closeRecordBatchReader(long handle);
}
Loading

0 comments on commit c3a552f

Please sign in to comment.