Skip to content

Commit

Permalink
Adding fail-fast feature to fast-serde (#519)
Browse files Browse the repository at this point in the history
* code cleanup

* bugfixes

* Generation errors are now logged at ERROR level.

* [fastserde] Adding fail-fast feature.
  • Loading branch information
krisso-rtb authored Oct 19, 2023
1 parent d661ed3 commit ab40879
Show file tree
Hide file tree
Showing 7 changed files with 187 additions and 73 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"type": "record",
"name": "SimpleRecord",
"namespace": "com.linkedin.avro.fastserde.generated.avro",
"fields": [
{
"name": "text",
"type": "string",
"default": "In vino veritas"
}
]
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,23 @@
package com.linkedin.avro.fastserde;

import com.linkedin.avro.fastserde.generated.avro.SimpleRecord;
import com.linkedin.avro.fastserde.generated.avro.TestRecord;
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;

import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -20,7 +32,7 @@ public void testIsSupportedForFastDeserializer() {
supportedSchemaTypes.add(Schema.Type.ARRAY);

Map<Schema.Type, Schema> schemaTypes = new HashMap<>();
/**
/*
* Those types could be created by {@link Schema#create(org.apache.avro.Schema.Type)} function.
*/
schemaTypes.put(Schema.Type.RECORD, Schema.parse("{\"type\": \"record\", \"name\": \"test\", \"fields\":[]}"));
Expand Down Expand Up @@ -68,4 +80,81 @@ public void testBuildFastSpecificDeserializerWithCorrectClasspath() {
FastSerdeCache cache = FastSerdeCache.getDefaultInstance();
cache.buildFastSpecificDeserializer(TestRecord.SCHEMA$, TestRecord.SCHEMA$);
}

/**
 * Checks the fail-fast path of {@link FastSpecificDatumWriter}.
 *
 * <p>The writer is handed to {@code serializationShouldFailFast}, which pairs it with a cache
 * whose class loader has been cleared. The test passes only if an
 * {@link UnsupportedOperationException} carrying the expected message is thrown within the
 * five-second timeout.
 *
 * @throws Exception the expected {@link UnsupportedOperationException}, or any setup failure
 */
@Test(groups = "serializationTest", timeOut = 5_000L,
    expectedExceptions = UnsupportedOperationException.class,
    expectedExceptionsMessageRegExp = "Fast specific serializer could not be generated.")
public void testSpecificSerializationFailsFast() throws Exception {
  BiFunction<Schema, FastSerdeCache, DatumWriter<SimpleRecord>> specificWriterFactory =
      (schema, cache) -> new FastSpecificDatumWriter<>(schema, cache);
  serializationShouldFailFast(specificWriterFactory);
}

/**
 * Checks the fail-fast path of {@link FastGenericDatumWriter}.
 *
 * <p>The writer is handed to {@code serializationShouldFailFast}, which pairs it with a cache
 * whose class loader has been cleared. The test passes only if an
 * {@link UnsupportedOperationException} carrying the expected message is thrown within the
 * five-second timeout.
 *
 * @throws Exception the expected {@link UnsupportedOperationException}, or any setup failure
 */
@Test(groups = "serializationTest", timeOut = 5_000L,
    expectedExceptions = UnsupportedOperationException.class,
    expectedExceptionsMessageRegExp = "Fast generic serializer could not be generated.")
public void testGenericSerializationFailsFast() throws Exception {
  BiFunction<Schema, FastSerdeCache, DatumWriter<SimpleRecord>> genericWriterFactory =
      (schema, cache) -> new FastGenericDatumWriter<>(schema, cache);
  serializationShouldFailFast(genericWriterFactory);
}

/**
 * Checks the fail-fast path of {@link FastSpecificDatumReader}.
 *
 * <p>The reader is handed to {@code deserializationShouldFailFast}, which pairs it with a cache
 * whose class loader has been cleared. The test passes only if an
 * {@link UnsupportedOperationException} carrying the expected message is thrown within the
 * five-second timeout.
 *
 * @throws Exception the expected {@link UnsupportedOperationException}, or any setup failure
 */
@Test(groups = "deserializationTest", timeOut = 5_000L,
    expectedExceptions = UnsupportedOperationException.class,
    expectedExceptionsMessageRegExp = "Fast specific deserializer could not be generated.")
public void testSpecificDeserializationFailsFast() throws Exception {
  BiFunction<Schema, FastSerdeCache, DatumReader<SimpleRecord>> specificReaderFactory =
      (schema, cache) -> new FastSpecificDatumReader<>(schema, cache);
  deserializationShouldFailFast(specificReaderFactory);
}

/**
 * Checks the fail-fast path of {@link FastGenericDatumReader}.
 *
 * <p>The reader is handed to {@code deserializationShouldFailFast}, which pairs it with a cache
 * whose class loader has been cleared. The test passes only if an
 * {@link UnsupportedOperationException} carrying the expected message is thrown within the
 * five-second timeout.
 *
 * @throws Exception the expected {@link UnsupportedOperationException}, or any setup failure
 */
@Test(groups = "deserializationTest", timeOut = 5_000L,
    expectedExceptions = UnsupportedOperationException.class,
    expectedExceptionsMessageRegExp = "Fast generic deserializer could not be generated.")
public void testGenericDeserializationFailsFast() throws Exception {
  BiFunction<Schema, FastSerdeCache, DatumReader<SimpleRecord>> genericReaderFactory =
      (schema, cache) -> new FastGenericDatumReader<>(schema, cache);
  deserializationShouldFailFast(genericReaderFactory);
}

/**
 * Writes the same {@link SimpleRecord} up to 100 times through a writer built by the given
 * factory, pausing 50 ms between attempts.
 *
 * <p>The writer is backed by the cache from {@link #createCacheWithoutClassLoader()}, so one of
 * the writes is expected to throw; the calling test declares which exception that must be.
 *
 * @param datumWriterFactory builds the writer under test from the record schema and the cache
 * @throws Exception the failure raised by the writer, or any setup/interruption error
 */
private void serializationShouldFailFast(
    BiFunction<Schema, FastSerdeCache, DatumWriter<SimpleRecord>> datumWriterFactory) throws Exception {
  // given:
  SimpleRecord record = new SimpleRecord();
  record.put(0, "Veni, vidi, vici.");
  FastSerdeCache brokenCache = createCacheWithoutClassLoader();
  DatumWriter<SimpleRecord> writerUnderTest = datumWriterFactory.apply(record.getSchema(), brokenCache);

  for (int attempt = 1; attempt <= 100; attempt++) {
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(sink);
    // should throw exception (except 1st iteration when fallback writer is always used)
    writerUnderTest.write(record, encoder);
    Thread.sleep(50L);
  }
}

/**
 * Reads the same serialized {@link SimpleRecord} up to 100 times through a reader built by the
 * given factory, pausing 50 ms between attempts.
 *
 * <p>The payload is produced once with a plain {@link SpecificDatumWriter}. The reader is backed
 * by the cache from {@link #createCacheWithoutClassLoader()}, so one of the reads is expected to
 * throw; the calling test declares which exception that must be.
 *
 * @param datumReaderFactory builds the reader under test from the record schema and the cache
 * @throws Exception the failure raised by the reader, or any setup/interruption error
 */
private void deserializationShouldFailFast(
    BiFunction<Schema, FastSerdeCache, DatumReader<SimpleRecord>> datumReaderFactory) throws Exception {
  // given
  SimpleRecord data = new SimpleRecord();
  data.put(0, "Omnes una manet nox.");
  FastSerdeCache cache = createCacheWithoutClassLoader();

  SpecificDatumWriter<SimpleRecord> specificDatumWriter = new SpecificDatumWriter<>(data.getSchema());
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
  BinaryEncoder encoder = AvroCompatibilityHelper.newBinaryEncoder(baos);
  specificDatumWriter.write(data, encoder);
  encoder.flush();

  // toByteArray() allocates a fresh copy on every call; the payload never changes after the
  // flush above, so take the copy once instead of once per iteration.
  byte[] serialized = baos.toByteArray();

  DatumReader<SimpleRecord> datumReader = datumReaderFactory.apply(data.getSchema(), cache);

  for (int attempt = 1; attempt <= 100; attempt++) {
    BinaryDecoder decoder = AvroCompatibilityHelper.newBinaryDecoder(serialized);
    // should throw exception (except 1st iteration when fallback reader is always used)
    datumReader.read(null, decoder);
    Thread.sleep(50L);
  }
}

/**
 * Builds a {@link FastSerdeCache} and reflectively sets its private {@code classLoader} field
 * to {@code null}.
 *
 * <p>Without a class loader, compiling a generated class throws, which is the failure the
 * fail-fast tests rely on.
 *
 * @return a cache whose {@code classLoader} field is {@code null}
 * @throws IllegalAccessException if the field cannot be written
 * @throws NoSuchFieldException if {@link FastSerdeCache} has no {@code classLoader} field
 */
private FastSerdeCache createCacheWithoutClassLoader() throws IllegalAccessException, NoSuchFieldException {
  FastSerdeCache brokenCache = new FastSerdeCache(null, null, true);
  Field loaderField = FastSerdeCache.class.getDeclaredField("classLoader");
  loaderField.setAccessible(true);
  // so that an exception is thrown while compiling generated class
  loaderField.set(brokenCache, null);
  return brokenCache;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang3.StringUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ public T read(T reuse, Decoder in) throws IOException {
fastDeserializer = cachedFastDeserializer.get();
} else {
fastDeserializer = getFastDeserializerFromCache(cache, writerSchema, readerSchema, modelData);
if (!FastSerdeCache.isFastDeserializer(fastDeserializer)) {
// don't cache
} else {
if (FastSerdeCache.isFastDeserializer(fastDeserializer)) {
cachedFastDeserializer.compareAndSet(null, fastDeserializer);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("FastGenericDeserializer was generated and cached for reader schema: ["
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelperCommon;
import com.sun.codemodel.JBlock;
import com.sun.codemodel.JClass;
import com.sun.codemodel.JCodeModel;
import com.sun.codemodel.JConditional;
import com.sun.codemodel.JDefinedClass;
import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JFieldRef;
import com.sun.codemodel.JFieldVar;
import com.sun.codemodel.JMethod;
import com.sun.codemodel.JMod;
import com.sun.codemodel.JVar;

Expand All @@ -28,7 +25,6 @@
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.specific.SpecificData;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Loading

0 comments on commit ab40879

Please sign in to comment.