Skip to content

Commit

Permalink
Merge pull request #36 from shrutimantri/doc_correction_1
Browse files Browse the repository at this point in the history
fix(docs): fix the docs on this plugin
  • Loading branch information
anna-geller authored Jan 25, 2024
2 parents acc8b1a + 9838959 commit c0e0dbe
Show file tree
Hide file tree
Showing 10 changed files with 56 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,22 @@ public abstract class AbstractPulsarConnection extends Task implements PulsarCon
@Value
public static class TlsOptions {
@Schema(
title = "The client certificate",
description = "Must be a pem file as base64."
title = "The client certificate.",
description = "Must be a base64-encoded pem file."

)
String cert;

@Schema(
title = "The key certificate",
description = "Must be a pem file as base64."
title = "The key certificate.",
description = "Must be a base64-encoded pem file."

)
String key;

@Schema(
title = "The ca certificate",
description = "Must be a pem file as base64."
title = "The ca certificate.",
description = "Must be a base64-encoded pem file."

)
String ca;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/pulsar/AbstractReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ List<String> topics(RunContext runContext) throws IllegalVariableEvaluationExcep
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@Schema(
title = "Number of message consumed"
title = "Number of messages consumed."
)
private final Integer messagesCount;

@Schema(
title = "URI of a kestra internal storage file"
title = "URI of a Kestra internal storage file containing the consumed messages."
)
private URI uri;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/pulsar/Consume.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Consume messages from Pulsar topic(s)"
title = "Consume messages from Pulsar topic(s)."
)
@Plugin(
examples = {
Expand Down
29 changes: 15 additions & 14 deletions src/main/java/io/kestra/plugin/pulsar/Produce.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
@Getter
@NoArgsConstructor
@io.swagger.v3.oas.annotations.media.Schema(
title = "Produce message in a Pulsar topic"
title = "Produce message to a Pulsar topic."
)
@Plugin(
examples = {
@Example(
title = "Read a csv, transform it to right format & produce it to Pulsar",
title = "Read a CSV file, transform it to the right format, and publish it to Pulsar topic.",
full = true,
code = {
"id: produce",
Expand Down Expand Up @@ -82,23 +82,24 @@
)
public class Produce extends AbstractPulsarConnection implements RunnableTask<Produce.Output> {
@io.swagger.v3.oas.annotations.media.Schema(
title = "Pulsar topic where to send message"
title = "Pulsar topic to send a message to."
)
@NotNull
@PluginProperty(dynamic = true)
private String topic;

@io.swagger.v3.oas.annotations.media.Schema(
title = "Source of message send",
description = "Can be an internal storage uri, a map or a list." +
"with the following format: key, value, partition, timestamp, headers"
title = "Source of the sent message.",
description = "Can be a Kestra internal storage URI, a map or a list " +
"in the following format: `key`, `value`, `eventTime`, `properties`, " +
"`deliverAt`, `deliverAfter` and `sequenceId`."
)
@NotNull
@PluginProperty(dynamic = true)
private Object from;

@io.swagger.v3.oas.annotations.media.Schema(
title = "Serializer used for the value"
title = "Serializer used for the value."
)
@NotNull
@PluginProperty(dynamic = true)
Expand All @@ -118,11 +119,11 @@ public class Produce extends AbstractPulsarConnection implements RunnableTask<Pr
private Map<String, String> producerProperties;

@io.swagger.v3.oas.annotations.media.Schema(
title = "Configure the type of access mode that the producer requires on the topic",
title = "Configure the type of access mode that the producer requires on the topic.",
description = "Possible values are:\n" +
"* `Shared`: By default multiple producers can publish on a topic\n" +
"* `Shared`: By default, multiple producers can publish to a topic.\n" +
"* `Exclusive`: Require exclusive access for producer. Fail immediately if there's already a producer connected.\n" +
"* `WaitForExclusive`: Producer creation is pending until it can acquire exclusive access"
"* `WaitForExclusive`: Producer creation is pending until it can acquire exclusive access."
)
@PluginProperty
private ProducerAccessMode accessMode;
Expand All @@ -136,9 +137,9 @@ public class Produce extends AbstractPulsarConnection implements RunnableTask<Pr
@io.swagger.v3.oas.annotations.media.Schema(
title = "Set the compression type for the producer.",
description = "By default, message payloads are not compressed. Supported compression types are:\n" +
"* `NONE`: No compression (Default)\n" +
"* `LZ4`: Compress with LZ4 algorithm. Faster but lower compression than ZLib\n" +
"* `ZLIB`: Standard ZLib compression\n" +
"* `NONE`: No compression (Default).\n" +
"* `LZ4`: Compress with LZ4 algorithm. Faster but lower compression than ZLib.\n" +
"* `ZLIB`: Standard ZLib compression.\n" +
"* `ZSTD` Compress with Zstandard codec. Since Pulsar 2.3.\n" +
"* `SNAPPY` Compress with Snappy codec. Since Pulsar 2.4."
)
Expand Down Expand Up @@ -301,7 +302,7 @@ private Long processTimestamp(Object timestamp) {
@Getter
public static class Output implements io.kestra.core.models.tasks.Output {
@io.swagger.v3.oas.annotations.media.Schema(
title = "Number of message produced"
title = "Number of messages produced."
)
private final Integer messagesCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,26 @@

public interface PulsarConnectionInterface {
@Schema(
title = "Connection URLs",
description = "You need to specify a Pulsar protocol URL\n" +
title = "Connection URLs.",
description = "You need to specify a Pulsar protocol URL.\n" +
"- Example of localhost: `pulsar://localhost:6650`\n" +
"- If you have multiple brokers: `pulsar://localhost:6550,localhost:6651,localhost:6652`\n" +
"- If you use TLS authentication: `pulsar+ssl://pulsar.us-west.example.com:6651`\n" +
"- `ssl.truststore.location` "

"- If you have multiple brokers: `pulsar://localhost:6650,localhost:6651,localhost:6652`\n" +
"- If you use TLS authentication: `pulsar+ssl://pulsar.us-west.example.com:6651`"
)
@PluginProperty(dynamic = true)
@NotNull
String getUri();

@Schema(
title = "Authentication Token",
description = "Authentication Token that can be necessary with some providers such as Clever Cloud!"

title = "Authentication token.",
description = "Authentication token that can be required by some providers such as Clever Cloud."
)
@PluginProperty(dynamic = true)
String getAuthenticationToken();

@Schema(
title = "TLS Authentication",
title = "TLS authentication options.",
description = "You need to use \"pulsar+ssl://\" in serviceUrl to enable TLS support."

)
@PluginProperty(dynamic = false)
AbstractPulsarConnection.TlsOptions getTlsOptions();
Expand Down
18 changes: 9 additions & 9 deletions src/main/java/io/kestra/plugin/pulsar/ReadInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,38 +8,38 @@

public interface ReadInterface {
@Schema(
title = "Pulsar topic(s) where to consume message",
description = "Can be a string or a List of string to consume from multiple topic"
title = "Pulsar topic(s) where to consume messages from.",
description = "Can be a string or a list of strings to consume from multiple topics."
)
@NotNull
@PluginProperty(dynamic = true)
Object getTopic();

@Schema(
title = "Deserializer used for the value"
title = "Deserializer used for the value."
)
@NotNull
@PluginProperty(dynamic = true)
SerdeType getDeserializer();

@Schema(
title = "Duration waiting for record to be polled",
description = "If no records are available, the max wait to wait for a new records. "
title = "Duration waiting for record to be polled.",
description = "If no records are available, the maximum wait to wait for a new record. "
)
@NotNull
@PluginProperty(dynamic = true)
Duration getPollDuration();

@Schema(
title = "The max number of rows to fetch before stopping",
description = "It's not an hard limit and is evaluated every second"
title = "The maximum number of records to fetch before stopping.",
description = "It's not a hard limit and is evaluated every second."
)
@PluginProperty(dynamic = false)
Integer getMaxRecords();

@Schema(
title = "The max duration waiting for new rows",
description = "It's not an hard limit and is evaluated every second"
title = "The maximum duration waiting for new record.",
description = "It's not a hard limit and is evaluated every second."
)
@PluginProperty(dynamic = false)
Duration getMaxDuration();
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/pulsar/Reader.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Read messages from Pulsar topic(s) without subscription"
title = "Read messages from Pulsar topic(s) without subscription."
)
@Plugin(
examples = {
Expand All @@ -43,16 +43,16 @@ public class Reader extends AbstractReader {
@Schema(
title = "The initial reader positioning can be set at specific timestamp by providing total rollback duration.",
description = "So, broker can find a latest message that was published before given duration. eg: " +
"rollbackDuration in minute = 5 suggests broker to find message which was published 5 mins back and " +
"set the initial position on that messageId."
"`since` set to 5 minutes (`PT5M`) indicates that broker should find message published 5 minutes in the past, " +
"and set the initial position to that messageId."
)
@PluginProperty(dynamic = true)
private Duration since;

@Schema(
title = "Position the reader on a particular message.",
description = "The first message read will be the one immediately *after* the specified message\n" +
"If no `since` or `messageId` are provide, we start at the beginning of the topic."
description = "The first message read will be the one immediately *after* the specified message.\n" +
"If no `since` or `messageId` are provided, we start at the beginning of the topic."
)
@PluginProperty(dynamic = true)
private String messageId;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/pulsar/SerdeType.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.nio.charset.StandardCharsets;

@io.swagger.v3.oas.annotations.media.Schema(
title = "Serializer / Deserializer used for the value"
title = "Serializer / Deserializer used for the value."
)
public enum SerdeType {
STRING,
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/io/kestra/plugin/pulsar/SubscriptionInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@

public interface SubscriptionInterface {
@Schema(
title = "The subscription name",
description = "Using subscription name, we will fetch only records not already consumed"
title = "The subscription name.",
description = "Using subscription name, we will fetch only records that haven't been consumed yet."
)
@PluginProperty(dynamic = true)
@NotNull
String getSubscriptionName();

@io.swagger.v3.oas.annotations.media.Schema(
title = "Add all the properties in the provided map to the consumer."
title = "The position of a subscription to the topic."
)
@PluginProperty(dynamic = false)
@NotNull
SubscriptionInitialPosition getInitialPosition();

@io.swagger.v3.oas.annotations.media.Schema(
title = "Add all the properties in the provided map to the consumer."
title = "The subscription type."
)
@PluginProperty(dynamic = false)
@NotNull
Expand All @@ -38,13 +38,13 @@ public interface SubscriptionInterface {
Map<String, String> getConsumerProperties();

@io.swagger.v3.oas.annotations.media.Schema(
title = "Add public encryption key, used by producer to decrypt the data key."
title = "Add a public encryption key to the producer/consumer."
)
@PluginProperty(dynamic = true)
String getEncryptionKey();

@io.swagger.v3.oas.annotations.media.Schema(
title = "Set the consumer name."
title = "The consumer name."
)
@PluginProperty(dynamic = true)
String getConsumerName();
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/kestra/plugin/pulsar/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
@Getter
@NoArgsConstructor
@Schema(
title = "Wait for messages on Pulsar topics"
title = "Wait for messages from a Pulsar topic."
)
@Plugin(
examples = {
@Example(
code = {
"interval: PT10S",
"topic: tu_trigger",
"interval: PT30S",
"topic: kestra_trigger",
"uri: pulsar://localhost:26650",
"deserializer: JSON",
"subscriptionName: tu_trigger_sub",
"subscriptionName: kestra_trigger_sub",
}
)
}
Expand Down

0 comments on commit c0e0dbe

Please sign in to comment.