diff --git a/src/main/java/io/kestra/plugin/kafka/Consume.java b/src/main/java/io/kestra/plugin/kafka/Consume.java index 8acbf99..79bf7e4 100644 --- a/src/main/java/io/kestra/plugin/kafka/Consume.java +++ b/src/main/java/io/kestra/plugin/kafka/Consume.java @@ -53,33 +53,47 @@ @Plugin( examples = { @Example( - code = { - "topic: test_kestra", - "properties:", - " bootstrap.servers: localhost:9092", - "serdeProperties:", - " schema.registry.url: http://localhost:8085", - "keyDeserializer: STRING", - "valueDeserializer: AVRO", - } + full = true, + code = """ + id: kafka_consume + namespace: company.name + + tasks: + - id: consume + type: io.kestra.plugin.kafka.Consume + topic: test_kestra + properties: + bootstrap.servers: localhost:9092 + serdeProperties: + schema.registry.url: http://localhost:8085 + keyDeserializer: STRING + valueDeserializer: AVRO + """ ), @Example( title = "Connect to a Kafka cluster with SSL.", - code = { - "properties:", - " security.protocol: SSL", - " bootstrap.servers: localhost:19092", - " ssl.key.password: my-ssl-password", - " ssl.keystore.type: PKCS12", - " ssl.keystore.location: my-base64-encoded-keystore", - " ssl.keystore.password: my-ssl-password", - " ssl.truststore.location: my-base64-encoded-truststore", - " ssl.truststore.password: my-ssl-password", - "topic:", - "- kestra_workerinstance", - "keyDeserializer: STRING", - "valueDeserializer: STRING" - } + full = true, + code = """ + id: kafka_consume + namespace: company.name + + tasks: + - id: consume + type: io.kestra.plugin.kafka.Consume + properties: + security.protocol: SSL + bootstrap.servers: localhost:19092 + ssl.key.password: my-ssl-password + ssl.keystore.type: PKCS12 + ssl.keystore.location: my-base64-encoded-keystore + ssl.keystore.password: my-ssl-password + ssl.truststore.location: my-base64-encoded-truststore + ssl.truststore.password: my-ssl-password + topic: + - kestra_workerinstance + keyDeserializer: STRING + valueDeserializer: STRING + """ ) } ) diff --git a/src/main/java/io/kestra/plugin/kafka/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/kafka/RealtimeTrigger.java index fe83b6a..2133078 100644 --- a/src/main/java/io/kestra/plugin/kafka/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/kafka/RealtimeTrigger.java @@ -50,21 +50,21 @@ namespace: company.team tasks: - - id: log - type: io.kestra.plugin.core.log.Log - message: "{{ trigger.value }}" + - id: log + type: io.kestra.plugin.core.log.Log + message: "{{ trigger.value }}" triggers: - - id: realtime_trigger - type: io.kestra.plugin.kafka.RealtimeTrigger - topic: test_kestra - properties: - bootstrap.servers: localhost:9092 - serdeProperties: - schema.registry.url: http://localhost:8085 - keyDeserializer: STRING - valueDeserializer: AVRO - groupId: kafkaConsumerGroupId""" + - id: realtime_trigger + type: io.kestra.plugin.kafka.RealtimeTrigger + topic: test_kestra + properties: + bootstrap.servers: localhost:9092 + serdeProperties: + schema.registry.url: http://localhost:8085 + keyDeserializer: STRING + valueDeserializer: AVRO + groupId: kafkaConsumerGroupId""" ) } ) diff --git a/src/main/java/io/kestra/plugin/kafka/Trigger.java b/src/main/java/io/kestra/plugin/kafka/Trigger.java index 80c68d0..ecf1e92 100644 --- a/src/main/java/io/kestra/plugin/kafka/Trigger.java +++ b/src/main/java/io/kestra/plugin/kafka/Trigger.java @@ -33,18 +33,30 @@ @Plugin( examples = { @Example( - code = { - "topic: test_kestra", - "properties:", - " bootstrap.servers: localhost:9092", - "serdeProperties:", - " schema.registry.url: http://localhost:8085", - " keyDeserializer: STRING", - " valueDeserializer: AVRO", - "interval: PT30S", - "maxRecords: 5", - "groupId: kafkaConsumerGroupId", - } + full = true, + code = """ + id: kafka_trigger + namespace: company.name + + tasks: + - id: log + type: io.kestra.plugin.core.log.Log + message: "{{ trigger.value }}" + + triggers: + - id: trigger + type: io.kestra.plugin.kafka.Trigger + topic: test_kestra + properties: + bootstrap.servers: localhost:9092 + serdeProperties: + schema.registry.url: http://localhost:8085 + keyDeserializer: STRING + valueDeserializer: AVRO + interval: PT30S + maxRecords: 5 + groupId: kafkaConsumerGroupId + """ ) } )