From b969bbfe88679f8800083850d696fa44cc6ea7b5 Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Sun, 3 Nov 2024 21:25:09 -0800 Subject: [PATCH] Add receiverQueueSize to pulsar (#3589) Signed-off-by: yaron2 --- pubsub/pulsar/metadata.go | 1 + pubsub/pulsar/metadata.yaml | 9 ++++++++- pubsub/pulsar/pulsar.go | 4 ++++ 3 files changed, 13 insertions(+), 1 deletion(-) diff --git a/pubsub/pulsar/metadata.go b/pubsub/pulsar/metadata.go index 3533c6de37..62b3b06bbc 100644 --- a/pubsub/pulsar/metadata.go +++ b/pubsub/pulsar/metadata.go @@ -36,6 +36,7 @@ type pulsarMetadata struct { PrivateKey string `mapstructure:"privateKey"` Keys string `mapstructure:"keys"` MaxConcurrentHandlers uint `mapstructure:"maxConcurrentHandlers"` + ReceiverQueueSize int `mapstructure:"receiverQueueSize"` Token string `mapstructure:"token"` oauth2.ClientCredentialsMetadata `mapstructure:",squash"` diff --git a/pubsub/pulsar/metadata.yaml b/pubsub/pulsar/metadata.yaml index e81d6f75f8..7cc216cf12 100644 --- a/pubsub/pulsar/metadata.yaml +++ b/pubsub/pulsar/metadata.yaml @@ -176,4 +176,11 @@ metadata: description: | Sets the maximum number of concurrent messages sent to the application. Default is 100. default: '"100"' - example: '"100"' \ No newline at end of file + example: '"100"' + - name: receiverQueueSize + type: number + description: | + Sets the size of the consumer receive queue. + Controls how many messages can be accumulated by the consumer before it is explicitly called to read messages by Dapr. + default: '"1000"' + example: '"1000"' \ No newline at end of file diff --git a/pubsub/pulsar/pulsar.go b/pubsub/pulsar/pulsar.go index 6e4fb94d87..7822d63f5e 100644 --- a/pubsub/pulsar/pulsar.go +++ b/pubsub/pulsar/pulsar.go @@ -80,6 +80,8 @@ const ( defaultRedeliveryDelay = 30 * time.Second // defaultConcurrency controls the number of concurrent messages sent to the app. defaultConcurrency = 100 + // defaultReceiverQueueSize controls the number of messages the pulsar sdk pulls before dapr explicitly consumes the messages. + defaultReceiverQueueSize = 1000 subscribeTypeKey = "subscribeType" @@ -125,6 +127,7 @@ func parsePulsarMetadata(meta pubsub.Metadata) (*pulsarMetadata, error) { BatchingMaxSize: defaultMaxBatchSize, RedeliveryDelay: defaultRedeliveryDelay, MaxConcurrentHandlers: defaultConcurrency, + ReceiverQueueSize: defaultReceiverQueueSize, } if err := kitmd.DecodeMetadata(meta.Properties, &m); err != nil { @@ -403,6 +406,7 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han Type: getSubscribeType(req.Metadata), MessageChannel: channel, NackRedeliveryDelay: p.metadata.RedeliveryDelay, + ReceiverQueueSize: p.metadata.ReceiverQueueSize, } if p.useConsumerEncryption() {