From 42ca14f7f35bad39301921a33b34699c72986eec Mon Sep 17 00:00:00 2001 From: Karthik Ramgopal Date: Thu, 18 Jan 2024 09:45:53 -0800 Subject: [PATCH] Improve performance of AvscSchemaWriter (#537) Co-authored-by: Karthik Ramgopal --- .../builder/plugins/BuilderPluginContext.java | 28 +- .../avroutil1/builder/SchemaBuilder.java | 20 +- .../writer/avsc/AvscSchemaWriter.java | 315 ++++++++++-------- 3 files changed, 210 insertions(+), 153 deletions(-) diff --git a/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/plugins/BuilderPluginContext.java b/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/plugins/BuilderPluginContext.java index 8c96be21e..0c90ea36c 100644 --- a/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/plugins/BuilderPluginContext.java +++ b/avro-builder/builder-spi/src/main/java/com/linkedin/avroutil1/builder/plugins/BuilderPluginContext.java @@ -45,19 +45,27 @@ public void run() throws Exception { throw new IllegalStateException("run() has already been invoked"); } - //"seal" any internal state to prevent plugins from trying to do weird things during execution + // "seal" any internal state to prevent plugins from trying to do weird things during execution sealed = true; - int operationCount = operations.stream().collect(StreamUtil.toParallelStream(op -> { - try { - op.run(operationContext); - } catch (Exception e) { - throw new IllegalStateException("Exception running operation", e); - } + if (!operations.isEmpty()) { + long operationStart = System.currentTimeMillis(); + final int parallelism = Math.min(operations.size(), 5); + int operationCount = operations.stream().collect(StreamUtil.toParallelStream(op -> { + try { + op.run(operationContext); + } catch (Exception e) { + throw new IllegalStateException("Exception running operation", e); + } - return 1; - }, 2)).reduce(0, Integer::sum); + return 1; + }, parallelism)).reduce(0, Integer::sum); - LOGGER.info("Executed {} operations for 
builder plugins", operationCount); + long operationEnd = System.currentTimeMillis(); + LOGGER.info("Executed {} operations with parallelism of {} for builder plugins in {} millis", operationCount, + parallelism, operationEnd - operationStart); + } else { + LOGGER.info("No operations specified to run"); + } } } diff --git a/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java b/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java index edb8b5afe..cd10cab83 100644 --- a/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java +++ b/avro-builder/builder/src/main/java/com/linkedin/avroutil1/builder/SchemaBuilder.java @@ -28,6 +28,8 @@ import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -35,13 +37,20 @@ */ public class SchemaBuilder { + private static final Logger LOGGER = LoggerFactory.getLogger(SchemaBuilder.class); + private SchemaBuilder() { } public static void main(String[] args) throws Exception { + long start = System.currentTimeMillis(); + + long pluginLoadStart = System.currentTimeMillis(); List plugins = loadPlugins(1); + long pluginLoadEnd = System.currentTimeMillis(); + LOGGER.info("Loaded {} plugins in {} millis.", plugins.size(), pluginLoadEnd - pluginLoadStart); + long optionParseStart = System.currentTimeMillis(); OptionParser parser = new OptionParser(); - OptionSpec inputOpt = parser.accepts("input", "Schema or directory of schemas to compile [REQUIRED]") .withRequiredArg().required() .describedAs("file"); @@ -253,7 +262,10 @@ public static void main(String[] args) throws Exception { ); opConfig.validateParameters(); + long optionParseEnd = System.currentTimeMillis(); + LOGGER.info("Parsed all options in {} millis.", optionParseEnd - optionParseStart); + long operationContextBuildStart = System.currentTimeMillis(); OperationContextBuilder 
operationContextBuilder; switch (opConfig.getGeneratorType()) { case AVRO_UTIL: @@ -267,6 +279,9 @@ public static void main(String[] args) throws Exception { throw new IllegalStateException("unhandled: " + opConfig.getGeneratorType()); } OperationContext opContext = operationContextBuilder.buildOperationContext(opConfig); + long operationContextBuildEnd = System.currentTimeMillis(); + LOGGER.info("Built operation context in {} millis.", operationContextBuildEnd - operationContextBuildStart); + BuilderPluginContext context = new BuilderPluginContext(opContext); // Allow other plugins to add operations @@ -275,6 +290,9 @@ public static void main(String[] args) throws Exception { } context.run(); + + long end = System.currentTimeMillis(); + LOGGER.info("Finished running SchemaBuilder in {} millis", end - start); } private static List loadPlugins(@SuppressWarnings("SameParameterValue") int currentApiVersion) { diff --git a/parser/src/main/java/com/linkedin/avroutil1/writer/avsc/AvscSchemaWriter.java b/parser/src/main/java/com/linkedin/avroutil1/writer/avsc/AvscSchemaWriter.java index 73a28e0f8..f06c12859 100644 --- a/parser/src/main/java/com/linkedin/avroutil1/writer/avsc/AvscSchemaWriter.java +++ b/parser/src/main/java/com/linkedin/avroutil1/writer/avsc/AvscSchemaWriter.java @@ -6,6 +6,8 @@ package com.linkedin.avroutil1.writer.avsc; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; import com.linkedin.avroutil1.model.AvroArrayLiteral; import com.linkedin.avroutil1.model.AvroArraySchema; import com.linkedin.avroutil1.model.AvroBooleanLiteral; @@ -35,28 +37,21 @@ import com.linkedin.avroutil1.model.JsonPropertiesContainer; import com.linkedin.avroutil1.model.SchemaOrRef; import com.linkedin.avroutil1.parser.avsc.AvscUnparsedLiteral; -import java.io.StringReader; +import java.io.IOException; import java.io.StringWriter; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import 
java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Locale; import java.util.Map; import java.util.Set; -import javax.json.Json; -import javax.json.JsonArrayBuilder; -import javax.json.JsonObjectBuilder; -import javax.json.JsonReader; -import javax.json.JsonValue; -import javax.json.JsonWriter; -import javax.json.stream.JsonGenerator; public class AvscSchemaWriter implements AvroSchemaWriter { + private static final JsonFactory JSON_FACTORY = new JsonFactory(); + @Override public List write(AvroSchema schema, AvscWriterConfig config) { String avsc = generateAvsc(schema, AvscWriterConfig.CORRECT_MITIGATED); @@ -82,7 +77,7 @@ protected Path pathForSchema(AvroSchema maybeNamed) { if (i == parts.length - 1) { pathParts[i - 1] = parts[i] + "." + AvscFile.SUFFIX; } else { - pathParts[i -1] = parts[i]; + pathParts[i - 1] = parts[i]; } } @@ -91,104 +86,119 @@ protected Path pathForSchema(AvroSchema maybeNamed) { public String generateAvsc(AvroSchema schema, AvscWriterConfig config) { AvscWriterContext context = new AvscWriterContext(); - Map jsonConfig = new HashMap<>(); - if (config.isPretty()) { - jsonConfig.put(JsonGenerator.PRETTY_PRINTING, "true"); - } + StringWriter stringWriter = new StringWriter(); - JsonValue dom = writeSchema(schema, context, config); - JsonWriter writer = Json.createWriterFactory(jsonConfig).createWriter(stringWriter); - writer.write(dom); + try (JsonGenerator generator = JSON_FACTORY.createGenerator(stringWriter)) { + if (config.isPretty()) { + generator.useDefaultPrettyPrinter(); + } + writeSchema(schema, context, config, generator); + } catch (IOException e) { + throw new IllegalStateException("Error serializing avro schema to avsc", e); + } + return stringWriter.toString(); } - protected JsonValue writeSchema(AvroSchema schema, AvscWriterContext context, AvscWriterConfig config) { + protected void writeSchema(AvroSchema schema, AvscWriterContext context, AvscWriterConfig config, + JsonGenerator 
generator) throws IOException { AvroType type = schema.type(); - JsonObjectBuilder definitionBuilder; switch (type) { case ENUM: case FIXED: case RECORD: - return writeNamedSchema((AvroNamedSchema) schema, context, config); + writeNamedSchema((AvroNamedSchema) schema, context, config, generator); + break; case ARRAY: AvroArraySchema arraySchema = (AvroArraySchema) schema; - definitionBuilder = Json.createObjectBuilder(); - definitionBuilder.add("type", "array"); - definitionBuilder.add("items", writeSchema(arraySchema.getValueSchema(), context, config)); - emitJsonProperties(schema, context, config, definitionBuilder); - return definitionBuilder.build(); + generator.writeStartObject(); + generator.writeStringField("type", "array"); + generator.writeFieldName("items"); + writeSchema(arraySchema.getValueSchema(), context, config, generator); + emitJsonProperties(schema, context, config, generator); + generator.writeEndObject(); + break; case MAP: AvroMapSchema mapSchema = (AvroMapSchema) schema; - definitionBuilder = Json.createObjectBuilder(); - definitionBuilder.add("type", "map"); - definitionBuilder.add("values", writeSchema(mapSchema.getValueSchema(), context, config)); - emitJsonProperties(schema, context, config, definitionBuilder); - return definitionBuilder.build(); + generator.writeStartObject(); + generator.writeStringField("type", "map"); + generator.writeFieldName("values"); + writeSchema(mapSchema.getValueSchema(), context, config, generator); + emitJsonProperties(schema, context, config, generator); + generator.writeEndObject(); + break; case UNION: AvroUnionSchema unionSchema = (AvroUnionSchema) schema; - JsonArrayBuilder unionBuilder = Json.createArrayBuilder(); + generator.writeStartArray(); for (SchemaOrRef unionBranch : unionSchema.getTypes()) { AvroSchema branchSchema = unionBranch.getSchema(); //will throw if unresolved ref - unionBuilder.add(writeSchema(branchSchema, context, config)); + writeSchema(branchSchema, context, config, generator); } 
- return unionBuilder.build(); + generator.writeEndArray(); + break; default: AvroPrimitiveSchema primitiveSchema = (AvroPrimitiveSchema) schema; - if (!primitiveSchema.hasProperties()) { - return Json.createValue(primitiveSchema.type().name().toLowerCase(Locale.ROOT)); + if (primitiveSchema.hasProperties()) { + generator.writeStartObject(); + generator.writeStringField("type", primitiveSchema.type().toTypeName()); + emitJsonProperties(schema, context, config, generator); + generator.writeEndObject(); + } else { + generator.writeString(primitiveSchema.type().toTypeName()); } - definitionBuilder = Json.createObjectBuilder(); - definitionBuilder.add("type", primitiveSchema.type().toTypeName()); - emitJsonProperties(primitiveSchema, context, config, definitionBuilder); - return definitionBuilder.build(); + break; } } - protected JsonValue writeNamedSchema(AvroNamedSchema schema, AvscWriterContext context, AvscWriterConfig config) { + protected void writeNamedSchema(AvroNamedSchema schema, AvscWriterContext context, AvscWriterConfig config, + JsonGenerator generator) throws IOException { boolean seenBefore = context.schemaEncountered(schema); if (seenBefore) { - return writeSchemaRef(schema, context, config); + writeSchemaRef(schema, context, config, generator); + return; } - //common parts to all named schemas - JsonObjectBuilder definitionBuilder = Json.createObjectBuilder(); - AvroName extraAlias = emitSchemaName(schema, context, config, definitionBuilder); - emitSchemaAliases(schema, context, config, extraAlias, definitionBuilder); + + // Common parts for all named schemas. 
+ generator.writeStartObject(); + AvroName extraAlias = emitSchemaName(schema, context, config, generator); + emitSchemaAliases(schema, context, config, extraAlias, generator); if (schema.getDoc() != null) { - definitionBuilder.add("doc", Json.createValue(schema.getDoc())); + generator.writeStringField("doc", schema.getDoc()); } AvroType type = schema.type(); switch (type) { case ENUM: AvroEnumSchema enumSchema = (AvroEnumSchema) schema; - definitionBuilder.add("type", "enum"); - List symbols = enumSchema.getSymbols(); - JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); - for (String symbol : symbols) { - arrayBuilder.add(symbol); + generator.writeStringField("type", "enum"); + generator.writeFieldName("symbols"); + generator.writeStartArray(); + for (String symbol : enumSchema.getSymbols()) { + generator.writeString(symbol); } - definitionBuilder.add("symbols", arrayBuilder); - String defaultSymbol = enumSchema.getDefaultSymbol(); - if (defaultSymbol != null) { - definitionBuilder.add("default", Json.createValue(defaultSymbol)); + generator.writeEndArray(); + if (enumSchema.getDefaultSymbol() != null) { + generator.writeStringField("default", enumSchema.getDefaultSymbol()); } break; case FIXED: AvroFixedSchema fixedSchema = (AvroFixedSchema) schema; - definitionBuilder.add("type", "fixed"); - definitionBuilder.add("size", Json.createValue(fixedSchema.getSize())); + generator.writeStringField("type", "fixed"); + generator.writeNumberField("size", fixedSchema.getSize()); break; case RECORD: AvroRecordSchema recordSchema = (AvroRecordSchema) schema; - definitionBuilder.add("type", "record"); //TODO - support error types? - emitRecordFields(recordSchema, context, config, definitionBuilder); + //TODO - support error types? 
+ generator.writeStringField("type", "record"); + emitRecordFields(recordSchema, context, config, generator); break; default: throw new IllegalStateException("not expecting " + type); } - emitJsonProperties(schema, context, config, definitionBuilder); + emitJsonProperties(schema, context, config, generator); + generator.writeEndObject(); + context.popNamingContext(); - return definitionBuilder.build(); } /** @@ -196,19 +206,24 @@ protected JsonValue writeNamedSchema(AvroNamedSchema schema, AvscWriterContext c * @param schema a schema to write a reference to * @param context avsc generation context */ - protected JsonValue writeSchemaRef(AvroNamedSchema schema, AvscWriterContext context, AvscWriterConfig config) { + protected void writeSchemaRef(AvroNamedSchema schema, AvscWriterContext context, AvscWriterConfig config, + JsonGenerator generator) throws IOException { + + // Emit fullname always if configured to do so. if (config.isAlwaysEmitNamespace()) { - //emit fullname always - return Json.createValue(schema.getFullName()); + generator.writeString(schema.getFullName()); + return; } - //figure out what the context namespace is - String contextNamespace = config.isUsePreAvro702Logic() ? - context.getAvro702ContextNamespace() : context.getCorrectContextNamespace(); + + // Figure out what the context namespace is + String contextNamespace = + config.isUsePreAvro702Logic() ? 
context.getAvro702ContextNamespace() : context.getCorrectContextNamespace(); String qualified = schema.getName().qualified(contextNamespace); - return Json.createValue(qualified); + generator.writeString(qualified); } - protected AvroName emitSchemaName(AvroNamedSchema schema, AvscWriterContext context, AvscWriterConfig config, JsonObjectBuilder output) { + protected AvroName emitSchemaName(AvroNamedSchema schema, AvscWriterContext context, AvscWriterConfig config, + JsonGenerator generator) throws IOException { //before we get to actually writing anything we need to do some accounting of what horrible old avro would do for 702 @@ -249,17 +263,17 @@ protected AvroName emitSchemaName(AvroNamedSchema schema, AvscWriterContext cont if (config.isEmitNamespacesSeparately() || schemaName.getNamespace().isEmpty()) { //there's no way to build a fullname for something in the empty namespace //so for those we always need to emit an empty namespace prop. - output.add("namespace", schemaName.getNamespace()); - output.add("name", schemaName.getSimpleName()); + generator.writeStringField("namespace", schemaName.getNamespace()); + generator.writeStringField("name", schemaName.getSimpleName()); } else { - output.add("name", schemaName.getFullname()); + generator.writeStringField("name", schemaName.getFullname()); } } else { boolean emitNS = config.isUsePreAvro702Logic() ? 
shouldEmitNSPre702 : shouldEmitNSNormally; if (emitNS) { - output.add("namespace", schemaName.getNamespace()); + generator.writeStringField("namespace", schemaName.getNamespace()); } - output.add("name", schemaName.getSimpleName()); + generator.writeStringField("name", schemaName.getSimpleName()); } context.pushNamingContext(schema, contextNamespaceAfter, contextNamespaceAfter702); @@ -267,81 +281,87 @@ protected AvroName emitSchemaName(AvroNamedSchema schema, AvscWriterContext cont return extraAlias; } - protected void emitSchemaAliases( - AvroNamedSchema schema, - AvscWriterContext context, - AvscWriterConfig config, - AvroName extraAlias, - JsonObjectBuilder output - ) { + protected void emitSchemaAliases(AvroNamedSchema schema, AvscWriterContext context, AvscWriterConfig config, + AvroName extraAlias, JsonGenerator generator) throws IOException { List aliases = schema.getAliases(); int numAliases = (extraAlias != null ? 1 : 0) + (aliases != null ? aliases.size() : 0); if (numAliases == 0) { return; } - JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); + + generator.writeFieldName("aliases"); + generator.writeStartArray(); if (aliases != null) { for (AvroName alias : aliases) { - arrayBuilder.add(alias.getFullname()); + generator.writeString(alias.getFullname()); } } if (extraAlias != null) { - arrayBuilder.add(extraAlias.getFullname()); + generator.writeString(extraAlias.getFullname()); } - output.add("aliases", arrayBuilder); + generator.writeEndArray(); } - protected void emitJsonProperties( - JsonPropertiesContainer fieldOrSchema, - AvscWriterContext context, - AvscWriterConfig config, - JsonObjectBuilder output - ) { + protected void emitJsonProperties(JsonPropertiesContainer fieldOrSchema, AvscWriterContext context, + AvscWriterConfig config, JsonGenerator generator) throws IOException { Set propNames = fieldOrSchema.propertyNames(); if (propNames == null || propNames.isEmpty()) { return; } + for (String propName : propNames) { - String json = 
fieldOrSchema.getPropertyAsJsonLiteral(propName); - JsonReader reader = Json.createReader(new StringReader(json)); - JsonValue propValue = reader.readValue(); - output.add(propName, propValue); + generator.writeFieldName(propName); + generator.writeRawValue(fieldOrSchema.getPropertyAsJsonLiteral(propName)); } } - protected void emitRecordFields(AvroRecordSchema schema, AvscWriterContext context, AvscWriterConfig config, JsonObjectBuilder output) { - JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); - List fields = schema.getFields(); - for (AvroSchemaField field : fields) { - JsonObjectBuilder fieldBuilder = Json.createObjectBuilder(); - fieldBuilder.add("name", field.getName()); + protected void emitRecordFields(AvroRecordSchema schema, AvscWriterContext context, AvscWriterConfig config, + JsonGenerator generator) throws IOException { + generator.writeFieldName("fields"); + generator.writeStartArray(); + for (AvroSchemaField field : schema.getFields()) { + generator.writeStartObject(); + + // Field name. + generator.writeStringField("name", field.getName()); + + // Field doc. if (field.hasDoc()) { - fieldBuilder.add("doc", field.getDoc()); + generator.writeStringField("doc", field.getDoc()); } - AvroSchema fieldSchema = field.getSchema(); - fieldBuilder.add("type", writeSchema(fieldSchema, context, config)); - emitJsonProperties(field.getAllProps(), context, config, fieldBuilder); + // Field type. + generator.writeFieldName("type"); + writeSchema(field.getSchema(), context, config, generator); + + // Field properties. + emitJsonProperties(field.getAllProps(), context, config, generator); + // Default value. 
if (field.hasDefaultValue()) { AvroLiteral defaultValue = field.getDefaultValue(); - JsonValue defaultValueLiteral = writeDefaultValue(fieldSchema, defaultValue, field); - fieldBuilder.add("default", defaultValueLiteral); + generator.writeFieldName("default"); + writeDefaultValue(field.getSchema(), defaultValue, field, generator); } - //TODO - order + + // Aliases + // TODO - order if (field.aliases() != null) { - JsonArrayBuilder jsonArrayBuilder = Json.createArrayBuilder(); - for(String alias : field.aliases()) { - jsonArrayBuilder.add(alias); + generator.writeFieldName("aliases"); + generator.writeStartArray(); + for (String alias : field.aliases()) { + generator.writeString(alias); } - fieldBuilder.add("aliases", jsonArrayBuilder.build()); + generator.writeEndArray(); } - arrayBuilder.add(fieldBuilder); + + generator.writeEndObject(); } - output.add("fields", arrayBuilder); + generator.writeEndArray(); } - protected JsonValue writeDefaultValue(AvroSchema schemaForLiteral, AvroLiteral literal, AvroSchemaField field) { + protected void writeDefaultValue(AvroSchema schemaForLiteral, AvroLiteral literal, AvroSchemaField field, + JsonGenerator generator) throws IOException { AvroType type = schemaForLiteral.type(); String temp; AvroSchema valueSchema; @@ -364,77 +384,88 @@ protected JsonValue writeDefaultValue(AvroSchema schemaForLiteral, AvroLiteral l case NULL: //noinspection unused (kept as a sanity check) AvroNullLiteral nullLiteral = (AvroNullLiteral) literal; - return JsonValue.NULL; + generator.writeNull(); + break; case BOOLEAN: AvroBooleanLiteral boolLiteral = (AvroBooleanLiteral) literal; - return boolLiteral.getValue() ? 
JsonValue.TRUE : JsonValue.FALSE; + generator.writeBoolean(boolLiteral.getValue()); + break; case INT: AvroIntegerLiteral intLiteral = (AvroIntegerLiteral) literal; - return Json.createValue(intLiteral.getValue()); + generator.writeNumber(intLiteral.getValue()); + break; case LONG: AvroLongLiteral longLiteral = (AvroLongLiteral) literal; - return Json.createValue(longLiteral.getValue()); + generator.writeNumber(longLiteral.getValue()); + break; case FLOAT: AvroFloatLiteral floatLiteral = (AvroFloatLiteral) literal; - return Json.createValue(floatLiteral.getValue()); + generator.writeNumber(floatLiteral.getValue()); + break; case DOUBLE: AvroDoubleLiteral doubleLiteral = (AvroDoubleLiteral) literal; - return Json.createValue(doubleLiteral.getValue()); + generator.writeNumber(doubleLiteral.getValue()); + break; case STRING: AvroStringLiteral stringLiteral = (AvroStringLiteral) literal; - return Json.createValue(stringLiteral.getValue()); + generator.writeString(stringLiteral.getValue()); + break; case BYTES: AvroBytesLiteral bytesLiteral = (AvroBytesLiteral) literal; //spec says "values for bytes and fixed fields are JSON strings, where Unicode code points //0-255 are mapped to unsigned 8-bit byte values 0-255", and this is how its done - temp = new String(bytesLiteral.getValue(), StandardCharsets.ISO_8859_1); - return Json.createValue(temp); + generator.writeString(new String(bytesLiteral.getValue(), StandardCharsets.ISO_8859_1)); + break; case ENUM: AvroEnumLiteral enumLiteral = (AvroEnumLiteral) literal; - return Json.createValue(enumLiteral.getValue()); + generator.writeString(enumLiteral.getValue()); + break; case FIXED: AvroFixedLiteral fixedLiteral = (AvroFixedLiteral) literal; //spec says "values for bytes and fixed fields are JSON strings, where Unicode code points //0-255 are mapped to unsigned 8-bit byte values 0-255", and this is how its done - temp = new String(fixedLiteral.getValue(), StandardCharsets.ISO_8859_1); - return Json.createValue(temp); + 
generator.writeString(new String(fixedLiteral.getValue(), StandardCharsets.ISO_8859_1)); + break; case ARRAY: AvroArrayLiteral arrayLiteral = (AvroArrayLiteral) literal; List array = arrayLiteral.getValue(); AvroArraySchema arraySchema = (AvroArraySchema) arrayLiteral.getSchema(); valueSchema = arraySchema.getValueSchema(); - JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); - for (AvroLiteral element : array) { - JsonValue serializedElement = writeDefaultValue(valueSchema, element, field); - arrayBuilder.add(serializedElement); + generator.writeStartArray(); + for (AvroLiteral element : array) { + writeDefaultValue(valueSchema, element, field, generator); } - return arrayBuilder.build(); + generator.writeEndArray(); + break; case MAP: AvroMapLiteral mapLiteral = (AvroMapLiteral) literal; Map map = mapLiteral.getValue(); AvroMapSchema mapSchema = (AvroMapSchema) mapLiteral.getSchema(); valueSchema = mapSchema.getValueSchema(); - JsonObjectBuilder objectBuilder = Json.createObjectBuilder(); + generator.writeStartObject(); for (Map.Entry entry : map.entrySet()) { - JsonValue serializedValue = writeDefaultValue(valueSchema, entry.getValue(), field); - objectBuilder.add(entry.getKey(), serializedValue); + generator.writeFieldName(entry.getKey()); + writeDefaultValue(valueSchema, entry.getValue(), field, generator); } - return objectBuilder.build(); + generator.writeEndObject(); + break; case UNION: //default values for unions must be of the 1st type in the union AvroUnionSchema unionSchema = (AvroUnionSchema) schemaForLiteral; AvroSchema firstBranchSchema = unionSchema.getTypes().get(0).getSchema(); - return writeDefaultValue(firstBranchSchema, literal, field); + writeDefaultValue(firstBranchSchema, literal, field, generator); + break; case RECORD: AvroRecordSchema recordSchema = (AvroRecordSchema) schemaForLiteral; - JsonObjectBuilder recordObjectBuilder = Json.createObjectBuilder(); + generator.writeStartObject(); Map recordLiteralMap = ((AvroRecordLiteral) 
literal).getValue(); for (AvroSchemaField innerField : recordSchema.getFields()) { - recordObjectBuilder.add(innerField.getName(), - writeDefaultValue(innerField.getSchema(), recordLiteralMap.get(innerField.getName()), field)); + generator.writeFieldName(innerField.getName()); + writeDefaultValue(innerField.getSchema(), recordLiteralMap.get(innerField.getName()), field, generator); } - return recordObjectBuilder.build(); + generator.writeEndObject(); + break; default: throw new UnsupportedOperationException("writing default values for " + type + " not implemented yet"); }