From f1a4619705000b3d850fecaee38935d28cda5e6f Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Wed, 6 Dec 2023 15:55:48 +0100 Subject: [PATCH] Add stream filtering support in STOMP --- cmd/root.go | 1 + pkg/config/config.go | 39 ++++++++++++++++++------------------ pkg/stomp_client/consumer.go | 5 +++++ 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 4c21f2b..9d348df 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -175,6 +175,7 @@ func RootCmd() *cobra.Command { rootCmd.PersistentFlags(). BoolVarP(&cfg.MessageDurability, "message-durability", "d", true, "Mark messages as durable (default=true)") rootCmd.PersistentFlags().StringVar(&cfg.StreamOffset, "stream-offset", "", "Stream consumer offset specification (default=next)") + rootCmd.PersistentFlags().StringVar(&cfg.StreamFilterValues, "stream-filter-values", "", "Stream consumer filter") rootCmd.PersistentFlags().IntVar(&cfg.ConsumerCredits, "consumer-credits", 1, "AMQP-1.0 consumer credits / STOMP prefetch count") rootCmd.AddCommand(amqp_amqp) diff --git a/pkg/config/config.go b/pkg/config/config.go index f85cb1d..a217616 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -30,25 +30,26 @@ type MqttOptions struct { } type Config struct { - PublisherUri string - ConsumerUri string - Publishers int - Consumers int - PublishCount int - ConsumeCount int - PublishTo string - ConsumeFrom string - ConsumerCredits int - Size int - Rate int - Duration time.Duration - UseMillis bool - QueueDurability AmqpDurabilityMode - MessageDurability bool - StreamOffset string - Amqp AmqpOptions - MqttPublisher MqttOptions - MqttConsumer MqttOptions + PublisherUri string + ConsumerUri string + Publishers int + Consumers int + PublishCount int + ConsumeCount int + PublishTo string + ConsumeFrom string + ConsumerCredits int + Size int + Rate int + Duration time.Duration + UseMillis bool + QueueDurability AmqpDurabilityMode + MessageDurability bool + StreamOffset string + StreamFilterValues string + Amqp AmqpOptions + MqttPublisher MqttOptions + MqttConsumer MqttOptions } func NewConfig() Config { diff --git a/pkg/stomp_client/consumer.go b/pkg/stomp_client/consumer.go index 429bc3a..687c7df 100644 --- a/pkg/stomp_client/consumer.go +++ b/pkg/stomp_client/consumer.go @@ -110,5 +110,10 @@ func buildSubscribeOpts(cfg config.Config) []func(*frame.Frame) error { ) } + if cfg.StreamFilterValues != "" { + subscribeOpts = append(subscribeOpts, + stomp.SubscribeOpt.Header("x-stream-filter", cfg.StreamFilterValues)) + } + log.Info("subscribe options", "filter", cfg.StreamFilterValues) return subscribeOpts }