diff --git a/sandbox/Example.JetStream.PullConsumer/Program.cs b/sandbox/Example.JetStream.PullConsumer/Program.cs index fb6d64ee9..671b4f0b6 100644 --- a/sandbox/Example.JetStream.PullConsumer/Program.cs +++ b/sandbox/Example.JetStream.PullConsumer/Program.cs @@ -78,7 +78,8 @@ void Report(int i, Stopwatch sw, string data) var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max }; var fetchMsgCount = 0; - await foreach (var msg in consumer.FetchNoWaitAsync>(opts: fetchNoWaitOpts, cancellationToken: cts.Token)) + // NoWaitFetch is a specialized operation not available on the public interface. + await foreach (var msg in ((NatsJSConsumer)consumer).FetchNoWaitAsync>(opts: fetchNoWaitOpts, cancellationToken: cts.Token)) { fetchMsgCount++; using (msg.Data) diff --git a/src/NATS.Client.JetStream/INatsJSConsumer.cs b/src/NATS.Client.JetStream/INatsJSConsumer.cs new file mode 100644 index 000000000..8765c8757 --- /dev/null +++ b/src/NATS.Client.JetStream/INatsJSConsumer.cs @@ -0,0 +1,82 @@ +using NATS.Client.Core; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public interface INatsJSConsumer +{ + /// + /// Consumer info object as retrieved from NATS JetStream server at the time this object was created, updated or refreshed. + /// + ConsumerInfo Info { get; } + + /// + /// Starts an enumerator consuming messages from the stream using this consumer. + /// + /// Serializer to use for the message type. + /// Consume options. (default: MaxMsgs 1,000) + /// A used to cancel the call. + /// Message type to deserialize. + /// Async enumerable of messages which can be used in a await foreach loop. + /// Consumer is deleted, it's push based or request sent to server is invalid. + IAsyncEnumerable> ConsumeAsync( + INatsDeserialize? serializer = default, + NatsJSConsumeOpts? opts = default, + CancellationToken cancellationToken = default); + + /// + /// Consume a single message from the stream using this consumer. + /// + /// Serializer to use for the message type. + /// Next message options. (default: 30 seconds timeout) + /// A used to cancel the call. + /// Message type to deserialize. + /// Message retrieved from the stream or NULL + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. + /// + /// + /// If the request to server expires (in 30 seconds by default) this call returns NULL. + /// + /// + /// This method is implemented as a fetch with MaxMsgs=1 which means every request will create a new subscription + /// on the NATS server. This would be inefficient if you're consuming a lot of messages and you should consider using + /// fetch or consume methods. + /// + /// + /// + /// The following example shows how you might process messages: + /// + /// var next = await consumer.NextAsync<Data>(); + /// if (next is { } msg) + /// { + /// // process the message + /// await msg.AckAsync(); + /// } + /// + /// + ValueTask?> NextAsync(INatsDeserialize? serializer = default, NatsJSNextOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Consume a set number of messages from the stream using this consumer. + /// + /// Serializer to use for the message type. + /// Fetch options. (default: MaxMsgs 1,000 and timeout in 30 seconds) + /// A used to cancel the call. + /// Message type to deserialize. + /// Async enumerable of messages which can be used in a await foreach loop. + /// Consumer is deleted, it's push based or request sent to server is invalid. + /// There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier. + IAsyncEnumerable> FetchAsync( + INatsDeserialize? serializer = default, + NatsJSFetchOpts? opts = default, + CancellationToken cancellationToken = default); + + /// + /// Retrieve the consumer info from the server and update this consumer. + /// + /// A used to cancel the API call. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask RefreshAsync(CancellationToken cancellationToken = default); +} diff --git a/src/NATS.Client.JetStream/INatsJSContext.cs b/src/NATS.Client.JetStream/INatsJSContext.cs new file mode 100644 index 000000000..705dd42e9 --- /dev/null +++ b/src/NATS.Client.JetStream/INatsJSContext.cs @@ -0,0 +1,251 @@ +using System.Runtime.CompilerServices; +using NATS.Client.Core; +using NATS.Client.JetStream.Models; + +namespace NATS.Client.JetStream; + +public interface INatsJSContext +{ + /// + /// Creates new ordered consumer. + /// + /// Stream name to create the consumer under. + /// Ordered consumer options. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving ordered data from the stream. + ValueTask CreateOrderedConsumerAsync( + string stream, + NatsJSOrderedConsumerOpts? opts = default, + CancellationToken cancellationToken = default); + + /// + /// Creates new consumer if it doesn't exists or returns an existing one with the same name. + /// + /// Stream name to create the consumer under. + /// Name of the consumer. + /// Ack policy to use. Must not be set to none. Default is explicit. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. + ValueTask CreateConsumerAsync( + string stream, + string consumer, + ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, + CancellationToken cancellationToken = default); + + /// + /// Creates new consumer if it doesn't exists or returns an existing one with the same name. + /// + /// Consumer creation request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// Ack policy is set to none or there was an issue retrieving the response. + /// Server responded with an error. + ValueTask CreateConsumerAsync( + ConsumerCreateRequest request, + CancellationToken cancellationToken = default); + + /// + /// Gets consumer information from the server and creates a NATS JetStream consumer . + /// + /// Stream name where consumer is associated to. + /// Consumer name. + /// A used to cancel the API call. + /// The NATS JetStream consumer object which can be used retrieving data from the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default); + + /// + /// Enumerates through consumers belonging to a stream. + /// + /// Stream name the consumers belong to. + /// A used to cancel the API call. + /// Async enumerable of consumer info objects. Can be used in a await foreach loop. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them. + /// + IAsyncEnumerable ListConsumersAsync( + string stream, + CancellationToken cancellationToken = default); + + /// + /// Delete a consumer from a stream. + /// + /// Stream name where consumer is associated to. + /// Consumer name to be deleted. + /// A used to cancel the API call. + /// Whether the deletion was successful. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default); + + /// + /// Calls JetStream Account Info API. + /// + /// A used to cancel the API call. + /// The account information based on the NATS connection credentials. + ValueTask GetAccountInfoAsync(CancellationToken cancellationToken = default); + + /// + /// Sends data to a stream associated with the subject. + /// + /// Subject to publish the data to. + /// Data to publish. + /// Serializer to use for the message type. + /// Publish options. + /// Optional message headers. + /// A used to cancel the publishing call or the wait for response. + /// Type of the data being sent. + /// + /// The ACK response to indicate if stream accepted the message as well as additional + /// information like the sequence number of the message stored by the stream. + /// + /// There was a problem receiving the response. + /// + /// + /// Note that if the subject isn't backed by a stream or the connected NATS server + /// isn't running with JetStream enabled, this call will hang waiting for an ACK + /// until the request times out. + /// + /// + ValueTask PublishAsync( + string subject, + T? data, + INatsSerialize? serializer = default, + NatsJSPubOpts? opts = default, + NatsHeaders? headers = default, + CancellationToken cancellationToken = default); + + /// + /// Sends an empty payload to a stream associated with the subject. + /// + /// Subject to publish the data to. + /// Publish options. + /// Optional message headers. + /// A used to cancel the publishing call or the wait for response. + /// + /// The ACK response to indicate if stream accepted the message as well as additional + /// information like the sequence number of the message stored by the stream. + /// + /// There was a problem receiving the response. + /// + /// + /// Note that if the subject isn't backed by a stream or the connected NATS server + /// isn't running with JetStream enabled, this call will hang waiting for an ACK + /// until the request times out. + /// + /// + ValueTask PublishAsync( + string subject, + NatsJSPubOpts? opts = default, + NatsHeaders? headers = default, + CancellationToken cancellationToken = default); + + /// + /// Creates a new stream if it doesn't exist or returns an existing stream with the same name. + /// + /// Name of the stream to create. (e.g. my_events) + /// List of subjects stream will persist messages from. (e.g. events.*) + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default); + + /// + /// Creates a new stream if it doesn't exist or returns an existing stream with the same name. + /// + /// Stream configuration request to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask CreateStreamAsync( + StreamConfiguration request, + CancellationToken cancellationToken = default); + + /// + /// Deletes a stream. + /// + /// Stream name to be deleted. + /// A used to cancel the API call. + /// Whether delete was successful or not. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask DeleteStreamAsync( + string stream, + CancellationToken cancellationToken = default); + + /// + /// Purges all of the (or filtered) data in a stream, leaves the stream. + /// + /// Stream name to be purged. + /// Purge request. + /// A used to cancel the API call. + /// Purge response + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask PurgeStreamAsync( + string stream, + StreamPurgeRequest request, + CancellationToken cancellationToken = default); + + /// + /// Deletes a message from a stream. + /// + /// Stream name to delete message from. + /// Delete message request. + /// A used to cancel the API call. + /// Delete message response + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask DeleteMessageAsync( + string stream, + StreamMsgDeleteRequest request, + CancellationToken cancellationToken = default); + + /// + /// Get stream information from the server and creates a NATS JetStream stream object . + /// + /// Name of the stream to retrieve. + /// Stream info request options + /// A used to cancel the API call. + /// The NATS JetStream stream object which can be used to manage the stream. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask GetStreamAsync( + string stream, + StreamInfoRequest? request = null, + CancellationToken cancellationToken = default); + + /// + /// Update a NATS JetStream stream's properties. + /// + /// Stream update request object to be sent to NATS JetStream server. + /// A used to cancel the API call. + /// The updated NATS JetStream stream object. + /// There was an issue retrieving the response. + /// Server responded with an error. + ValueTask UpdateStreamAsync( + StreamUpdateRequest request, + CancellationToken cancellationToken = default); + + /// + /// Enumerates through the streams exists on the NATS JetStream server. + /// + /// Limit the list to streams matching this subject filter. + /// A used to cancel the API call. + /// Async enumerable of stream objects. Can be used in a await foreach loop. + /// There was an issue retrieving the response. + /// Server responded with an error. + /// + /// Note that paging isn't implemented. You might receive only a partial list of streams if there are a lot of them. + /// + IAsyncEnumerable ListStreamsAsync( + string? subject = default, + CancellationToken cancellationToken = default); +} diff --git a/src/NATS.Client.JetStream/NatsJSConsumer.cs b/src/NATS.Client.JetStream/NatsJSConsumer.cs index 4d1bd9b14..63f938978 100644 --- a/src/NATS.Client.JetStream/NatsJSConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSConsumer.cs @@ -9,7 +9,7 @@ namespace NATS.Client.JetStream; /// /// Represents a NATS JetStream consumer. /// -public class NatsJSConsumer +public class NatsJSConsumer : INatsJSConsumer { private readonly NatsJSContext _context; private readonly string _stream; diff --git a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs index 3fef3bb2f..b58b7b8e8 100644 --- a/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs +++ b/src/NATS.Client.JetStream/NatsJSContext.Consumers.cs @@ -5,7 +5,7 @@ namespace NATS.Client.JetStream; -public partial class NatsJSContext +public partial class NatsJSContext : INatsJSContext { /// /// Creates new ordered consumer. @@ -14,13 +14,13 @@ public partial class NatsJSContext /// Ordered consumer options. /// A used to cancel the API call. /// The NATS JetStream consumer object which can be used retrieving ordered data from the stream. - public ValueTask CreateOrderedConsumerAsync( + public ValueTask CreateOrderedConsumerAsync( string stream, NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) { opts ??= NatsJSOrderedConsumerOpts.Default; - return new ValueTask(new NatsJSOrderedConsumer(stream, this, opts, cancellationToken)); + return new ValueTask(new NatsJSOrderedConsumer(stream, this, opts, cancellationToken)); } /// @@ -33,7 +33,7 @@ public ValueTask CreateOrderedConsumerAsync( /// The NATS JetStream consumer object which can be used retrieving data from the stream. /// Ack policy is set to none or there was an issue retrieving the response. /// Server responded with an error. - public ValueTask CreateConsumerAsync( + public ValueTask CreateConsumerAsync( string stream, string consumer, ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, @@ -59,7 +59,7 @@ public ValueTask CreateConsumerAsync( /// The NATS JetStream consumer object which can be used retrieving data from the stream. /// Ack policy is set to none or there was an issue retrieving the response. /// Server responded with an error. - public async ValueTask CreateConsumerAsync( + public async ValueTask CreateConsumerAsync( ConsumerCreateRequest request, CancellationToken cancellationToken = default) { @@ -94,7 +94,7 @@ public async ValueTask CreateConsumerAsync( /// The NATS JetStream consumer object which can be used retrieving data from the stream. /// There was an issue retrieving the response. /// Server responded with an error. - public async ValueTask GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) + public async ValueTask GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default) { var response = await JSRequestResponseAsync( subject: $"{Opts.Prefix}.CONSUMER.INFO.{stream}.{consumer}", diff --git a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs index 3b29be24a..58aee3a15 100644 --- a/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs +++ b/src/NATS.Client.JetStream/NatsJSOrderedConsumer.cs @@ -7,7 +7,7 @@ namespace NATS.Client.JetStream; /// /// NATS JetStream ordered consumer. /// -public class NatsJSOrderedConsumer +public class NatsJSOrderedConsumer : INatsJSConsumer { private readonly string _stream; private readonly NatsJSContext _context; @@ -29,8 +29,17 @@ public NatsJSOrderedConsumer(string stream, NatsJSContext context, NatsJSOrdered _context = context; _opts = opts; _cancellationToken = cancellationToken; + + // For ordered consumer we start with an empty consumer info object + // since consumers are created and updated during fetch and consume. + Info = new ConsumerInfo(); } + /// + /// Consumer info object created during consume and fetch operations. + /// + public ConsumerInfo Info { get; private set; } + /// /// Consume messages from the stream in order. /// @@ -183,6 +192,12 @@ public async IAsyncEnumerable> FetchAsync( return default; } + /// + /// For ordered consumer this is a no-op. + /// + /// A used to cancel the API call. + public ValueTask RefreshAsync(CancellationToken cancellationToken = default) => ValueTask.CompletedTask; + private async Task RecreateConsumer(string consumer, ulong seq, CancellationToken cancellationToken) { var consumerOpts = _opts; @@ -224,6 +239,8 @@ private async Task RecreateConsumer(string consumer, ulong seq, var info = await _context.CreateOrderedConsumerInternalAsync(_stream, consumerOpts, cancellationToken); + Info = info; + return new NatsJSConsumer(_context, info); } diff --git a/src/NATS.Client.JetStream/NatsJSStream.cs b/src/NATS.Client.JetStream/NatsJSStream.cs index 04638dfef..463e91cef 100644 --- a/src/NATS.Client.JetStream/NatsJSStream.cs +++ b/src/NATS.Client.JetStream/NatsJSStream.cs @@ -93,13 +93,13 @@ public async ValueTask UpdateAsync( /// The NATS JetStream consumer object which can be used retrieving data from the stream. /// Ack policy is set to none or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. - public ValueTask CreateConsumerAsync(string consumer, ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, CancellationToken cancellationToken = default) + public ValueTask CreateConsumerAsync(string consumer, ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.CreateConsumerAsync(_name, consumer, ackPolicy, cancellationToken); } - public ValueTask CreateOrderedConsumerAsync(NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) + public ValueTask CreateOrderedConsumerAsync(NatsJSOrderedConsumerOpts? opts = default, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.CreateOrderedConsumerAsync(_name, opts, cancellationToken); @@ -113,7 +113,7 @@ public ValueTask CreateOrderedConsumerAsync(NatsJSOrdered /// The NATS JetStream consumer object which can be used retrieving data from the stream. /// Ack policy is set to none or there is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. - public ValueTask CreateConsumerAsync(ConsumerCreateRequest request, CancellationToken cancellationToken = default) + public ValueTask CreateConsumerAsync(ConsumerCreateRequest request, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.CreateConsumerAsync(request, cancellationToken); @@ -127,7 +127,7 @@ public ValueTask CreateConsumerAsync(ConsumerCreateRequest reque /// The NATS JetStream consumer object which can be used retrieving data from the stream. /// There is an error retrieving the response or this consumer object isn't valid anymore because it was deleted earlier. /// Server responded with an error. - public ValueTask GetConsumerAsync(string consumer, CancellationToken cancellationToken = default) + public ValueTask GetConsumerAsync(string consumer, CancellationToken cancellationToken = default) { ThrowIfDeleted(); return _context.GetConsumerAsync(_name, consumer, cancellationToken); diff --git a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs index 01773fed7..298175065 100644 --- a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs +++ b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs @@ -62,7 +62,6 @@ public void Subscription_should_not_be_collected_when_in_async_enumerator() } }); - var pub = Task.Run(async () => { while (Volatile.Read(ref sync) == 0) diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs index 33e32bda6..a016089a6 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerConsumeTest.cs @@ -35,7 +35,7 @@ public async Task Consume_msgs_test() } var consumerOpts = new NatsJSConsumeOpts { MaxMsgs = 10 }; - var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await using var cc = await consumer.ConsumeInternalAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) @@ -108,7 +108,7 @@ public async Task Consume_idle_heartbeat_test() MaxMsgs = 10, IdleHeartbeat = TimeSpan.FromSeconds(5), }; - var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; var cc = await consumer.ConsumeInternalAsync(serializer: TestDataJsonSerializer.Default, consumerOpts, cancellationToken: cts.Token); await foreach (var msg in cc.Msgs.ReadAllAsync(cts.Token)) @@ -171,7 +171,7 @@ public async Task Consume_reconnect_test() MaxMsgs = 10, }; - var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); // Not interested in management messages sent upto this point await proxy.FlushFramesAsync(nats); @@ -237,7 +237,7 @@ public async Task Consume_dispose_test() var js = new NatsJSContext(nats); var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); var consumerOpts = new NatsJSConsumeOpts { @@ -293,7 +293,7 @@ public async Task Consume_stop_test() var js = new NatsJSContext(nats); var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); var consumerOpts = new NatsJSConsumeOpts { diff --git a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs index 5d1090eed..a6f4b991b 100644 --- a/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs +++ b/tests/NATS.Client.JetStream.Tests/ConsumerFetchTest.cs @@ -25,7 +25,7 @@ public async Task Fetch_test() ack.EnsureSuccess(); } - var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await using var fc = await consumer.FetchInternalAsync(serializer: TestDataJsonSerializer.Default, new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token); @@ -55,7 +55,7 @@ public async Task FetchNoWait_test() ack.EnsureSuccess(); } - var consumer = await js.GetConsumerAsync("s1", "c1", cts.Token); + var consumer = (NatsJSConsumer)await js.GetConsumerAsync("s1", "c1", cts.Token); var count = 0; await foreach (var msg in consumer.FetchNoWaitAsync(serializer: TestDataJsonSerializer.Default, new NatsJSFetchOpts { MaxMsgs = 10 }, cancellationToken: cts.Token)) { @@ -77,7 +77,7 @@ public async Task Fetch_dispose_test() var js = new NatsJSContext(nats); var stream = await js.CreateStreamAsync("s1", new[] { "s1.*" }, cts.Token); - var consumer = await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); + var consumer = (NatsJSConsumer)await js.CreateConsumerAsync("s1", "c1", cancellationToken: cts.Token); var fetchOpts = new NatsJSFetchOpts { diff --git a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs index 4bac70a39..85d676af5 100644 --- a/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs +++ b/tests/NATS.Client.JetStream.Tests/JetStreamTest.cs @@ -34,7 +34,7 @@ public async Task Create_stream_test() Assert.Equal("events", stream.Info.Config.Name); // Create consumer - var consumer = await js.CreateConsumerAsync( + var consumer = (NatsJSConsumer)await js.CreateConsumerAsync( new ConsumerCreateRequest { StreamName = "events",