From 30ec56cd38cfc42c95abdc76e89139f7994f615c Mon Sep 17 00:00:00 2001 From: Shaik Zakir Hussain Date: Fri, 18 Sep 2020 20:17:57 +0530 Subject: [PATCH 1/2] KAFKA-10477: Enabling the same behavior of NULL JsonNodeType to MISSING JsonNodeType in JsonConverter. From 2.10.0 onwards, in jackson lib, ObjectMapper.readTree(input) started to return JsonNode of type MISSING for empty input, as mentioned in the issue: https://github.com/FasterXML/jackson-databind/issues/2211. Made changes in JsonParser to incorporate this, and treat MISSING JsonNodeType in a similar fashion as that of NULL JsonNodeType. Added a unit test for this. --- .../apache/kafka/connect/json/JsonConverter.java | 2 +- .../kafka/connect/json/JsonConverterTest.java | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java index 8a2d6768cc96f..1a175381e8f11 100644 --- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java +++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java @@ -729,6 +729,7 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) { } else { switch (jsonValue.getNodeType()) { case NULL: + case MISSING: // Special case. With no schema return null; case BOOLEAN: @@ -751,7 +752,6 @@ private static Object convertToConnect(Schema schema, JsonNode jsonValue) { break; case BINARY: - case MISSING: case POJO: default: schemaType = null; diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 2e189e2d584ae..27cacb562b3cd 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -195,6 +195,20 @@ public void nullToConnect() { assertEquals(SchemaAndValue.NULL, converted); } + /** + * When schemas are disabled, empty data should be decoded to an empty envelope. + * This test verifies the case where `schemas.enable` configuration is set to false, and + * {@link JsonConverter} converts empty bytes to {@link SchemaAndValue#NULL}. + */ + @Test + public void emptyBytesToConnect() { + // This characterizes the messages with empty data when Json schemas is disabled + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + SchemaAndValue converted = converter.toConnectData(TOPIC, "".getBytes()); + assertEquals(SchemaAndValue.NULL, converted); + } + @Test public void nullSchemaPrimitiveToConnect() { SchemaAndValue converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes()); From 3f0666fd4795cfeb01b3d1b72f272d052b0d54b3 Mon Sep 17 00:00:00 2001 From: Randall Hauch Date: Thu, 1 Oct 2020 17:04:56 -0500 Subject: [PATCH 2/2] KAFKA-10477: Add another test to check for null within schemaless input --- .../kafka/connect/json/JsonConverterTest.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java index 27cacb562b3cd..a1ac71d4c8d57 100644 --- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java +++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java @@ -209,6 +209,22 @@ public void emptyBytesToConnect() { assertEquals(SchemaAndValue.NULL, converted); } + /** + * When schemas are disabled, fields are mapped to Connect maps. + */ + @Test + public void schemalessWithEmptyFieldValueToConnect() { + // This characterizes the messages with empty data when Json schemas is disabled + Map props = Collections.singletonMap("schemas.enable", false); + converter.configure(props, true); + String input = "{ \"a\": \"\", \"b\": null}"; + SchemaAndValue converted = converter.toConnectData(TOPIC, input.getBytes()); + Map expected = new HashMap<>(); + expected.put("a", ""); + expected.put("b", null); + assertEquals(new SchemaAndValue(null, expected), converted); + } + @Test public void nullSchemaPrimitiveToConnect() { SchemaAndValue converted = converter.toConnectData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes());