Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Key Value store interfaces #194

Merged
merged 1 commit into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVContext.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
namespace NATS.Client.KeyValueStore;

public interface INatsKVContext
{
/// <summary>
/// Create a new Key Value Store or get an existing one
/// </summary>
/// <param name="bucket">Name of the bucket</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Key Value Store</returns>
ValueTask<INatsKVStore> CreateStoreAsync(string bucket, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new Key Value Store or get an existing one
/// </summary>
/// <param name="config">Key Value Store configuration</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Key Value Store</returns>
/// <exception cref="NatsKVException">There was an issue with configuration</exception>
ValueTask<INatsKVStore> CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default);

/// <summary>
/// Get a Key Value Store
/// </summary>
/// <param name="bucket">Name of the bucjet</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Key Value Store</returns>
/// <exception cref="NatsKVException">There was an issue with configuration</exception>
ValueTask<INatsKVStore> GetStoreAsync(string bucket, CancellationToken cancellationToken = default);

/// <summary>
/// Delete a Key Value Store
/// </summary>
/// <param name="bucket">Name of the bucket</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>True for success</returns>
ValueTask<bool> DeleteStoreAsync(string bucket, CancellationToken cancellationToken = default);
}
124 changes: 124 additions & 0 deletions src/NATS.Client.KeyValueStore/INatsKVStore.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
using System.Runtime.CompilerServices;
using NATS.Client.Core;

namespace NATS.Client.KeyValueStore;

