From 001012420bb797e29621ab819223bb57acf60462 Mon Sep 17 00:00:00 2001
From: Sanchit Garg
Date: Thu, 4 May 2023 18:09:54 +0530
Subject: [PATCH] PLUGIN-1609: Add null check for key and fail the pipeline if message key field value is null

---
 .../plugin/kafka/sink/KafkaBatchSink.java |  8 +-
 .../KafkaSinkAndAlertsPublisherTest.java  | 80 ++++++++++++++++++-
 2 files changed, 84 insertions(+), 4 deletions(-)

diff --git a/kafka-plugins-client/src/main/java/io/cdap/plugin/kafka/sink/KafkaBatchSink.java b/kafka-plugins-client/src/main/java/io/cdap/plugin/kafka/sink/KafkaBatchSink.java
index a462c27..1ad1ad6 100644
--- a/kafka-plugins-client/src/main/java/io/cdap/plugin/kafka/sink/KafkaBatchSink.java
+++ b/kafka-plugins-client/src/main/java/io/cdap/plugin/kafka/sink/KafkaBatchSink.java
@@ -118,7 +118,13 @@ public void transform(StructuredRecord input, Emitter<KeyValue<Text, Text>> emit
       emitter.emit(new KeyValue<>((Text) null, new Text(body)));
     } else {
       String key = input.get(producerConfig.key);
-      emitter.emit(new KeyValue<>(new Text(key), new Text(body)));
+      // The key inside the message may be null if it is not specified while publishing the event.
+      // https://stackoverflow.com/questions/29511521
+      if (key != null) {
+        emitter.emit(new KeyValue<>(new Text(key), new Text(body)));
+      } else {
+        throw new RuntimeException("Message Key field value in the record is null");
+      }
     }
   }
 
diff --git a/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/KafkaSinkAndAlertsPublisherTest.java b/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/KafkaSinkAndAlertsPublisherTest.java
index 52f7753..925d6b4 100644
--- a/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/KafkaSinkAndAlertsPublisherTest.java
+++ b/kafka-plugins-client/src/test/java/io/cdap/plugin/kafka/KafkaSinkAndAlertsPublisherTest.java
@@ -86,6 +86,7 @@ public class KafkaSinkAndAlertsPublisherTest extends HydratorTestBase {
 
   @Before
   public void setupTestClass() throws Exception {
+    clear();
     ArtifactId parentArtifact = NamespaceId.DEFAULT.artifact(APP_ARTIFACT.getName(), APP_ARTIFACT.getVersion());
 
     // add the data-pipeline artifact and mock plugins
@@ -179,8 +180,7 @@ public void testKafkaSinkAndAlertsPublisher() throws Exception {
     // create the pipeline
     ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSink");
     ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));
-
-
+
     Set<String> expected = ImmutableSet.of("100,samuel,jackson",
                                            "200,dwayne,johnson",
                                            "300,christopher,walken",
@@ -228,7 +228,81 @@ public Alert apply(String s) {
           return GSON.fromJson(s, Alert.class);
         }
       }
-    )));
+    )));
+  }
+
+  @Test
+  public void testKafkaSinkAndAlertsPublisherWithNullKey() throws Exception {
+    Schema schema = Schema.recordOf(
+      "user",
+      Schema.Field.of("id", Schema.nullableOf(Schema.of(Schema.Type.LONG))),
+      Schema.Field.of("first", Schema.of(Schema.Type.STRING)),
+      Schema.Field.of("last", Schema.nullableOf(Schema.of(Schema.Type.STRING))));
+
+    // create the pipeline config
+    String inputName = "sinkTestInput";
+
+    String usersTopic = "records";
+    Map<String, String> sinkProperties = new HashMap<>();
+    sinkProperties.put("brokers", "localhost:" + kafkaPort);
+    sinkProperties.put("referenceName", "kafkaTest");
+    sinkProperties.put("topic", usersTopic);
+    sinkProperties.put("schema", schema.toString());
+    sinkProperties.put("format", "csv");
+    sinkProperties.put("key", "last");
+    sinkProperties.put("async", "FALSE");
+    sinkProperties.put("compressionType", "none");
+
+    ETLStage source = new ETLStage("source", MockSource.getPlugin(inputName));
+    ETLStage sink =
+      new ETLStage("sink", new ETLPlugin("Kafka", KafkaBatchSink.PLUGIN_TYPE, sinkProperties, null));
+
+    ETLBatchConfig pipelineConfig = ETLBatchConfig.builder("* * * * *")
+      .addStage(source)
+      .addStage(sink)
+      .addConnection(source.getName(), sink.getName())
+      .build();
+
+    // create the pipeline
+    ApplicationId pipelineId = NamespaceId.DEFAULT.app("testKafkaSink");
+    ApplicationManager appManager = deployApplication(pipelineId, new AppRequest<>(APP_ARTIFACT, pipelineConfig));
+
+    Set<String> expected = ImmutableSet.of("100,samuel,jackson",
+                                           "200,dwayne,johnson",
+                                           "300,christopher,walken",
+                                           "400,donald,trump");
+
+    List<StructuredRecord> records = new ArrayList<>();
+    for (String e : expected) {
+      String[] splits = e.split(",");
+      StructuredRecord record =
+        StructuredRecord.builder(schema)
+          .set("id", Long.parseLong(splits[0]))
+          .set("first", splits[1])
+          .set("last", splits[2])
+          .build();
+      records.add(record);
+    }
+
+    // Add a record with null key
+    StructuredRecord recordWithNullKey =
+      StructuredRecord.builder(schema)
+        .set("id", 500L)
+        .set("first", "terry")
+        .set("last", null)
+        .build();
+    records.add(recordWithNullKey);
+
+    DataSetManager<Table> sourceTable = getDataset(inputName);
+    MockSource.writeInput(sourceTable, records);
+
+    WorkflowManager workflowManager = appManager.getWorkflowManager(SmartWorkflow.NAME);
+    try {
+      workflowManager.start();
+      workflowManager.waitForRun(ProgramRunStatus.COMPLETED, 1, TimeUnit.MINUTES);
+    } catch (Exception e) {
+      Assert.assertTrue(workflowManager.getHistory(ProgramRunStatus.FAILED).size() == 1);
+    }
   }
 
   private Set<String> readKafkaRecords(String topic, final int maxMessages) throws InterruptedException {
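
Note (not part of the patch): the StackOverflow link cited in the new comment points out that Kafka itself accepts a null record key and simply assigns such records to partitions without key hashing. The sink therefore has to pick its own policy for a null key field, and this change opts to fail the pipeline rather than silently publish keyless records. Below is a minimal standalone producer sketch of the broker-side behaviour the comment refers to; the broker address, topic, and value are illustrative assumptions, not taken from the plugin code.

// Standalone sketch (assumes kafka-clients on the classpath; broker and topic values are illustrative).
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NullKeyProducerSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");              // illustrative broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // Kafka accepts a null key: the record is still published, just without key-based partitioning.
      // The KafkaBatchSink change above rejects this case instead, so a pipeline configured with a
      // message key field fails fast rather than emitting keyless records.
      producer.send(new ProducerRecord<String, String>("records", null, "500,terry,"));
    }
  }
}

The RuntimeException thrown in transform() surfaces as a failed pipeline run, which is what the new testKafkaSinkAndAlertsPublisherWithNullKey test asserts via ProgramRunStatus.FAILED.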