Spark 3.5: Iceberg / DataFusion Comet integration
huaxingao committed Feb 1, 2025
1 parent 40334f5 commit a346649
Showing 18 changed files with 932 additions and 24 deletions.
5 changes: 5 additions & 0 deletions spark/v3.5/build.gradle
@@ -52,6 +52,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
dependencies {
implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow')
api project(':iceberg-api')
annotationProcessor libs.immutables.value
compileOnly libs.immutables.value
implementation project(':iceberg-common')
implementation project(':iceberg-core')
implementation project(':iceberg-data')
@@ -73,6 +75,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {
exclude group: 'org.roaringbitmap'
}

compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"

implementation libs.parquet.column
implementation libs.parquet.hadoop

@@ -179,6 +183,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
testImplementation libs.avro.avro
testImplementation libs.parquet.hadoop
testImplementation libs.awaitility
testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0"

// Required because we remove antlr plugin dependencies from the compile configuration, see note above
runtimeOnly libs.antlr.runtime
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface OrcBatchReadConf extends Serializable {
int batchSize();
}
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import java.io.Serializable;
import org.immutables.value.Value;

@Value.Immutable
public interface ParquetBatchReadConf extends Serializable {
int batchSize();

ParquetReaderType readerType();
}
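
Because these interfaces are annotated with @Value.Immutable (and the build now wires in the Immutables annotation processor), concrete classes with builders are generated at compile time. A minimal sketch of constructing a Parquet read conf, assuming the default Immutables naming (ImmutableParquetBatchReadConf) and an illustrative batch size; OrcBatchReadConf would be built the same way through ImmutableOrcBatchReadConf:

// ImmutableParquetBatchReadConf is generated by the Immutables annotation processor.
ParquetBatchReadConf parquetConf =
    ImmutableParquetBatchReadConf.builder()
        .batchSize(4096)
        .readerType(ParquetReaderType.COMET)
        .build();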
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

/** Enumerates the types of Parquet readers. */
public enum ParquetReaderType {
/** ICEBERG type utilizes the built-in Parquet reader. */
ICEBERG,

/**
* COMET type switches to the Apache DataFusion Comet Parquet reader. The Comet reader performs
* I/O and decompression in the JVM but decodes natively to improve performance. Additionally,
* Comet will convert Spark's physical plan into a native physical plan and execute it natively.
*
* <p>TODO: Implement {@link org.apache.comet.parquet.SupportsComet} in SparkScan to convert Spark
* physical plan to native physical plan for native execution.
*/
COMET;

public static ParquetReaderType fromString(String typeAsString) {
Preconditions.checkArgument(typeAsString != null, "Parquet reader type is null");
try {
return ParquetReaderType.valueOf(typeAsString.toUpperCase());
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException("Unknown parquet reader type: " + typeAsString);
}
}
}
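
fromString is case-insensitive, so either casing works in session configuration, and an unknown name fails fast. A quick sketch:

ParquetReaderType type = ParquetReaderType.fromString("comet"); // ParquetReaderType.COMET
ParquetReaderType.fromString("arrow"); // IllegalArgumentException: Unknown parquet reader type: arrow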
@@ -355,4 +355,12 @@ public boolean reportColumnStats() {
.defaultValue(SparkSQLProperties.REPORT_COLUMN_STATS_DEFAULT)
.parse();
}

public ParquetReaderType parquetReaderType() {
return confParser
.enumConf(ParquetReaderType::fromString)
.sessionConf(SparkSQLProperties.PARQUET_READER_TYPE)
.defaultValue(SparkSQLProperties.PARQUET_READER_TYPE_DEFAULT)
.parse();
}
}
@@ -27,6 +27,9 @@ private SparkSQLProperties() {}
// Controls whether vectorized reads are enabled
public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";

// Controls which Parquet reader implementation to use
public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type";
public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET;
// Controls whether to perform the nullability check during writes
public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability";
public static final boolean CHECK_NULLABILITY_DEFAULT = true;
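
Because COMET is the default, a session that needs the built-in Iceberg reader has to opt out explicitly. A minimal sketch of setting the new property, assuming a local SparkSession purely for illustration:

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local[*]").getOrCreate();
// When unset, SparkReadConf.parquetReaderType() falls back to PARQUET_READER_TYPE_DEFAULT (COMET).
spark.conf().set("spark.sql.iceberg.parquet.reader-type", "ICEBERG");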
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.data.vectorized;

import java.io.IOException;
import java.util.Map;
import org.apache.comet.parquet.AbstractColumnReader;
import org.apache.comet.parquet.ColumnReader;
import org.apache.comet.parquet.TypeUtil;
import org.apache.comet.parquet.Utils;
import org.apache.comet.shaded.arrow.c.CometSchemaImporter;
import org.apache.comet.shaded.arrow.memory.RootAllocator;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.vectorized.ColumnVector;

class CometColumnReader implements VectorizedReader<ColumnVector> {
// use the Comet default batch size
public static final int DEFAULT_BATCH_SIZE = 8192;

private final ColumnDescriptor descriptor;
private final DataType sparkType;

// The delegated ColumnReader from the Comet side
private AbstractColumnReader delegate;
private boolean initialized = false;
private int batchSize = DEFAULT_BATCH_SIZE;
private CometSchemaImporter importer;

CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) {
this.sparkType = sparkType;
this.descriptor = descriptor;
}

CometColumnReader(Types.NestedField field) {
DataType dataType = SparkSchemaUtil.convert(field.type());
StructField structField = new StructField(field.name(), dataType, false, Metadata.empty());
this.sparkType = dataType;
this.descriptor = TypeUtil.convertToParquet(structField);
}

public AbstractColumnReader delegate() {
return delegate;
}

void setDelegate(AbstractColumnReader delegate) {
this.delegate = delegate;
}

void setInitialized(boolean initialized) {
this.initialized = initialized;
}

public int batchSize() {
return batchSize;
}

/**
* Initializes or resets the CometColumnReader. This must be called for each row group after
* readNextRowGroup, so a new dictionary encoding can be set for each new row group.
*/
public void reset() {
if (importer != null) {
importer.close();
}

if (delegate != null) {
delegate.close();
}

this.importer = new CometSchemaImporter(new RootAllocator());
this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false);
this.initialized = true;
}

public ColumnDescriptor descriptor() {
return descriptor;
}

/** Returns the Spark data type for this column. */
public DataType sparkType() {
return sparkType;
}

/**
* Sets the page reader for this column to the given {@code pageReader}.
*
* <p>NOTE: this should be called before reading a new Parquet column chunk, and after {@link
* CometColumnReader#reset} is called.
*/
public void setPageReader(PageReader pageReader) throws IOException {
Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first");
((ColumnReader) delegate).setPageReader(pageReader);
}

@Override
public void close() {
// close resources on native side
if (importer != null) {
importer.close();
}

if (delegate != null) {
delegate.close();
}
}

@Override
public void setBatchSize(int size) {
this.batchSize = size;
}

@Override
public void setRowGroupInfo(
PageReadStore pageReadStore, Map<ColumnPath, ColumnChunkMetaData> map, long size) {
throw new UnsupportedOperationException("Not supported");
}

@Override
public ColumnVector read(ColumnVector reuse, int numRowsToRead) {
throw new UnsupportedOperationException("Not supported");
}
}
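
The batch reader that drives these per-column readers is among the changed files not shown on this page; a minimal sketch of the intended call order per row group, assuming a ParquetFileReader as the page source and a hypothetical readFile helper:

import java.io.IOException;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;

void readFile(ParquetFileReader fileReader, CometColumnReader[] readers) throws IOException {
  PageReadStore rowGroup;
  while ((rowGroup = fileReader.readNextRowGroup()) != null) {
    for (CometColumnReader reader : readers) {
      reader.reset();                                                    // fresh delegate and importer per row group
      reader.setPageReader(rowGroup.getPageReader(reader.descriptor())); // must follow reset()
    }
    // decode column batches through each reader's delegate() ...
  }
}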