Skip to content

Commit

Permalink
fix: handle numeric types when generating Avro values (#96)
Browse files Browse the repository at this point in the history
* fix: handle numeric types when generating Avro values

* fix: add support for logical types
  • Loading branch information
yvrng authored Oct 15, 2024
1 parent 14698f3 commit f78569e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ private static Object buildValue(Schema schema, Object data) {
case ARRAY -> buildArrayValue(schema, (Collection<?>) data);
case ENUM -> buildEnumValue(schema, (String) data);
case FIXED -> buildFixedValue(schema, (byte[]) data);
case STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN, NULL -> data;
case INT -> data instanceof Number number ? number.intValue() : data;
case LONG -> data instanceof Number number ? number.longValue() : data;
case FLOAT -> data instanceof Number number ? number.floatValue() : data;
case DOUBLE -> data instanceof Number number ? number.doubleValue() : data;
case STRING, BYTES, BOOLEAN, NULL -> data;
};
}

Expand Down
66 changes: 66 additions & 0 deletions src/test/java/io/kestra/plugin/kafka/KafkaTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,72 @@ void produceComplexAvro() throws Exception {
assertThat(reproduceRunOutput.getMessagesCount(), is(1));
}

@Test
void produceAvro_withIntegerAsLong() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());
String topic = "tu_" + IdUtils.create();

Map<String, Object> value = Map.of("number", 42);

Produce task = Produce.builder()
.properties(Map.of("bootstrap.servers", this.bootstrap))
.serdeProperties(Map.of("schema.registry.url", this.registry))
.keySerializer(SerdeType.STRING)
.valueSerializer(SerdeType.AVRO)
.topic(topic)
.valueAvroSchema("""
{
"type": "record",
"name": "Sample",
"namespace": "io.kestra.examples",
"fields": [
{
"name": "number",
"type": "long"
}
]
}
""")
.from(Map.of("value", value))
.build();

Produce.Output output = task.run(runContext);
assertThat(output.getMessagesCount(), is(1));
}

@Test
void produceAvro_withDoubleAsFloat() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());
String topic = "tu_" + IdUtils.create();

Map<String, Object> value = Map.of("number", 42.0d);

Produce task = Produce.builder()
.properties(Map.of("bootstrap.servers", this.bootstrap))
.serdeProperties(Map.of("schema.registry.url", this.registry))
.keySerializer(SerdeType.STRING)
.valueSerializer(SerdeType.AVRO)
.topic(topic)
.valueAvroSchema("""
{
"type": "record",
"name": "Sample",
"namespace": "io.kestra.examples",
"fields": [
{
"name": "number",
"type": "float"
}
]
}
""")
.from(Map.of("value", value))
.build();

Produce.Output output = task.run(runContext);
assertThat(output.getMessagesCount(), is(1));
}

@Test
void produceAvro_withUnion_andRecord() throws Exception {
RunContext runContext = runContextFactory.of(ImmutableMap.of());
Expand Down

0 comments on commit f78569e

Please sign in to comment.