Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JetStream public interfaces #193

Merged
merged 1 commit into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sandbox/Example.JetStream.PullConsumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ void Report(int i, Stopwatch sw, string data)
var fetchNoWaitOpts = new NatsJSFetchOpts { MaxMsgs = max };
var fetchMsgCount = 0;

await foreach (var msg in consumer.FetchNoWaitAsync<NatsMemoryOwner<byte>>(opts: fetchNoWaitOpts, cancellationToken: cts.Token))
// NoWaitFetch is a specialized operation not available on the public interface.
await foreach (var msg in ((NatsJSConsumer)consumer).FetchNoWaitAsync<NatsMemoryOwner<byte>>(opts: fetchNoWaitOpts, cancellationToken: cts.Token))
{
fetchMsgCount++;
using (msg.Data)
Expand Down
82 changes: 82 additions & 0 deletions src/NATS.Client.JetStream/INatsJSConsumer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
using NATS.Client.Core;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;

public interface INatsJSConsumer
{
/// <summary>
/// Consumer info object as retrieved from NATS JetStream server at the time this object was created, updated or refreshed.
/// </summary>
ConsumerInfo Info { get; }

/// <summary>
/// Starts an enumerator consuming messages from the stream using this consumer.
/// </summary>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">Consume options. (default: <c>MaxMsgs</c> 1,000)</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <typeparam name="T">Message type to deserialize.</typeparam>
/// <returns>Async enumerable of messages which can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSProtocolException">Consumer is deleted, it's push based or request sent to server is invalid.</exception>
IAsyncEnumerable<NatsJSMsg<T>> ConsumeAsync<T>(
INatsDeserialize<T>? serializer = default,
NatsJSConsumeOpts? opts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Consume a single message from the stream using this consumer.
/// </summary>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">Next message options. (default: 30 seconds timeout)</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <typeparam name="T">Message type to deserialize.</typeparam>
/// <returns>Message retrieved from the stream or <c>NULL</c></returns>
/// <exception cref="NatsJSProtocolException">Consumer is deleted, it's push based or request sent to server is invalid.</exception>
/// <exception cref="NatsJSException">There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier.</exception>
/// <remarks>
/// <para>
/// If the request to server expires (in 30 seconds by default) this call returns <c>NULL</c>.
/// </para>
/// <para>
/// This method is implemented as a fetch with <c>MaxMsgs=1</c> which means every request will create a new subscription
/// on the NATS server. This would be inefficient if you're consuming a lot of messages and you should consider using
/// fetch or consume methods.
/// </para>
/// </remarks>
/// <example>
/// The following example shows how you might process messages:
/// <code lang="C#">
/// var next = await consumer.NextAsync&lt;Data&gt;();
/// if (next is { } msg)
/// {
/// // process the message
/// await msg.AckAsync();
/// }
/// </code>
/// </example>
ValueTask<NatsJSMsg<T>?> NextAsync<T>(INatsDeserialize<T>? serializer = default, NatsJSNextOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Consume a set number of messages from the stream using this consumer.
/// </summary>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">Fetch options. (default: <c>MaxMsgs</c> 1,000 and timeout in 30 seconds)</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the call.</param>
/// <typeparam name="T">Message type to deserialize.</typeparam>
/// <returns>Async enumerable of messages which can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSProtocolException">Consumer is deleted, it's push based or request sent to server is invalid.</exception>
/// <exception cref="NatsJSException">There is an error sending the message or this consumer object isn't valid anymore because it was deleted earlier.</exception>
IAsyncEnumerable<NatsJSMsg<T>> FetchAsync<T>(
INatsDeserialize<T>? serializer = default,
NatsJSFetchOpts? opts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Retrieve the consumer info from the server and update this consumer.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask RefreshAsync(CancellationToken cancellationToken = default);
}
251 changes: 251 additions & 0 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
using System.Runtime.CompilerServices;
using NATS.Client.Core;
using NATS.Client.JetStream.Models;

namespace NATS.Client.JetStream;

public interface INatsJSContext
{
/// <summary>
/// Creates new ordered consumer.
/// </summary>
/// <param name="stream">Stream name to create the consumer under.</param>
/// <param name="opts">Ordered consumer options.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving ordered data from the stream.</returns>
ValueTask<INatsJSConsumer> CreateOrderedConsumerAsync(
string stream,
NatsJSOrderedConsumerOpts? opts = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates new consumer if it doesn't exists or returns an existing one with the same name.
/// </summary>
/// <param name="stream">Stream name to create the consumer under.</param>
/// <param name="consumer">Name of the consumer.</param>
/// <param name="ackPolicy">Ack policy to use. Must not be set to <c>none</c>. Default is <c>explicit</c>.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> CreateConsumerAsync(
string stream,
string consumer,
ConsumerConfigurationAckPolicy ackPolicy = ConsumerConfigurationAckPolicy.@explicit,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates new consumer if it doesn't exists or returns an existing one with the same name.
/// </summary>
/// <param name="request">Consumer creation request to be sent to NATS JetStream server.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">Ack policy is set to <c>none</c> or there was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> CreateConsumerAsync(
ConsumerCreateRequest request,
CancellationToken cancellationToken = default);

/// <summary>
/// Gets consumer information from the server and creates a NATS JetStream consumer <see cref="NatsJSConsumer"/>.
/// </summary>
/// <param name="stream">Stream name where consumer is associated to.</param>
/// <param name="consumer">Consumer name.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream consumer object which can be used retrieving data from the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<INatsJSConsumer> GetConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default);

/// <summary>
/// Enumerates through consumers belonging to a stream.
/// </summary>
/// <param name="stream">Stream name the consumers belong to.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Async enumerable of consumer info objects. Can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <remarks>
/// Note that paging isn't implemented. You might receive only a partial list of consumers if there are a lot of them.
/// </remarks>
IAsyncEnumerable<ConsumerInfo> ListConsumersAsync(
string stream,
CancellationToken cancellationToken = default);

/// <summary>
/// Delete a consumer from a stream.
/// </summary>
/// <param name="stream">Stream name where consumer is associated to.</param>
/// <param name="consumer">Consumer name to be deleted.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Whether the deletion was successful.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<bool> DeleteConsumerAsync(string stream, string consumer, CancellationToken cancellationToken = default);

/// <summary>
/// Calls JetStream Account Info API.
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The account information based on the NATS connection credentials.</returns>
ValueTask<AccountInfoResponse> GetAccountInfoAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Sends data to a stream associated with the subject.
/// </summary>
/// <param name="subject">Subject to publish the data to.</param>
/// <param name="data">Data to publish.</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">Publish options.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the publishing call or the wait for response.</param>
/// <typeparam name="T">Type of the data being sent.</typeparam>
/// <returns>
/// The ACK response to indicate if stream accepted the message as well as additional
/// information like the sequence number of the message stored by the stream.
/// </returns>
/// <exception cref="NatsJSException">There was a problem receiving the response.</exception>
/// <remarks>
/// <para>
/// Note that if the subject isn't backed by a stream or the connected NATS server
/// isn't running with JetStream enabled, this call will hang waiting for an ACK
/// until the request times out.
/// </para>
/// </remarks>
ValueTask<PubAckResponse> PublishAsync<T>(
string subject,
T? data,
INatsSerialize<T>? serializer = default,
NatsJSPubOpts? opts = default,
NatsHeaders? headers = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Sends an empty payload to a stream associated with the subject.
/// </summary>
/// <param name="subject">Subject to publish the data to.</param>
/// <param name="opts">Publish options.</param>
/// <param name="headers">Optional message headers.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the publishing call or the wait for response.</param>
/// <returns>
/// The ACK response to indicate if stream accepted the message as well as additional
/// information like the sequence number of the message stored by the stream.
/// </returns>
/// <exception cref="NatsJSException">There was a problem receiving the response.</exception>
/// <remarks>
/// <para>
/// Note that if the subject isn't backed by a stream or the connected NATS server
/// isn't running with JetStream enabled, this call will hang waiting for an ACK
/// until the request times out.
/// </para>
/// </remarks>
ValueTask<PubAckResponse> PublishAsync(
string subject,
NatsJSPubOpts? opts = default,
NatsHeaders? headers = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates a new stream if it doesn't exist or returns an existing stream with the same name.
/// </summary>
/// <param name="stream">Name of the stream to create. (e.g. my_events)</param>
/// <param name="subjects">List of subjects stream will persist messages from. (e.g. events.*)</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<NatsJSStream> CreateStreamAsync(string stream, string[] subjects, CancellationToken cancellationToken = default);

/// <summary>
/// Creates a new stream if it doesn't exist or returns an existing stream with the same name.
/// </summary>
/// <param name="request">Stream configuration request to be sent to NATS JetStream server.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<NatsJSStream> CreateStreamAsync(
StreamConfiguration request,
CancellationToken cancellationToken = default);

/// <summary>
/// Deletes a stream.
/// </summary>
/// <param name="stream">Stream name to be deleted.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Whether delete was successful or not.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<bool> DeleteStreamAsync(
string stream,
CancellationToken cancellationToken = default);

/// <summary>
/// Purges all of the (or filtered) data in a stream, leaves the stream.
/// </summary>
/// <param name="stream">Stream name to be purged.</param>
/// <param name="request">Purge request.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Purge response</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<StreamPurgeResponse> PurgeStreamAsync(
string stream,
StreamPurgeRequest request,
CancellationToken cancellationToken = default);

/// <summary>
/// Deletes a message from a stream.
/// </summary>
/// <param name="stream">Stream name to delete message from.</param>
/// <param name="request">Delete message request.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Delete message response</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<StreamMsgDeleteResponse> DeleteMessageAsync(
string stream,
StreamMsgDeleteRequest request,
CancellationToken cancellationToken = default);

/// <summary>
/// Get stream information from the server and creates a NATS JetStream stream object <see cref="NatsJSStream"/>.
/// </summary>
/// <param name="stream">Name of the stream to retrieve.</param>
/// <param name="request">Stream info request options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<NatsJSStream> GetStreamAsync(
string stream,
StreamInfoRequest? request = null,
CancellationToken cancellationToken = default);

/// <summary>
/// Update a NATS JetStream stream's properties.
/// </summary>
/// <param name="request">Stream update request object to be sent to NATS JetStream server.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The updated NATS JetStream stream object.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
ValueTask<NatsJSStream> UpdateStreamAsync(
StreamUpdateRequest request,
CancellationToken cancellationToken = default);

/// <summary>
/// Enumerates through the streams exists on the NATS JetStream server.
/// </summary>
/// <param name="subject">Limit the list to streams matching this subject filter.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Async enumerable of stream objects. Can be used in a <c>await foreach</c> loop.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <remarks>
/// Note that paging isn't implemented. You might receive only a partial list of streams if there are a lot of them.
/// </remarks>
IAsyncEnumerable<NatsJSStream> ListStreamsAsync(
string? subject = default,
CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/NATS.Client.JetStream/NatsJSConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace NATS.Client.JetStream;
/// <summary>
/// Represents a NATS JetStream consumer.
/// </summary>
public class NatsJSConsumer
public class NatsJSConsumer : INatsJSConsumer
{
private readonly NatsJSContext _context;
private readonly string _stream;
Expand Down
Loading