Skip to content

Commit

Permalink
Obj store empty list fixed (#578)
Browse files Browse the repository at this point in the history
* Added OnNoData Async Func for break the async enumeration

* Consumer Dispose fix, CreateSub operation didn't have time for execute request, when call dispose

* Added a comment to the dispose order
  • Loading branch information
Ivandemidov00 authored Jul 24, 2024
1 parent 4832d15 commit a8daa4b
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 3 deletions.
10 changes: 7 additions & 3 deletions src/NATS.Client.JetStream/Internal/NatsJSOrderedPushConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -141,13 +141,17 @@ public async ValueTask DisposeAsync()
{
_nats.ConnectionDisconnected -= OnDisconnected;

// For correctly Dispose,
// first stop the consumer Creation operations and then the command execution operations.
// It is necessary that all consumerCreation operations have time to complete before command CommandLoop stop
_consumerCreateChannel.Writer.TryComplete();
_commandChannel.Writer.TryComplete();
_msgChannel.Writer.TryComplete();

await _consumerCreateTask;

_commandChannel.Writer.TryComplete();
await _commandTask;

_msgChannel.Writer.TryComplete();

await _context.DeleteConsumerAsync(_stream, Consumer, _cancellationToken);
}

Expand Down
19 changes: 19 additions & 0 deletions src/NATS.Client.ObjectStore/NatsObjStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ public IAsyncEnumerable<ObjectMetadata> ListAsync(NatsObjListOpts? opts = defaul
InitialSetOnly = true,
UpdatesOnly = false,
IgnoreDeletes = !opts.ShowDeleted,
OnNoData = opts.OnNoData,
};
return WatchAsync(watchOpts, cancellationToken);
}
Expand Down Expand Up @@ -625,6 +626,14 @@ public async IAsyncEnumerable<ObjectMetadata> WatchAsync(NatsObjWatchOpts? opts

pushConsumer.Init();

if (pushConsumer.Msgs.Count == 0 && opts.OnNoData != null)
{
if (await opts.OnNoData(cancellationToken))
{
yield break;
}
}

await foreach (var msg in pushConsumer.Msgs.ReadAllAsync(cancellationToken).ConfigureAwait(false))
{
if (pushConsumer.IsDone)
Expand Down Expand Up @@ -727,11 +736,21 @@ public record NatsObjWatchOpts
/// Only return the initial set of objects and don't watch for further updates.
/// </summary>
public bool InitialSetOnly { get; init; }

/// <summary>
/// Async function called when the enumerator reaches the end of data. Return True to break the async enumeration, False to allow the enumeration to continue.
/// </summary>
public Func<CancellationToken, ValueTask<bool>>? OnNoData { get; init; }
}

public record NatsObjListOpts
{
public bool ShowDeleted { get; init; }

/// <summary>
/// Async function called when the enumerator reaches the end of data. Return True to break the async enumeration, False to allow the enumeration to continue.
/// </summary>
public Func<CancellationToken, ValueTask<bool>>? OnNoData { get; init; }
}

public record NatsObjStatus(string Bucket, bool IsCompressed, StreamInfo Info);
43 changes: 43 additions & 0 deletions tests/NATS.Client.ObjectStore.Tests/ObjectStoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,49 @@ public async Task List()
}
}

[Fact]
public async Task List_empty_store_for_end_of_data()
{
var timeout = TimeSpan.FromSeconds(10);
var cts = new CancellationTokenSource(timeout);
var cancellationToken = cts.Token;

await using var server = NatsServer.StartJS();
await using var nats = server.CreateClientConnection();
var js = new NatsJSContext(nats);
var obj = new NatsObjContext(js);

var store = await obj.CreateObjectStoreAsync(new NatsObjConfig("b1"), cancellationToken);

var signal = new WaitSignal(timeout);
var endOfDataHit = false;
var watchTask = Task.Run(
async () =>
{
var opts = new NatsObjListOpts()
{
OnNoData = async (_) =>
{
await Task.CompletedTask;
endOfDataHit = true;
signal.Pulse();
return true;
},
};
await foreach (var info in store.ListAsync(opts: opts, cancellationToken: cancellationToken))
{
}
},
cancellationToken);

await signal;

Assert.True(endOfDataHit, "End of Current Data not set");

await watchTask;
}

[SkipIfNatsServer(versionEarlierThan: "2.10")]
public async Task Compressed_storage()
{
Expand Down

0 comments on commit a8daa4b

Please sign in to comment.