Skip to content

Commit

Permalink
Parquet: Implement Variant readers.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Jan 31, 2025
1 parent 8e456ae commit e21e8eb
Show file tree
Hide file tree
Showing 13 changed files with 2,320 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Set<Integer> struct(Types.StructType struct, List<Set<Integer>> fieldResu

@Override
public Set<Integer> field(Types.NestedField field, Set<Integer> fieldResult) {
if ((includeStructIds && field.type().isStructType()) || field.type().isPrimitiveType()) {
if ((includeStructIds && field.type().isStructType()) || field.type().isPrimitiveType() || field.type() instanceof Types.VariantType) {
fieldIds.add(field.fieldId());
}
return fieldIds;
Expand All @@ -72,4 +72,9 @@ public Set<Integer> map(Types.MapType map, Set<Integer> keyResult, Set<Integer>
}
return fieldIds;
}

@Override
public Set<Integer> variant() {
return null;
}
}
36 changes: 20 additions & 16 deletions core/src/main/java/org/apache/iceberg/variants/Variants.java
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ enum BasicType {
ARRAY
}

public static VariantMetadata emptyMetadata() {
return SerializedMetadata.EMPTY_V1_METADATA;
}

public static VariantMetadata metadata(ByteBuffer metadata) {
return SerializedMetadata.from(metadata);
}
Expand Down Expand Up @@ -209,59 +213,59 @@ public static VariantPrimitive<Void> ofNull() {
return new PrimitiveWrapper<>(PhysicalType.NULL, null);
}

static VariantPrimitive<Boolean> of(boolean value) {
public static VariantPrimitive<Boolean> of(boolean value) {
return new PrimitiveWrapper<>(PhysicalType.BOOLEAN_TRUE, value);
}

static VariantPrimitive<Byte> of(byte value) {
public static VariantPrimitive<Byte> of(byte value) {
return new PrimitiveWrapper<>(PhysicalType.INT8, value);
}

static VariantPrimitive<Short> of(short value) {
public static VariantPrimitive<Short> of(short value) {
return new PrimitiveWrapper<>(PhysicalType.INT16, value);
}

static VariantPrimitive<Integer> of(int value) {
public static VariantPrimitive<Integer> of(int value) {
return new PrimitiveWrapper<>(PhysicalType.INT32, value);
}

static VariantPrimitive<Long> of(long value) {
public static VariantPrimitive<Long> of(long value) {
return new PrimitiveWrapper<>(PhysicalType.INT64, value);
}

static VariantPrimitive<Float> of(float value) {
public static VariantPrimitive<Float> of(float value) {
return new PrimitiveWrapper<>(PhysicalType.FLOAT, value);
}

static VariantPrimitive<Double> of(double value) {
public static VariantPrimitive<Double> of(double value) {
return new PrimitiveWrapper<>(PhysicalType.DOUBLE, value);
}

static VariantPrimitive<Integer> ofDate(int value) {
public static VariantPrimitive<Integer> ofDate(int value) {
return new PrimitiveWrapper<>(PhysicalType.DATE, value);
}

static VariantPrimitive<Integer> ofIsoDate(String value) {
public static VariantPrimitive<Integer> ofIsoDate(String value) {
return ofDate(DateTimeUtil.isoDateToDays(value));
}

static VariantPrimitive<Long> ofTimestamptz(long value) {
public static VariantPrimitive<Long> ofTimestamptz(long value) {
return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPTZ, value);
}

static VariantPrimitive<Long> ofIsoTimestamptz(String value) {
public static VariantPrimitive<Long> ofIsoTimestamptz(String value) {
return ofTimestamptz(DateTimeUtil.isoTimestamptzToMicros(value));
}

static VariantPrimitive<Long> ofTimestampntz(long value) {
public static VariantPrimitive<Long> ofTimestampntz(long value) {
return new PrimitiveWrapper<>(PhysicalType.TIMESTAMPNTZ, value);
}

static VariantPrimitive<Long> ofIsoTimestampntz(String value) {
public static VariantPrimitive<Long> ofIsoTimestampntz(String value) {
return ofTimestampntz(DateTimeUtil.isoTimestampToMicros(value));
}

static VariantPrimitive<BigDecimal> of(BigDecimal value) {
public static VariantPrimitive<BigDecimal> of(BigDecimal value) {
int bitLength = value.unscaledValue().bitLength();
if (bitLength < 32) {
return new PrimitiveWrapper<>(PhysicalType.DECIMAL4, value);
Expand All @@ -274,11 +278,11 @@ static VariantPrimitive<BigDecimal> of(BigDecimal value) {
throw new UnsupportedOperationException("Unsupported decimal precision: " + value.precision());
}

static VariantPrimitive<ByteBuffer> of(ByteBuffer value) {
public static VariantPrimitive<ByteBuffer> of(ByteBuffer value) {
return new PrimitiveWrapper<>(PhysicalType.BINARY, value);
}

static VariantPrimitive<String> of(String value) {
public static VariantPrimitive<String> of(String value) {
return new PrimitiveWrapper<>(PhysicalType.STRING, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.iceberg.variants;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
Expand All @@ -27,10 +29,55 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

public class VariantTestUtil {
private VariantTestUtil() {}

public static void assertEqual(VariantMetadata expected, VariantMetadata actual) {
assertThat(actual).isNotNull();
assertThat(expected).isNotNull();
assertThat(actual.dictionarySize())
.as("Dictionary size should match")
.isEqualTo(expected.dictionarySize());

for (int i = 0; i < expected.dictionarySize(); i += 1) {
assertThat(actual.get(i)).isEqualTo(expected.get(i));
}
}

public static void assertEqual(VariantValue expected, VariantValue actual) {
assertThat(actual).isNotNull();
assertThat(expected).isNotNull();
assertThat(actual.type()).as("Variant type should match").isEqualTo(expected.type());

if (expected.type() == Variants.PhysicalType.OBJECT) {
VariantObject expectedObject = expected.asObject();
VariantObject actualObject = actual.asObject();
assertThat(actualObject.numFields())
.as("Variant object num fields should match")
.isEqualTo(expectedObject.numFields());
for (String fieldName : expectedObject.fieldNames()) {
assertEqual(expectedObject.get(fieldName), actualObject.get(fieldName));
}

} else if (expected.type() == Variants.PhysicalType.ARRAY) {
VariantArray expectedArray = expected.asArray();
VariantArray actualArray = actual.asArray();
assertThat(actualArray.numElements())
.as("Variant array num element should match")
.isEqualTo(expectedArray.numElements());
for (int i = 0; i < expectedArray.numElements(); i += 1) {
assertEqual(expectedArray.get(i), actualArray.get(i));
}

} else {
assertThat(actual.asPrimitive().get())
.as("Variant primitive value should match")
.isEqualTo(expected.asPrimitive().get());
}
}

private static byte primitiveHeader(int primitiveType) {
return (byte) (primitiveType << 2);
}
Expand Down Expand Up @@ -60,7 +107,11 @@ static SerializedPrimitive createString(String string) {
return SerializedPrimitive.from(buffer, buffer.get(0));
}

static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortNames) {
public static ByteBuffer emptyMetadata() {
return createMetadata(ImmutableList.of(), true);
}

public static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortNames) {
if (fieldNames.isEmpty()) {
return SerializedMetadata.EMPTY_V1_BUFFER;
}
Expand Down Expand Up @@ -108,7 +159,7 @@ static ByteBuffer createMetadata(Collection<String> fieldNames, boolean sortName
return buffer;
}

static ByteBuffer createObject(ByteBuffer metadataBuffer, Map<String, VariantValue> data) {
public static ByteBuffer createObject(ByteBuffer metadataBuffer, Map<String, VariantValue> data) {
// create the metadata to look up field names
VariantMetadata metadata = Variants.metadata(metadataBuffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.data.parquet;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -26,7 +27,9 @@
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReader;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.parquet.ParquetVariantVisitor;
import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
import org.apache.iceberg.parquet.VariantReaderBuilder;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -431,6 +434,16 @@ public ParquetValueReader<?> primitive(
}
}

@Override
public ParquetValueReader<?> variant(Types.VariantType iVariant, ParquetValueReader<?> reader) {
return reader;
}

@Override
public ParquetVariantVisitor<ParquetValueReader<?>> variantVisitor() {
return new VariantReaderBuilder(type, Arrays.asList(currentPath()));
}

MessageType type() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
Expand Down Expand Up @@ -75,6 +76,32 @@ private static Schema convertInternal(
converter.getAliases());
}

/**
* Returns true if the name identifies a field in the struct/group.
*
* @param group a GroupType
* @param name a String name
* @return true if the group contains a field with the given name
*/
public static boolean hasField(GroupType group, String name) {
return fieldType(group, name) != null;
}

/**
* Returns the Type of the named field in the struct/group, or null.
*
* @param group a GroupType
* @param name a String name
* @return the Type of the field in the group, or null if it is not present.
*/
public static Type fieldType(GroupType group, String name) {
try {
return group.getType(name);
} catch (InvalidRecordException ignored) {
return null;
}
}

public static MessageType pruneColumns(MessageType fileSchema, Schema expectedSchema) {
// column order must match the incoming type, so it doesn't matter that the ids are unordered
Set<Integer> selectedIds = TypeUtil.getProjectedIds(expectedSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public static ParquetValueReader<Integer> unboxed(ColumnDescriptor desc) {
return new UnboxedReader<>(desc);
}

public static ParquetValueReader<Byte> intsAsByte(ColumnDescriptor desc) {
return new IntAsByteReader(desc);
}

public static ParquetValueReader<Short> intsAsShort(ColumnDescriptor desc) {
return new IntAsShortReader(desc);
}

public static ParquetValueReader<String> strings(ColumnDescriptor desc) {
return new StringReader(desc);
}
Expand Down Expand Up @@ -390,6 +398,28 @@ public String read(String reuse) {
}
}

private static class IntAsByteReader extends UnboxedReader<Byte> {
private IntAsByteReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Byte read(Byte ignored) {
return (byte) readInteger();
}
}

private static class IntAsShortReader extends UnboxedReader<Short> {
private IntAsShortReader(ColumnDescriptor desc) {
super(desc);
}

@Override
public Short read(Short ignored) {
return (short) readInteger();
}
}

public static class IntAsLongReader extends UnboxedReader<Long> {
public IntAsLongReader(ColumnDescriptor desc) {
super(desc);
Expand Down
Loading

0 comments on commit e21e8eb

Please sign in to comment.