From 79f964cc64e116b5138ef3379fba6d54a8283330 Mon Sep 17 00:00:00 2001 From: Alberto Ricart Date: Wed, 2 Oct 2024 12:44:08 -0500 Subject: [PATCH] Added test on direct get with subject filter Signed-off-by: Alberto Ricart --- server/jetstream_test.go | 46 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index c88e5b23684..f9c9da369ea 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -22486,6 +22486,52 @@ func TestJetStreamDirectGetBatch(t *testing.T) { checkResponses(sub, 3, "foo.foo", "foo.bar", "foo.baz", _EMPTY_) } +func TestJetStreamDirectGetBatchFiltered(t *testing.T) { + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"a", "b"}, + MaxMsgsPerSubject: -1, + AllowDirect: true, + }) + require_NoError(t, err) + + js.Publish("a", []byte("HELLO")) + js.Publish("b", []byte("WORLD")) + js.Publish("a", []byte("AGAIN")) + + si, err := js.StreamInfo("TEST") + require_NoError(t, err) + require_Equal(t, 3, si.State.Msgs) + + // Direct subjects. + sendRequest := func(mreq *JSApiMsgGetRequest) *nats.Subscription { + t.Helper() + req, _ := json.Marshal(mreq) + // We will get multiple responses so can't do normal request. + reply := nats.NewInbox() + sub, err := nc.SubscribeSync(reply) + require_NoError(t, err) + err = nc.PublishRequest("$JS.API.DIRECT.GET.TEST", reply, req) + require_NoError(t, err) + return sub + } + + // Run some simple tests. + sub := sendRequest(&JSApiMsgGetRequest{Seq: 1, Batch: 1, MultiLastFor: []string{"a", "b"}}) + defer sub.Unsubscribe() + msg, err := sub.NextMsg(10 * time.Millisecond) + require_NoError(t, err) + require_Equal(t, "1", msg.Header.Get(JSSequence)) + require_Equal(t, "2", msg.Header.Get(JSNumPending)) + require_Equal(t, "a", msg.Header.Get(JSSubject)) +} + func TestJetStreamDirectGetBatchMaxBytes(t *testing.T) { s := RunBasicJetStreamServer(t) defer s.Shutdown()