diff --git a/cmd/root.go b/cmd/root.go index acebb4f..5e8eebf 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -94,6 +94,8 @@ func RootCmd() *cobra.Command { "AMQP application properties, eg. key1=val1,val2") amqpPublisherFlags.StringSliceVar(&cfg.Amqp.Subjects, "amqp-subject", []string{}, "AMQP 1.0 message subject(s)") + amqpPublisherFlags.StringSliceVar(&cfg.Amqp.To, "amqp-to", []string{}, + "AMQP 1.0 message To field (required for the anonymous terminus)") amqpPublisherFlags.BoolVar(&cfg.Amqp.SendSettled, "amqp-send-settled", false, "Send settled messages (fire and forget)") diff --git a/pkg/amqp10/publisher.go b/pkg/amqp10/publisher.go index 0e8492c..7e99c2d 100644 --- a/pkg/amqp10/publisher.go +++ b/pkg/amqp10/publisher.go @@ -332,6 +332,7 @@ func (p *Amqp10Publisher) Stop(reason string) { func (p *Amqp10Publisher) prepareMessage() *amqp.Message { utils.UpdatePayload(p.Config.UseMillis, &p.msg) msg := amqp.NewMessage(p.msg) + msg.Properties = &amqp.MessageProperties{} if len(p.Config.Amqp.AppProperties) > 0 { msg.ApplicationProperties = make(map[string]any) @@ -341,7 +342,11 @@ func (p *Amqp10Publisher) prepareMessage() *amqp.Message { } if len(p.Config.Amqp.Subjects) > 0 { - msg.Properties = &amqp.MessageProperties{Subject: &p.Config.Amqp.Subjects[metrics.MessagesPublished.Get()%uint64(len(p.Config.Amqp.Subjects))]} + msg.Properties.Subject = &p.Config.Amqp.Subjects[metrics.MessagesPublished.Get()%uint64(len(p.Config.Amqp.Subjects))] + } + + if len(p.Config.Amqp.To) > 0 { + msg.Properties.To = &p.Config.Amqp.To[metrics.MessagesPublished.Get()%uint64(len(p.Config.Amqp.To))] } if p.Config.StreamFilterValueSet != "" { diff --git a/pkg/config/config.go b/pkg/config/config.go index 2e8c82e..e1be980 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -47,6 +47,7 @@ var QueueTypes = map[QueueType][]string{ type AmqpOptions struct { Subjects []string + To []string SendSettled bool ReleaseRate int RejectRate int