Skip to content

Commit

Permalink
feat(docs): add full examples for pulsar tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
piyushmantri committed Sep 1, 2024
1 parent b6e8269 commit f3ad98e
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 52 deletions.
19 changes: 13 additions & 6 deletions src/main/java/io/kestra/plugin/pulsar/Consume.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,19 @@
@Plugin(
examples = {
@Example(
code = {
"uri: pulsar://localhost:26650",
"topic: test_kestra",
"deserializer: JSON",
"subscriptionName: kestra_flow"
}
full = true,
code = """
id: pulsar_consume
namespace: company.name
tasks:
- id: consume
type: io.kestra.plugin.pulsar.Consume
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
subscriptionName: kestra_flow
"""
)
}
)
Expand Down
71 changes: 37 additions & 34 deletions src/main/java/io/kestra/plugin/pulsar/Produce.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,43 @@
@Example(
title = "Read a CSV file, transform it to the right format, and publish it to Pulsar topic.",
full = true,
code = {
"id: produce",
"namespace: company.team",
"inputs:",
" - type: FILE",
" id: file",
"",
"tasks:",
" - id: csv_reader",
" type: io.kestra.plugin.serdes.csv.CsvToIon",
" from: \"{{ inputs.file }}\"",
" - id: file_transform",
" type: io.kestra.plugin.scripts.nashorn.FileTransform",
" from: \"{{ outputs.csv_reader.uri }}\"",
" script: |",
" var result = {",
" \"key\": row.id,",
" \"value\": {",
" \"username\": row.username,",
" \"tweet\": row.tweet",
" },",
" \"eventTime\": row.timestamp,",
" \"properties\": {",
" \"key\": \"value\"",
" }",
" };",
" row = result",
" - id: produce",
" type: io.kestra.plugin.pulsar.Produce",
" from: \"{{ outputs.file_transform.uri }}\"",
" uri: pulsar://localhost:26650",
" serializer: JSON",
" topic: test_kestra",
}
code = """
id: produce
namespace: company.team
inputs:
- type: FILE
id: file
tasks:
- id: csv_reader
type: io.kestra.plugin.serdes.csv.CsvToIon
from: "{{ inputs.file }}"
- id: file_transform
type: io.kestra.plugin.scripts.nashorn.FileTransform
from: {{ outputs.csv_reader.uri }}"
script: |
var result = {
"key": row.id,
"value": {
"username": row.username,
"tweet": row.tweet
},
"eventTime": row.timestamp,
"properties": {
"key": "value"
}
};
row = result
- id: produce
type: io.kestra.plugin.pulsar.Produce
from: "{{ outputs.file_transform.uri }}"
uri: pulsar://localhost:26650
serializer: JSON
topic: test_kestra
"""
)
}
)
Expand Down
17 changes: 12 additions & 5 deletions src/main/java/io/kestra/plugin/pulsar/Reader.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,18 @@
@Plugin(
examples = {
@Example(
code = {
"uri: pulsar://localhost:26650",
"topic: test_kestra",
"deserializer: JSON",
}
full = true,
code = """
id: pulsar_reader
namespace: company.name
tasks:
- id: reader
type: io.kestra.plugin.pulsar.Reader
uri: pulsar://localhost:26650
topic: test_kestra
deserializer: JSON
"""
)
}
)
Expand Down
26 changes: 19 additions & 7 deletions src/main/java/io/kestra/plugin/pulsar/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,25 @@
@Plugin(
examples = {
@Example(
code = {
"interval: PT30S",
"topic: kestra_trigger",
"uri: pulsar://localhost:26650",
"deserializer: JSON",
"subscriptionName: kestra_trigger_sub",
}
full = true,
code = """
id: pulsar_trigger
namespace: company.name
tasks:
- id: log
type: io.kestra.plugin.core.log.Log
message: "{{ trigger.value }}"
triggers:
- id: trigger
type: io.kestra.plugin.pulsar.Trigger
interval: PT30S
topic: kestra_trigger
uri: pulsar://localhost:26650
deserializer: JSON
subscriptionName: kestra_trigger_sub
"""
)
}
)
Expand Down

0 comments on commit f3ad98e

Please sign in to comment.