From a916b7f7ea90b7cea4aa474ec3a152db3c5712be Mon Sep 17 00:00:00 2001 From: Ziya Suzen Date: Wed, 8 Nov 2023 12:18:43 +0000 Subject: [PATCH] Key Value store interfaces --- .../INatsKVContext.cs | 38 ++++++ src/NATS.Client.KeyValueStore/INatsKVStore.cs | 124 ++++++++++++++++++ .../NatsKVContext.cs | 8 +- src/NATS.Client.KeyValueStore/NatsKVStore.cs | 2 +- .../NatsSubTests.cs | 1 - .../NatsKVWatcherTest.cs | 4 +- 6 files changed, 169 insertions(+), 8 deletions(-) create mode 100644 src/NATS.Client.KeyValueStore/INatsKVContext.cs create mode 100644 src/NATS.Client.KeyValueStore/INatsKVStore.cs diff --git a/src/NATS.Client.KeyValueStore/INatsKVContext.cs b/src/NATS.Client.KeyValueStore/INatsKVContext.cs new file mode 100644 index 000000000..2a41406d6 --- /dev/null +++ b/src/NATS.Client.KeyValueStore/INatsKVContext.cs @@ -0,0 +1,38 @@ +namespace NATS.Client.KeyValueStore; + +public interface INatsKVContext +{ + /// + /// Create a new Key Value Store or get an existing one + /// + /// Name of the bucket + /// A used to cancel the API call. + /// Key Value Store + ValueTask CreateStoreAsync(string bucket, CancellationToken cancellationToken = default); + + /// + /// Create a new Key Value Store or get an existing one + /// + /// Key Value Store configuration + /// A used to cancel the API call. + /// Key Value Store + /// There was an issue with configuration + ValueTask CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default); + + /// + /// Get a Key Value Store + /// + /// Name of the bucjet + /// A used to cancel the API call. + /// Key Value Store + /// There was an issue with configuration + ValueTask GetStoreAsync(string bucket, CancellationToken cancellationToken = default); + + /// + /// Delete a Key Value Store + /// + /// Name of the bucket + /// A used to cancel the API call. + /// True for success + ValueTask DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default); +} diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs new file mode 100644 index 000000000..dbe45c86a --- /dev/null +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -0,0 +1,124 @@ +using System.Runtime.CompilerServices; +using NATS.Client.Core; + +namespace NATS.Client.KeyValueStore; + +public interface INatsKVStore +{ + /// + /// Name of the Key Value Store bucket + /// + string Bucket { get; } + + /// + /// Put a value into the bucket using the key + /// + /// Key of the entry + /// Value of the entry + /// A used to cancel the API call. + /// Serialized value type + ValueTask PutAsync(string key, T value, CancellationToken cancellationToken = default); + + /// + /// Create a new entry in the bucket only if it doesn't exist + /// + /// Key of the entry + /// Value of the entry + /// A used to cancel the API call. + /// Serialized value type + /// The revision number of the entry + ValueTask CreateAsync(string key, T value, CancellationToken cancellationToken = default); + + /// + /// Update an entry in the bucket only if last update revision matches + /// + /// Key of the entry + /// Value of the entry + /// Last revision number to match + /// A used to cancel the API call. + /// Serialized value type + /// The revision number of the updated entry + ValueTask UpdateAsync(string key, T value, ulong revision, CancellationToken cancellationToken = default); + + /// + /// Delete an entry from the bucket + /// + /// Key of the entry + /// Delete options + /// A used to cancel the API call. + ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Purge an entry from the bucket + /// + /// Key of the entry + /// Delete options + /// A used to cancel the API call. + ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Get an entry from the bucket using the key + /// + /// Key of the entry + /// Revision to retrieve + /// Optional serialized to override the default + /// A used to cancel the API call. + /// Serialized value type + /// The entry + /// There was an error with metadata + ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + + /// + /// Start a watcher for specific keys + /// + /// Key to watch (subject-based wildcards may be used) + /// Serializer to use for the message type. + /// Watch options + /// A used to cancel the API call. + /// Serialized value type + /// An asynchronous enumerable which can be used in await foreach loops + IAsyncEnumerable> WatchAsync(string key, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Start a watcher for all the keys in the bucket + /// + /// Serializer to use for the message type. + /// Watch options + /// A used to cancel the API call. + /// Serialized value type + /// An asynchronous enumerable which can be used in await foreach loops + IAsyncEnumerable> WatchAsync(INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Get the history of an entry by key + /// + /// Key of the entry + /// Serializer to use for the message type. + /// Watch options + /// A used to cancel the API call. + /// Serialized value type + /// An async enumerable of entries to be used in an await foreach + IAsyncEnumerable> HistoryAsync(string key, INatsDeserialize? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Get the bucket status + /// + /// A used to cancel the API call. + /// The Key/Value store status + ValueTask GetStatusAsync(CancellationToken cancellationToken = default); + + /// + /// Purge all deleted entries + /// + /// Purge options + /// A used to cancel the API call. + ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, CancellationToken cancellationToken = default); + + /// + /// Get all the keys in the bucket + /// + /// Watch options + /// A used to cancel the API call. + /// An async enumerable of keys to be used in an await foreach + IAsyncEnumerable GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default); +} diff --git a/src/NATS.Client.KeyValueStore/NatsKVContext.cs b/src/NATS.Client.KeyValueStore/NatsKVContext.cs index d51765d31..57c1e2fd9 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVContext.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVContext.cs @@ -17,7 +17,7 @@ public enum NatsKVStorageType /// /// Key Value Store context /// -public class NatsKVContext +public class NatsKVContext : INatsKVContext { private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled); private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); @@ -36,7 +36,7 @@ public class NatsKVContext /// Name of the bucket /// A used to cancel the API call. /// Key Value Store - public ValueTask CreateStoreAsync(string bucket, CancellationToken cancellationToken = default) + public ValueTask CreateStoreAsync(string bucket, CancellationToken cancellationToken = default) => CreateStoreAsync(new NatsKVConfig(bucket), cancellationToken); /// @@ -46,7 +46,7 @@ public ValueTask CreateStoreAsync(string bucket, CancellationToken /// A used to cancel the API call. /// Key Value Store /// There was an issue with configuration - public async ValueTask CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default) + public async ValueTask CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default) { ValidateBucketName(config.Bucket); @@ -122,7 +122,7 @@ public async ValueTask CreateStoreAsync(NatsKVConfig config, Cancel /// A used to cancel the API call. /// Key Value Store /// There was an issue with configuration - public async ValueTask GetStoreAsync(string bucket, CancellationToken cancellationToken = default) + public async ValueTask GetStoreAsync(string bucket, CancellationToken cancellationToken = default) { ValidateBucketName(bucket); diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index 58b24e653..78ba5ebe5 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -31,7 +31,7 @@ public enum NatsKVOperation /// /// Key Value Store /// -public class NatsKVStore +public class NatsKVStore : INatsKVStore { private const string NatsExpectedLastSubjectSequence = "Nats-Expected-Last-Subject-Sequence"; private const string KVOperation = "KV-Operation"; diff --git a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs index 01773fed7..298175065 100644 --- a/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs +++ b/tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs @@ -62,7 +62,6 @@ public void Subscription_should_not_be_collected_when_in_async_enumerator() } }); - var pub = Task.Run(async () => { while (Volatile.Read(ref sync) == 0) diff --git a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs index 2da5a1916..1763bd953 100644 --- a/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs +++ b/tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs @@ -30,7 +30,7 @@ public async Task Watcher_reconnect_with_history() var (nats2, proxy) = server.CreateProxiedClientConnection(); var js2 = new NatsJSContext(nats2); var kv2 = new NatsKVContext(js2); - var store2 = await kv2.CreateStoreAsync(config, cancellationToken: cancellationToken); + var store2 = (NatsKVStore)await kv2.CreateStoreAsync(config, cancellationToken: cancellationToken); var watcher = await store2.WatchInternalAsync>("k1.*", cancellationToken: cancellationToken); await store1.PutAsync("k1.p1", 1, cancellationToken); @@ -155,7 +155,7 @@ public async Task Watcher_timeout_reconnect() var (nats2, proxy) = server.CreateProxiedClientConnection(); var js2 = new NatsJSContext(nats2); var kv2 = new NatsKVContext(js2); - var store2 = await kv2.CreateStoreAsync(bucket, cancellationToken: cancellationToken); + var store2 = (NatsKVStore)await kv2.CreateStoreAsync(bucket, cancellationToken: cancellationToken); var watcher = await store2.WatchInternalAsync>("k1.*", cancellationToken: cancellationToken); // Swallow heartbeats