Skip to content

Commit

Permalink
Jetstream CreateOrUpdateStream (#692)
Browse files Browse the repository at this point in the history
* #676 CreateOrUpdateStreamAsync

* Added CreateOrUpdateStream

* #676 CreateOrUpdateStreamAsync

* Added tests for CreateOrUpdateStream api
  • Loading branch information
Ivandemidov00 authored Dec 7, 2024
1 parent 0d0f030 commit ff8cb03
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 0 deletions.
14 changes: 14 additions & 0 deletions src/NATS.Client.JetStream/INatsJSContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,20 @@ ValueTask<INatsJSStream> CreateStreamAsync(
StreamConfig config,
CancellationToken cancellationToken = default);

/// <summary>
/// Creates a new stream if it doesn't exist or update if the stream already exists.
/// </summary>
/// <param name="config">Stream configuration request to be sent to NATS JetStream server.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The stream name in <paramref name="config"/> is invalid.</exception>
/// <exception cref="ArgumentNullException">The name in <paramref name="config"/> is <c>null</c>.</exception>
ValueTask<INatsJSStream> CreateOrUpdateStreamAsync(
StreamConfig config,
CancellationToken cancellationToken = default);

/// <summary>
/// Deletes a stream.
/// </summary>
Expand Down
27 changes: 27 additions & 0 deletions src/NATS.Client.JetStream/NatsJSContext.Streams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,33 @@ public async ValueTask<INatsJSStream> CreateStreamAsync(
return new NatsJSStream(this, response);
}

/// <summary>
/// Creates a new stream if it doesn't exist or update if the stream already exists.
/// </summary>
/// <param name="config">Stream configuration request to be sent to NATS JetStream server.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The NATS JetStream stream object which can be used to manage the stream.</returns>
/// <exception cref="NatsJSException">There was an issue retrieving the response.</exception>
/// <exception cref="NatsJSApiException">Server responded with an error.</exception>
/// <exception cref="ArgumentException">The stream name in <paramref name="config"/> is invalid.</exception>
/// <exception cref="ArgumentNullException">The name in <paramref name="config"/> is <c>null</c>.</exception>
public async ValueTask<INatsJSStream> CreateOrUpdateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default)
{
ThrowIfInvalidStreamName(config.Name, nameof(config.Name));
var response = await JSRequestAsync<StreamConfig, StreamUpdateResponse>(
subject: $"{Opts.Prefix}.STREAM.UPDATE.{config.Name}",
request: config,
cancellationToken);

if (response.Error is { Code: 404 })
{
return await CreateStreamAsync(config, cancellationToken);
}

response.EnsureSuccess();
return new NatsJSStream(this, response.Response!);
}

/// <summary>
/// Deletes a stream.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions tests/NATS.Client.JetStream.Tests/JetStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ public async Task Stream_invalid_name_test(string? streamName)
// Create stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.CreateStreamAsync(cfg));

// Create or update stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.CreateOrUpdateStreamAsync(cfg));

// Delete stream
await Assert.ThrowsAnyAsync<ArgumentException>(async () => await jsmContext.DeleteStreamAsync(streamName!));

Expand Down
57 changes: 57 additions & 0 deletions tests/NATS.Client.JetStream.Tests/ManageStreamTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,61 @@ public async Task Delete_one_msg()

Assert.Equal(2, stream.Info.State.Subjects?.Count);
}

[Fact]
public async Task Create_or_update_stream_should_be_create_stream_if_stream_doesnt_exist()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var streamConfig = new StreamConfig("s1", ["s1.*"])
{ Storage = StreamConfigStorage.File };

var accountInfoBefore = await js.GetAccountInfoAsync(cts.Token);
await js.CreateOrUpdateStreamAsync(streamConfig, cts.Token);
var accountInfoAfter = await js.GetAccountInfoAsync(cts.Token);

Assert.Equal(0, accountInfoBefore.Streams);
Assert.Equal(1, accountInfoAfter.Streams);
}

[Fact]
public async Task Create_or_update_stream_should_be_update_stream_if_stream_exist()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var streamConfig = new StreamConfig("s1", ["s1.*"])
{ Storage = StreamConfigStorage.File, NoAck = false };
var streamConfigForUpdated = streamConfig with { NoAck = true };

var stream = await js.CreateOrUpdateStreamAsync(streamConfig, cts.Token);
var updatedStream = await js.CreateOrUpdateStreamAsync(streamConfigForUpdated, cts.Token);

Assert.False(stream.Info.Config.NoAck);
Assert.True(updatedStream.Info.Config.NoAck);
}

[Fact]
public async Task Create_or_update_stream_should_be_throwing_update_operation_errors()
{
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));

await using var server = NatsServer.StartJS();
var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);

var streamConfig = new StreamConfig("s1", ["s1.*"])
{ Storage = StreamConfigStorage.File };
var streamConfigForUpdated = streamConfig with { Storage = StreamConfigStorage.Memory };

await js.CreateOrUpdateStreamAsync(streamConfig, cts.Token);
await Assert.ThrowsAsync<NatsJSApiException>(async () => await js.CreateOrUpdateStreamAsync(streamConfigForUpdated, cts.Token));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class MockJsContext : INatsJSContext

public ValueTask<INatsJSStream> CreateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<INatsJSStream> CreateOrUpdateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<bool> DeleteStreamAsync(string stream, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<StreamPurgeResponse> PurgeStreamAsync(string stream, StreamPurgeRequest request, CancellationToken cancellationToken = default) => throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class MockJsContext : INatsJSContext

public ValueTask<INatsJSStream> CreateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<INatsJSStream> CreateOrUpdateStreamAsync(StreamConfig config, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<bool> DeleteStreamAsync(string stream, CancellationToken cancellationToken = default) => throw new NotImplementedException();

public ValueTask<StreamPurgeResponse> PurgeStreamAsync(string stream, StreamPurgeRequest request, CancellationToken cancellationToken = default) => throw new NotImplementedException();
Expand Down

0 comments on commit ff8cb03

Please sign in to comment.