Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unexpected delay and duplicate messages on Fetch for pull-based subscription #1696

Open
andrey-shalamov opened this issue Aug 8, 2024 · 0 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@andrey-shalamov
Copy link

Observed behavior

Output of the test from "Steps to Reproduce" section:

=== RUN   TestNATSPullBatch
2024/08/08 12:32:41.545645 [0] before fetch
2024/08/08 12:32:51.546283 [0] after fetch dur=10.000473152s, err=context deadline exceeded, msgs=0
2024/08/08 12:32:51.546329 [1] before fetch
2024/08/08 12:32:52.546214 sending msg-1
2024/08/08 12:32:52.551308 [1] after fetch dur=1.00496966s, err=<nil>, msgs=1
2024/08/08 12:32:52.551424 [1] received msg id=msg-1, data=msg-1
2024/08/08 12:32:52.551473 [2] before fetch
2024/08/08 12:32:53.550741 sending msg-2
**// unexpected delay here**
2024/08/08 12:32:58.555258 [2] after fetch dur=6.003755537s, err=<nil>, msgs=2
2024/08/08 12:32:58.555386 [2] received msg id=msg-2, data=msg-2 **// received duplicate messages**
2024/08/08 12:32:58.555511 [2] received msg id=msg-2, data=msg-2
--- PASS: TestNATSPullBatch (17.02s)

Expected behavior

No delay and no duplicates

Server and client version

server: nats:2.10.18-alpine

client go.mod

module nats/test

go 1.22

require (
	github.com/nats-io/nats.go v1.36.0
	github.com/stretchr/testify v1.9.0
)

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/klauspost/compress v1.17.4 // indirect
	github.com/kr/pretty v0.3.0 // indirect
	github.com/nats-io/nkeys v0.4.7 // indirect
	github.com/nats-io/nuid v1.0.1 // indirect
	github.com/pmezard/go-difflib v1.0.0 // indirect
	github.com/rogpeppe/go-internal v1.8.1 // indirect
	golang.org/x/crypto v0.22.0 // indirect
	golang.org/x/sys v0.19.0 // indirect
	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
)

Host environment

5.15.133.1-microsoft-standard-WSL2 x86_64
Ubuntu 20.04.3 LTS

Steps to reproduce

run nats server
docker run --rm -p 4222:4222 nats:2.10.18-alpine -js -DV

run test
go text -v -count 1

package nats_test

import (
	"context"
	"log"
	"sync"
	"testing"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/stretchr/testify/require"
)

func TestNATSPullBatch(t *testing.T) {
	const fetchTimeout = 10 * time.Second
	const ackWait = 5 * time.Second

	ctx := context.Background()

	nc, err := nats.Connect("nats://127.0.0.1:4222")
	require.NoError(t, err)
	t.Cleanup(func() { _ = nc.Drain() })
	t.Cleanup(nc.Close)

	jsCtx, err := nc.JetStream()
	require.NoError(t, err)

	const subject = "PullBatch"
	streamCfg := nats.StreamConfig{
		Name:       "PullBatch",
		Subjects:   []string{subject},
		Retention:  nats.InterestPolicy,
		Storage:    nats.FileStorage,
		Duplicates: time.Minute,
	}
	_, err = jsCtx.AddStream(&streamCfg)
	require.NoError(t, err)

	consumerName := "consumer-" + subject

	_, err = jsCtx.AddConsumer(streamCfg.Name, &nats.ConsumerConfig{
		FilterSubject: subject,
		Durable:       consumerName,
		DeliverPolicy: nats.DeliverAllPolicy,
		AckPolicy:     nats.AckExplicitPolicy,
		AckWait:       ackWait,
	})
	require.NoError(t, err)

	sub, err := jsCtx.PullSubscribe(subject, consumerName)
	require.NoError(t, err)

	log.SetFlags(log.Ldate | log.Lmicroseconds | log.LUTC)

	var done bool
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; ; i++ {
			if done {
				return
			}
			log.Printf("[%d] before fetch", i)

			ctxWithTimeout, cancel := context.WithTimeout(ctx, fetchTimeout)
			t1 := time.Now()
			msgs, err := sub.Fetch(100, nats.Context(ctxWithTimeout))
			cancel()

			log.Printf("[%d] after fetch dur=%v, err=%v, msgs=%v", i, time.Since(t1).String(), err, len(msgs))
			for _, m := range msgs {
				log.Printf("[%d] received msg id=%v, data=%v", i, m.Header.Get(nats.MsgIdHdr), string(m.Data))
				require.NoError(t, m.Ack())
			}
		}
	}()

	time.Sleep(fetchTimeout + time.Second)

	log.Println("sending msg-1")
	_, err = jsCtx.PublishMsg(&nats.Msg{
		Subject: subject,
		Header:  map[string][]string{nats.MsgIdHdr: {"msg-1"}},
		Data:    []byte("msg-1"),
	})
	require.NoError(t, err)

	time.Sleep(time.Second)

	log.Println("sending msg-2")
	_, err = jsCtx.PublishMsg(&nats.Msg{
		Subject: subject,
		Header:  map[string][]string{nats.MsgIdHdr: {"msg-2"}},
		Data:    []byte("msg-2"),
	})
	require.NoError(t, err)

	done = true
	wg.Wait()
}

output

=== RUN   TestNATSPullBatch
2024/08/08 12:32:41.545645 [0] before fetch
2024/08/08 12:32:51.546283 [0] after fetch dur=10.000473152s, err=context deadline exceeded, msgs=0
2024/08/08 12:32:51.546329 [1] before fetch
2024/08/08 12:32:52.546214 sending msg-1
2024/08/08 12:32:52.551308 [1] after fetch dur=1.00496966s, err=<nil>, msgs=1
2024/08/08 12:32:52.551424 [1] received msg id=msg-1, data=msg-1
2024/08/08 12:32:52.551473 [2] before fetch
2024/08/08 12:32:53.550741 sending msg-2
**// unexpected delay here**
2024/08/08 12:32:58.555258 [2] after fetch dur=6.003755537s, err=<nil>, msgs=2
2024/08/08 12:32:58.555386 [2] received msg id=msg-2, data=msg-2 **// received duplicate messages**
2024/08/08 12:32:58.555511 [2] received msg id=msg-2, data=msg-2
--- PASS: TestNATSPullBatch (17.02s)

nats server logs in attached file
nats.log

no_wait doesn't work as expected?

[1] 2024/08/08 12:32:41.545972 [TRC] 172.17.0.1:42804 - cid:5 - <<- [PUB $JS.API.CONSUMER.MSG.NEXT.PullBatch.consumer-PullBatch _INBOX.0WIUYssoqh0OGPeYXSQkar.0WIUYssoqh0OGPeYXSQkep 49]
[1] 2024/08/08 12:32:41.545977 [TRC] 172.17.0.1:42804 - cid:5 - <<- MSG_PAYLOAD: ["{\"expires\":9989991100,\"batch\":100,\"no_wait\":true}"]
... // actually we wait here
[1] 2024/08/08 12:32:51.536543 [TRC] 172.17.0.1:42804 - cid:5 - ->> [HMSG _INBOX.0WIUYssoqh0OGPeYXSQkar.0WIUYssoqh0OGPeYXSQkep 2 83 83]
@andrey-shalamov andrey-shalamov added the defect Suspected defect such as a bug or regression label Aug 8, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
defect Suspected defect such as a bug or regression
Projects
None yet
Development

No branches or pull requests

1 participant