diff --git a/jetstream/errors.go b/jetstream/errors.go index fb364341c..8d2fec642 100644 --- a/jetstream/errors.go +++ b/jetstream/errors.go @@ -270,6 +270,9 @@ var ( // of an ordered consumer which was not yet created. ErrOrderedConsumerNotCreated JetStreamError = &jsError{message: "consumer instance not yet created"} + // ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called. + ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"} + // KeyValue Errors // ErrKeyExists is returned when attempting to create a key that already diff --git a/jetstream/jetstream.go b/jetstream/jetstream.go index 554400bf2..239cf383f 100644 --- a/jetstream/jetstream.go +++ b/jetstream/jetstream.go @@ -101,6 +101,19 @@ type ( // outstanding asynchronously published messages are acknowledged by the // server. PublishAsyncComplete() <-chan struct{} + + // CleanupPublisher will cleanup the publishing side of JetStreamContext. + // + // This will unsubscribe from the internal reply subject if needed. + // All pending async publishes will fail with ErrJetStreamContextClosed. + // + // If an error handler was provided, it will be called for each pending async + // publish and PublishAsyncComplete will be closed. + // + // After completing JetStreamContext is still usable - internal subscription + // will be recreated on next publish, but the acks from previous publishes will + // be lost. + CleanupPublisher() } // StreamManager provides CRUD API for managing streams. It is available as @@ -1032,6 +1045,41 @@ func wrapContextWithoutDeadline(ctx context.Context) (context.Context, context.C return context.WithTimeout(ctx, defaultAPITimeout) } +// CleanupPublisher will cleanup the publishing side of JetStreamContext. +// +// This will unsubscribe from the internal reply subject if needed. +// All pending async publishes will fail with ErrJetStreamContextClosed. +// +// If an error handler was provided, it will be called for each pending async +// publish and PublishAsyncComplete will be closed. +// +// After completing JetStreamContext is still usable - internal subscription +// will be recreated on next publish, but the acks from previous publishes will +// be lost. +func (js *jetStream) CleanupPublisher() { + js.cleanupReplySub() + js.publisher.Lock() + errCb := js.publisher.aecb + for id, paf := range js.publisher.acks { + paf.err = ErrJetStreamPublisherClosed + if paf.errCh != nil { + paf.errCh <- paf.err + } + if errCb != nil { + // clear reply subject so that new one is created on republish + js.publisher.Unlock() + errCb(js, paf.msg, ErrJetStreamPublisherClosed) + js.publisher.Lock() + } + delete(js.publisher.acks, id) + } + if js.publisher.doneCh != nil { + close(js.publisher.doneCh) + js.publisher.doneCh = nil + } + js.publisher.Unlock() +} + func (js *jetStream) cleanupReplySub() { if js.publisher == nil { return diff --git a/jetstream/test/jetstream_test.go b/jetstream/test/jetstream_test.go index f5c9c8ee1..a5bf59f18 100644 --- a/jetstream/test/jetstream_test.go +++ b/jetstream/test/jetstream_test.go @@ -1963,3 +1963,125 @@ func TestConsumerConfigMatches(t *testing.T) { t.Fatalf("ConsumerConfig doesn't match") } } + +func TestJetStreamCleanupPublisher(t *testing.T) { + + t.Run("cleanup js publisher", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + // Create a stream + if _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + numSubs := nc.NumSubscriptions() + if _, err := js.PublishAsync("FOO", []byte("hello")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + if numSubs+1 != nc.NumSubscriptions() { + t.Fatalf("Expected an additional subscription after publish, got %d", nc.NumSubscriptions()) + } + + js.CleanupPublisher() + + if numSubs != nc.NumSubscriptions() { + t.Fatalf("Expected subscriptions to be back to original count") + } + }) + + t.Run("cleanup js publisher, cancel pending acks", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + cbErr := make(chan error, 10) + js, err := jetstream.New(nc, jetstream.WithPublishAsyncErrHandler(func(js jetstream.JetStream, m *nats.Msg, err error) { + cbErr <- err + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create a stream with NoAck so that we can test that we cancel ack futures. + if _, err := js.CreateStream(context.Background(), jetstream.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}, NoAck: true}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + numSubs := nc.NumSubscriptions() + + var acks []jetstream.PubAckFuture + for i := 0; i < 10; i++ { + ack, err := js.PublishAsync("FOO", []byte("hello")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + acks = append(acks, ack) + } + + asyncComplete := js.PublishAsyncComplete() + select { + case <-asyncComplete: + t.Fatalf("Should not complete, NoAck is set") + case <-time.After(200 * time.Millisecond): + } + + if numSubs+1 != nc.NumSubscriptions() { + t.Fatalf("Expected an additional subscription after publish, got %d", nc.NumSubscriptions()) + } + + js.CleanupPublisher() + + if numSubs != nc.NumSubscriptions() { + t.Fatalf("Expected subscriptions to be back to original count") + } + + // check that PublishAsyncComplete channel is closed + select { + case <-asyncComplete: + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // check that all ack futures are cancelled + for _, ack := range acks { + select { + case err := <-ack.Err(): + if !errors.Is(err, jetstream.ErrJetStreamPublisherClosed) { + t.Fatalf("Expected JetStreamContextClosed error, got %v", err) + } + case <-ack.Ok(): + t.Fatalf("Expected error on the ack future") + case <-time.After(200 * time.Millisecond): + t.Fatalf("Expected an error on the ack future") + } + } + + // check that async error handler is called for each pending ack + for i := 0; i < 10; i++ { + select { + case err := <-cbErr: + if !errors.Is(err, jetstream.ErrJetStreamPublisherClosed) { + t.Fatalf("Expected JetStreamContextClosed error, got %v", err) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("Expected errors to be passed from the async handler") + } + } + }) + +} diff --git a/js.go b/js.go index 5f8dfe3ee..7c0f19c6b 100644 --- a/js.go +++ b/js.go @@ -58,6 +58,19 @@ type JetStream interface { // PublishAsyncComplete returns a channel that will be closed when all outstanding messages are ack'd. PublishAsyncComplete() <-chan struct{} + // CleanupPublisher will cleanup the publishing side of JetStreamContext. + // + // This will unsubscribe from the internal reply subject if needed. + // All pending async publishes will fail with ErrJetStreamContextClosed. + // + // If an error handler was provided, it will be called for each pending async + // publish and PublishAsyncComplete will be closed. + // + // After completing JetStreamContext is still usable - internal subscription + // will be recreated on next publish, but the acks from previous publishes will + // be lost. + CleanupPublisher() + // Subscribe creates an async Subscription for JetStream. // The stream and consumer names can be provided with the nats.Bind() option. // For creating an ephemeral (where the consumer name is picked by the server), @@ -734,6 +747,41 @@ func (js *js) resetPendingAcksOnReconnect() { } } +// CleanupPublisher will cleanup the publishing side of JetStreamContext. +// +// This will unsubscribe from the internal reply subject if needed. +// All pending async publishes will fail with ErrJetStreamContextClosed. +// +// If an error handler was provided, it will be called for each pending async +// publish and PublishAsyncComplete will be closed. +// +// After completing JetStreamContext is still usable - internal subscription +// will be recreated on next publish, but the acks from previous publishes will +// be lost. +func (js *js) CleanupPublisher() { + js.cleanupReplySub() + js.mu.Lock() + errCb := js.opts.aecb + for id, paf := range js.pafs { + paf.err = ErrJetStreamPublisherClosed + if paf.errCh != nil { + paf.errCh <- paf.err + } + if errCb != nil { + // clear reply subject so that new one is created on republish + js.mu.Unlock() + errCb(js, paf.msg, ErrJetStreamPublisherClosed) + js.mu.Lock() + } + delete(js.pafs, id) + } + if js.dch != nil { + close(js.dch) + js.dch = nil + } + js.mu.Unlock() +} + func (js *js) cleanupReplySub() { js.mu.Lock() if js.rsub != nil { diff --git a/jserrors.go b/jserrors.go index 2d942e771..1c22d812b 100644 --- a/jserrors.go +++ b/jserrors.go @@ -151,6 +151,9 @@ var ( // ErrSubscriptionClosed is returned when attempting to send pull request to a closed subscription ErrSubscriptionClosed JetStreamError = &jsError{message: "subscription closed"} + // ErrJetStreamPublisherClosed is returned for each unfinished ack future when JetStream.Cleanup is called. + ErrJetStreamPublisherClosed JetStreamError = &jsError{message: "jetstream context closed"} + // Deprecated: ErrInvalidDurableName is no longer returned and will be removed in future releases. // Use ErrInvalidConsumerName instead. ErrInvalidDurableName = errors.New("nats: invalid durable name") diff --git a/test/js_test.go b/test/js_test.go index 0363706c0..e8f5bdac2 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -8207,6 +8207,128 @@ func TestJetStreamPublishAsyncPerf(t *testing.T) { fmt.Printf("%.0f msgs/sec\n\n", float64(toSend)/tt.Seconds()) } +func TestJetStreamCleanupPublisher(t *testing.T) { + + t.Run("cleanup js publisher", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + // Create a stream. + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + numSubs := nc.NumSubscriptions() + if _, err := js.PublishAsync("FOO", []byte("hello")); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + select { + case <-js.PublishAsyncComplete(): + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + if numSubs+1 != nc.NumSubscriptions() { + t.Fatalf("Expected an additional subscription after publish, got %d", nc.NumSubscriptions()) + } + + js.CleanupPublisher() + + if numSubs != nc.NumSubscriptions() { + t.Fatalf("Expected subscriptions to be back to original count") + } + }) + + t.Run("cleanup js publisher, cancel pending acks", func(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + cbErr := make(chan error, 10) + js, err := nc.JetStream(nats.PublishAsyncErrHandler(func(js nats.JetStream, m *nats.Msg, err error) { + cbErr <- err + })) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Create a stream with NoAck so that we can test that we cancel ack futures. + if _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"FOO"}, NoAck: true}); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + numSubs := nc.NumSubscriptions() + + var acks []nats.PubAckFuture + for i := 0; i < 10; i++ { + ack, err := js.PublishAsync("FOO", []byte("hello")) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + acks = append(acks, ack) + } + + asyncComplete := js.PublishAsyncComplete() + select { + case <-asyncComplete: + t.Fatalf("Should not complete, NoAck is set") + case <-time.After(200 * time.Millisecond): + } + + if numSubs+1 != nc.NumSubscriptions() { + t.Fatalf("Expected an additional subscription after publish, got %d", nc.NumSubscriptions()) + } + + js.CleanupPublisher() + + if numSubs != nc.NumSubscriptions() { + t.Fatalf("Expected subscriptions to be back to original count") + } + + // check that PublishAsyncComplete channel is closed + select { + case <-asyncComplete: + case <-time.After(5 * time.Second): + t.Fatalf("Did not receive completion signal") + } + + // check that all ack futures are cancelled + for _, ack := range acks { + select { + case err := <-ack.Err(): + if !errors.Is(err, nats.ErrJetStreamPublisherClosed) { + t.Fatalf("Expected JetStreamContextClosed error, got %v", err) + } + case <-ack.Ok(): + t.Fatalf("Expected error on the ack future") + case <-time.After(200 * time.Millisecond): + t.Fatalf("Expected an error on the ack future") + } + } + + // check that async error handler is called for each pending ack + for i := 0; i < 10; i++ { + select { + case err := <-cbErr: + if !errors.Is(err, nats.ErrJetStreamPublisherClosed) { + t.Fatalf("Expected JetStreamContextClosed error, got %v", err) + } + case <-time.After(200 * time.Millisecond): + t.Fatalf("Expected errors to be passed from the async handler") + } + } + }) + +} + func TestJetStreamPublishExpectZero(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s)