diff --git a/jetstream/jetstream_test.go b/jetstream/jetstream_test.go index 93e2026d9..cf7180ca1 100644 --- a/jetstream/jetstream_test.go +++ b/jetstream/jetstream_test.go @@ -16,6 +16,7 @@ package jetstream import ( "errors" "fmt" + "reflect" "testing" "time" @@ -274,3 +275,26 @@ func TestRetryWithBackoff(t *testing.T) { }) } } + +func TestOptions(t *testing.T) { + t.Run("publish", func(t *testing.T) { + t.Run("WithHeader", func(t *testing.T) { + customHeader := "test" + customHeaderValues := []string{"some", "values"} + po := pubOpts{} + sut := WithHeader(customHeader, customHeaderValues) + + err := sut(&po) + if err != nil { + t.Fatalf("expected nil error got: %s", err) + } + if po.headerValues == nil { + t.Fatal("nil headerValues") + } + + if got := po.headerValues.Values(customHeader); !reflect.DeepEqual(got, customHeaderValues) { + t.Fatalf("header %s not set - expected %+v got %+v", customHeader, customHeaderValues, got) + } + }) + }) +} diff --git a/jetstream/options.go b/jetstream/options.go index b47268e99..428c142e9 100644 --- a/jetstream/options.go +++ b/jetstream/options.go @@ -16,6 +16,8 @@ package jetstream import ( "fmt" "time" + + "github.com/nats-io/nats.go" ) type pullOptFunc func(*consumeOpts) error @@ -281,6 +283,17 @@ func WithStreamListSubject(subject string) StreamListOpt { } } +// WithHeader adds an arbitrary header to the underlying message. +func WithHeader(key string, value []string) PublishOpt { + return func(opts *pubOpts) error { + if opts.headerValues == nil { + opts.headerValues = nats.Header{} + } + opts.headerValues[key] = value + return nil + } +} + // WithMsgID sets the message ID used for deduplication. func WithMsgID(id string) PublishOpt { return func(opts *pubOpts) error { diff --git a/jetstream/publish.go b/jetstream/publish.go index 30371cdc8..90b47d92a 100644 --- a/jetstream/publish.go +++ b/jetstream/publish.go @@ -52,6 +52,9 @@ type ( // stallWait is the max wait of a async pub ack. stallWait time.Duration + + // headerValues contains any arbitrary headers provided by the caller + headerValues nats.Header } // PubAckFuture is a future for a PubAck. @@ -171,6 +174,12 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) } + if o.headerValues != nil { + for k, v := range o.headerValues { + m.Header[k] = v + } + } + var resp *nats.Msg var err error @@ -245,6 +254,12 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10)) } + if o.headerValues != nil { + for k, v := range o.headerValues { + m.Header[k] = v + } + } + // Reply if m.Reply != "" { return nil, ErrAsyncPublishReplySubjectSet