public interface INatsKVStore
{
/// <summary>
/// Name of the Key Value Store bucket
/// </summary>
string Bucket { get; }

/// <summary>
/// Put a value into the bucket using the key
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
ValueTask<ulong> PutAsync<T>(string key, T value, CancellationToken cancellationToken = default);

/// <summary>
/// Create a new entry in the bucket only if it doesn't exist
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>The revision number of the entry</returns>
ValueTask<ulong> CreateAsync<T>(string key, T value, CancellationToken cancellationToken = default);

/// <summary>
/// Update an entry in the bucket only if last update revision matches
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="value">Value of the entry</param>
/// <param name="revision">Last revision number to match</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>The revision number of the updated entry</returns>
ValueTask<ulong> UpdateAsync<T>(string key, T value, ulong revision, CancellationToken cancellationToken = default);

/// <summary>
/// Delete an entry from the bucket
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="opts">Delete options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
ValueTask DeleteAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Purge an entry from the bucket
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="opts">Delete options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get an entry from the bucket using the key
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="revision">Revision to retrieve</param>
/// <param name="serializer">Optional serialized to override the default</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>The entry</returns>
/// <exception cref="NatsKVException">There was an error with metadata</exception>
ValueTask<NatsKVEntry<T>> GetEntryAsync<T>(string key, ulong revision = default, INatsDeserialize<T>? serializer = default, CancellationToken cancellationToken = default);

/// <summary>
/// Start a watcher for specific keys
/// </summary>
/// <param name="key">Key to watch (subject-based wildcards may be used)</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>An asynchronous enumerable which can be used in <c>await foreach</c> loops</returns>
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Start a watcher for all the keys in the bucket
/// </summary>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>An asynchronous enumerable which can be used in <c>await foreach</c> loops</returns>
IAsyncEnumerable<NatsKVEntry<T>> WatchAsync<T>(INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get the history of an entry by key
/// </summary>
/// <param name="key">Key of the entry</param>
/// <param name="serializer">Serializer to use for the message type.</param>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <typeparam name="T">Serialized value type</typeparam>
/// <returns>An async enumerable of entries to be used in an <c>await foreach</c></returns>
IAsyncEnumerable<NatsKVEntry<T>> HistoryAsync<T>(string key, INatsDeserialize<T>? serializer = default, NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get the bucket status
/// </summary>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>The Key/Value store status</returns>
ValueTask<NatsKVStatus> GetStatusAsync(CancellationToken cancellationToken = default);

/// <summary>
/// Purge all deleted entries
/// </summary>
/// <param name="opts">Purge options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
ValueTask PurgeDeletesAsync(NatsKVPurgeOpts? opts = default, CancellationToken cancellationToken = default);

/// <summary>
/// Get all the keys in the bucket
/// </summary>
/// <param name="opts">Watch options</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>An async enumerable of keys to be used in an <c>await foreach</c></returns>
IAsyncEnumerable<string> GetKeysAsync(NatsKVWatchOpts? opts = default, CancellationToken cancellationToken = default);
}
8 changes: 4 additions & 4 deletions src/NATS.Client.KeyValueStore/NatsKVContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public enum NatsKVStorageType
/// <summary>
/// Key Value Store context
/// </summary>
public class NatsKVContext
public class NatsKVContext : INatsKVContext
{
private static readonly Regex ValidBucketRegex = new(pattern: @"\A[a-zA-Z0-9_-]+\z", RegexOptions.Compiled);
private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled);
Expand All @@ -36,7 +36,7 @@ public class NatsKVContext
/// <param name="bucket">Name of the bucket</param>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Key Value Store</returns>
public ValueTask<NatsKVStore> CreateStoreAsync(string bucket, CancellationToken cancellationToken = default)
public ValueTask<INatsKVStore> CreateStoreAsync(string bucket, CancellationToken cancellationToken = default)
=> CreateStoreAsync(new NatsKVConfig(bucket), cancellationToken);

/// <summary>
Expand All @@ -46,7 +46,7 @@ public ValueTask<NatsKVStore> CreateStoreAsync(string bucket, CancellationToken
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Key Value Store</returns>
/// <exception cref="NatsKVException">There was an issue with configuration</exception>
public async ValueTask<NatsKVStore> CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default)
public async ValueTask<INatsKVStore> CreateStoreAsync(NatsKVConfig config, CancellationToken cancellationToken = default)
{
ValidateBucketName(config.Bucket);

Expand Down Expand Up @@ -122,7 +122,7 @@ public async ValueTask<NatsKVStore> CreateStoreAsync(NatsKVConfig config, Cancel
/// <param name="cancellationToken">A <see cref="CancellationToken"/> used to cancel the API call.</param>
/// <returns>Key Value Store</returns>
/// <exception cref="NatsKVException">There was an issue with configuration</exception>
public async ValueTask<NatsKVStore> GetStoreAsync(string bucket, CancellationToken cancellationToken = default)
public async ValueTask<INatsKVStore> GetStoreAsync(string bucket, CancellationToken cancellationToken = default)
{
ValidateBucketName(bucket);

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.KeyValueStore/NatsKVStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public enum NatsKVOperation
/// <summary>
/// Key Value Store
/// </summary>
public class NatsKVStore
public class NatsKVStore : INatsKVStore
{
private const string NatsExpectedLastSubjectSequence = "Nats-Expected-Last-Subject-Sequence";
private const string KVOperation = "KV-Operation";
Expand Down
1 change: 0 additions & 1 deletion tests/NATS.Client.Core.MemoryTests/NatsSubTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public void Subscription_should_not_be_collected_when_in_async_enumerator()
}
});


var pub = Task.Run(async () =>
{
while (Volatile.Read(ref sync) == 0)
Expand Down
4 changes: 2 additions & 2 deletions tests/NATS.Client.KeyValueStore.Tests/NatsKVWatcherTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public async Task Watcher_reconnect_with_history()
var (nats2, proxy) = server.CreateProxiedClientConnection();
var js2 = new NatsJSContext(nats2);
var kv2 = new NatsKVContext(js2);
var store2 = await kv2.CreateStoreAsync(config, cancellationToken: cancellationToken);
var store2 = (NatsKVStore)await kv2.CreateStoreAsync(config, cancellationToken: cancellationToken);
var watcher = await store2.WatchInternalAsync<NatsMemoryOwner<byte>>("k1.*", cancellationToken: cancellationToken);

await store1.PutAsync("k1.p1", 1, cancellationToken);
Expand Down Expand Up @@ -155,7 +155,7 @@ public async Task Watcher_timeout_reconnect()
var (nats2, proxy) = server.CreateProxiedClientConnection();
var js2 = new NatsJSContext(nats2);
var kv2 = new NatsKVContext(js2);
var store2 = await kv2.CreateStoreAsync(bucket, cancellationToken: cancellationToken);
var store2 = (NatsKVStore)await kv2.CreateStoreAsync(bucket, cancellationToken: cancellationToken);
var watcher = await store2.WatchInternalAsync<NatsMemoryOwner<byte>>("k1.*", cancellationToken: cancellationToken);

// Swallow heartbeats
Expand Down