Skip to content

Commit

Permalink
feat(docs): add full examples for kafka tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
piyushmantri committed Aug 25, 2024
1 parent 14c1dfd commit 0785595
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 49 deletions.
62 changes: 38 additions & 24 deletions src/main/java/io/kestra/plugin/kafka/Consume.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,33 +53,47 @@
@Plugin(
examples = {
@Example(
code = {
"topic: test_kestra",
"properties:",
" bootstrap.servers: localhost:9092",
"serdeProperties:",
" schema.registry.url: http://localhost:8085",
"keyDeserializer: STRING",
"valueDeserializer: AVRO",
}
full = true,
code = """
id: kafka_consume
namespace: company.name
tasks:
- id: consume
type: io.kestra.plugin.kafka.Consume
topic: test_kestra
properties:
bootstrap.servers: localhost:9092
serdeProperties:
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
"""
),
@Example(
title = "Connect to a Kafka cluster with SSL.",
code = {
"properties:",
" security.protocol: SSL",
" bootstrap.servers: localhost:19092",
" ssl.key.password: my-ssl-password",
" ssl.keystore.type: PKCS12",
" ssl.keystore.location: my-base64-encoded-keystore",
" ssl.keystore.password: my-ssl-password",
" ssl.truststore.location: my-base64-encoded-truststore",
" ssl.truststore.password: my-ssl-password",
"topic:",
"- kestra_workerinstance",
"keyDeserializer: STRING",
"valueDeserializer: STRING"
}
full = true,
code = """
id: kafka_consume
namespace: company.name
tasks:
- id: consume
type: io.kestra.plugin.kafka.Consume
properties:
security.protocol: SSL
bootstrap.servers: localhost:19092
ssl.key.password: my-ssl-password
ssl.keystore.type: PKCS12
ssl.keystore.location: my-base64-encoded-keystore
ssl.keystore.password: my-ssl-password
ssl.truststore.location: my-base64-encoded-truststore
ssl.truststore.password: my-ssl-password
topic:
- kestra_workerinstance
keyDeserializer: STRING
valueDeserializer: STRING
"""
)
}
)
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/io/kestra/plugin/kafka/RealtimeTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,21 +50,21 @@
namespace: company.team
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.value }}"
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.value }}"
triggers:
- id: realtime_trigger
type: io.kestra.plugin.kafka.RealtimeTrigger
topic: test_kestra
properties:
bootstrap.servers: localhost:9092
serdeProperties:
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
groupId: kafkaConsumerGroupId"""
- id: realtime_trigger
type: io.kestra.plugin.kafka.RealtimeTrigger
topic: test_kestra
properties:
bootstrap.servers: localhost:9092
serdeProperties:
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
groupId: kafkaConsumerGroupId"""
)
}
)
Expand Down
36 changes: 24 additions & 12 deletions src/main/java/io/kestra/plugin/kafka/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,30 @@
@Plugin(
examples = {
@Example(
code = {
"topic: test_kestra",
"properties:",
" bootstrap.servers: localhost:9092",
"serdeProperties:",
" schema.registry.url: http://localhost:8085",
" keyDeserializer: STRING",
" valueDeserializer: AVRO",
"interval: PT30S",
"maxRecords: 5",
"groupId: kafkaConsumerGroupId",
}
full = true,
code = """
id: kafka_trigger
namespace: company.name
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.value }}"
triggers:
- id: trigger
type: io.kestra.plugin.kafka.Trigger
topic: test_kestra
properties:
bootstrap.servers: localhost:9092
serdeProperties:
schema.registry.url: http://localhost:8085
keyDeserializer: STRING
valueDeserializer: AVRO
interval: PT30S
maxRecords: 5
groupId: kafkaConsumerGroupId
"""
)
}
)
Expand Down

0 comments on commit 0785595

Please sign in to comment.