Skip to content

Commit

Permalink
[fast-avro][bugfix] Delegating setSchema() call to coldDeserializer f…
Browse files Browse the repository at this point in the history
…rom FastGenericDatumReader.

It's needed to deserialize 1st record(s) from file using DataFileStream.
  • Loading branch information
krisso-rtb committed Jan 15, 2024
1 parent 0db5b43 commit 3c4540c
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ void dataFileStreamShouldReadTheSameValuesAsValuesSentToDataFileWriter() throws
DataFileStream<OuterRecord1> dataFileStream = new DataFileStream<>(inputStream, datumReader);

// when: pre-populated bytes array is consumed by DataFileStream
for (OuterRecord1 outerRecord : dataFileStream) { // throws NPE
for (OuterRecord1 outerRecord : dataFileStream) {
InnerRecord1 innerRecord = (InnerRecord1) FastSerdeTestsSupport.getField(outerRecord, "innerRecord");
CharSequence comment = (CharSequence) FastSerdeTestsSupport.getField(innerRecord, "comment");
actualComments.add(comment.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.linkedin.avro.fastserde.customized.DatumReaderCustomization;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;

import static com.linkedin.avro.fastserde.customized.DatumReaderCustomization.*;
Expand All @@ -17,5 +19,15 @@ default T deserialize(T reuse, Decoder d) throws IOException {
return deserialize(reuse, d, DEFAULT_DATUM_READER_CUSTOMIZATION);
}

/**
* Set the writer's schema.
* @see org.apache.avro.io.DatumReader#setSchema(Schema)
*/
default void setSchema(Schema writerSchema) {
// Implement this method only in vanilla-avro-based classes (e.g. fallback scenario).
// Normally for generated deserializers it doesn't make sense.
throw new UnsupportedOperationException("Can't change schema for already generated class.");
}

T deserialize(T reuse, Decoder d, DatumReaderCustomization customization) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public void setSchema(Schema schema) {
if (readerSchema == null) {
readerSchema = writerSchema;
}

coldDeserializer.setSchema(schema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static boolean isSupportedForFastSerializer(Schema.Type schemaType) {
Schema.Type.ARRAY);
}

public static boolean isFastDeserializer(FastDeserializer deserializer) {
public static boolean isFastDeserializer(FastDeserializer<?> deserializer) {
return deserializer.isBackedByGeneratedClass();
}

Expand Down Expand Up @@ -476,7 +476,7 @@ private FastDeserializer<?> buildSpecificDeserializer(Schema writerSchema, Schem
LOGGER.error("Deserializer class instantiation exception", e);
}

return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl(writerSchema, readerSchema, modelData, customization, failFast, true);
return new FastSerdeUtils.FastDeserializerWithAvroSpecificImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true);
}

/**
Expand Down Expand Up @@ -536,7 +536,7 @@ private FastDeserializer<?> buildGenericDeserializer(Schema writerSchema, Schema
LOGGER.error("Deserializer class instantiation exception:", e);
}

return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl(writerSchema, readerSchema, modelData, customization, failFast, true);
return new FastSerdeUtils.FastDeserializerWithAvroGenericImpl<>(writerSchema, readerSchema, modelData, customization, failFast, true);
}

public FastSerializer<?> buildFastSpecificSerializer(Schema schema, SpecificData modelData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ public FastDeserializerWithAvroSpecificImpl(Schema writerSchema, Schema readerSc
this.runtimeClassGenerationDone = runtimeClassGenerationDone;
}

@Override
public void setSchema(Schema writerSchema) {
this.customizedDatumReader.setSchema(writerSchema);
}

@Override
public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException {
if (failFast) {
Expand Down Expand Up @@ -103,6 +108,11 @@ public FastDeserializerWithAvroGenericImpl(Schema writerSchema, Schema readerSch
this.runtimeClassGenerationDone = runtimeClassGenerationDone;
}

@Override
public void setSchema(Schema writerSchema) {
customizedDatumReader.setSchema(writerSchema);
}

@Override
public V deserialize(V reuse, Decoder d, DatumReaderCustomization customization) throws IOException {
if (failFast) {
Expand Down

0 comments on commit 3c4540c

Please sign in to comment.