Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add hard limit for FastSerdeCache #82

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,20 @@ public final class FastSerdeCache {

private static volatile FastSerdeCache _INSTANCE;

/**
* Fast-avro generates serializer and deserializer (SerDes) classes at runtime and loads them into metaspace.
* During serialization and deserialization, fast-avro also leverages JIT compilation to boost the SerDes speed,
* and the JIT-compiled code is kept in the code cache.
* Excessive usage of metaspace and code cache can cause GC/OOM issues.
*
* To bound that usage, a hard limit is set on the total number of SerDes classes generated and loaded by fast-avro.
* By default, the limit is {@link Integer#MAX_VALUE}.
* Fast-avro will fall back to regular avro after the limit is hit.
* The limit can be customized through the {@link FastSerdeCache} constructors that accept a {@code limit} argument.
*/
private volatile int generatedFastSerDesLimit = Integer.MAX_VALUE;
// Running count of fast SerDes build requests. It is incremented by buildFastClassWithNumCheck on every
// request (including requests rejected because the limit was exceeded) and compared against
// generatedFastSerDesLimit to decide whether the build may run.
private final AtomicInteger generatedSerDesNum = new AtomicInteger(0);

private final Map<String, FastDeserializer<?>> fastSpecificRecordDeserializersCache =
new FastAvroConcurrentHashMap<>();
private final Map<String, FastDeserializer<?>> fastGenericRecordDeserializersCache =
Expand Down Expand Up @@ -81,6 +95,20 @@ public FastSerdeCache(Executor executorService, Supplier<String> compileClassPat
this(executorService, compileClassPathSupplier.get());
}

/**
 * Creates a cache that compiles on the given executor, resolves its compile classpath from the given
 * supplier, and caps the number of generated fast SerDes classes.
 *
 * @param executorService
 *          {@link Executor} on which serializer/deserializer compilation is run
 * @param compileClassPathSupplier
 *          {@link Supplier} providing the custom compile classpath
 * @param limit
 *          value stored in {@link #generatedFastSerDesLimit}
 */
public FastSerdeCache(Executor executorService, Supplier<String> compileClassPathSupplier, int limit) {
  // Delegate all executor/classpath wiring to the two-argument constructor, then override the default cap.
  this(executorService, compileClassPathSupplier);
  generatedFastSerDesLimit = limit;
}

public FastSerdeCache(String compileClassPath) {
this();
this.compileClassPath = Optional.ofNullable(compileClassPath);
Expand Down Expand Up @@ -122,6 +150,15 @@ private FastSerdeCache() {
this((Executor) null);
}

/**
 * Creates a cache with the default configuration and a custom cap on the number of generated
 * fast SerDes classes.
 *
 * @param limit
 *          value stored in {@link #generatedFastSerDesLimit}
 */
public FastSerdeCache(int limit) {
  // Start from the no-argument defaults, then override the default cap.
  this();
  generatedFastSerDesLimit = limit;
}

/**
* Gets default {@link FastSerdeCache} instance. Default instance classpath can be customized via
* {@value #CLASSPATH} or {@value #CLASSPATH_SUPPLIER} system properties.
Expand Down Expand Up @@ -206,10 +243,11 @@ public FastDeserializer<?> getFastSpecificDeserializer(Schema writerSchema, Sche
new FastDeserializerWithAvroSpecificImpl<>(writerSchema, readerSchema));
if (deserializer == null) {
deserializer = fastSpecificRecordDeserializersCache.get(schemaKey);
CompletableFuture.supplyAsync(() -> buildSpecificDeserializer(writerSchema, readerSchema), executor)
.thenAccept(d -> {
fastSpecificRecordDeserializersCache.put(schemaKey, d);
});
buildFastClassWithNumCheck(
() -> CompletableFuture.supplyAsync(() -> buildSpecificDeserializer(writerSchema, readerSchema), executor)
.thenAccept(s -> {
fastSpecificRecordDeserializersCache.put(schemaKey, s);
}));
}
}

Expand All @@ -234,10 +272,11 @@ public FastDeserializer<?> getFastGenericDeserializer(Schema writerSchema, Schem
new FastDeserializerWithAvroGenericImpl(writerSchema, readerSchema));
if (deserializer == null) {
deserializer = fastGenericRecordDeserializersCache.get(schemaKey);
CompletableFuture.supplyAsync(() -> buildGenericDeserializer(writerSchema, readerSchema), executor)
.thenAccept(d -> {
fastGenericRecordDeserializersCache.put(schemaKey, d);
});
buildFastClassWithNumCheck(
() -> CompletableFuture.supplyAsync(() -> buildGenericDeserializer(writerSchema, readerSchema), executor)
.thenAccept(s -> {
fastGenericRecordDeserializersCache.put(schemaKey, s);
}));
}
}
return deserializer;
Expand All @@ -258,9 +297,11 @@ public FastSerializer<?> getFastSpecificSerializer(Schema schema) {
fastSpecificRecordSerializersCache.putIfAbsent(schemaKey, new FastSerializerWithAvroSpecificImpl(schema));
if (serializer == null) {
serializer = fastSpecificRecordSerializersCache.get(schemaKey);
CompletableFuture.supplyAsync(() -> buildSpecificSerializer(schema), executor).thenAccept(s -> {
fastSpecificRecordSerializersCache.put(schemaKey, s);
});
buildFastClassWithNumCheck(
() -> CompletableFuture.supplyAsync(() -> buildSpecificSerializer(schema), executor)
.thenAccept(s -> {
fastSpecificRecordSerializersCache.put(schemaKey, s);
}));
}
}

Expand All @@ -283,9 +324,11 @@ public FastSerializer<?> getFastGenericSerializer(Schema schema) {
fastGenericRecordSerializersCache.putIfAbsent(schemaKey, new FastSerializerWithAvroGenericImpl(schema));
if (serializer == null) {
serializer = fastGenericRecordSerializersCache.get(schemaKey);
CompletableFuture.supplyAsync(() -> buildGenericSerializer(schema), executor).thenAccept(s -> {
fastGenericRecordSerializersCache.put(schemaKey, s);
});
buildFastClassWithNumCheck(
() -> CompletableFuture.supplyAsync(() -> buildGenericSerializer(schema), executor)
.thenAccept(s -> {
fastGenericRecordSerializersCache.put(schemaKey, s);
}));
}
}
return serializer;
Expand All @@ -296,6 +339,26 @@ private String getSchemaKey(Schema writerSchema, Schema readerSchema) {
Utils.getSchemaFingerprint(readerSchema));
}

/**
* A wrapper function that limits the total number of fast de/serilizer calsses generated
*
* @param runnable The function to build and save fast de/serializer
*/
private void buildFastClassWithNumCheck(Runnable runnable) {
try {
if (this.generatedSerDesNum.incrementAndGet() <= this.generatedFastSerDesLimit) {
runnable.run();
} else if (this.generatedSerDesNum.get() == this.generatedFastSerDesLimit + 1) {
// We still want to print the warning when the limit is hit
LOGGER.warn("Generated fast serdes classes number hits limit {}", this.generatedFastSerDesLimit);
} else {
LOGGER.debug("Generated serdes number {}, with fast serdes limit set to {}", this.generatedSerDesNum.get(), this.generatedFastSerDesLimit);
}
} catch (Exception e) {
LOGGER.error("Fast serdes class generation failed");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we print out the full error stacktrace here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exceptions are handled in other functions. So a stack trace here may not provide any useful information.

}
}

/**
* This function will generate a fast specific deserializer, and it will throw exception if anything wrong happens.
* This function can be used to verify whether current {@link FastSerdeCache} could generate proper fast deserializer.
Expand Down