From 150d5a85087a74916fb2baf14ed1c2b7f354ac10 Mon Sep 17 00:00:00 2001 From: mtmk Date: Tue, 10 Dec 2024 08:24:33 +0000 Subject: [PATCH] Add KV try get (#688) * Add KV try get * Try-get implementation * Added perf tweaks * Format * Add bench * Add TryGetEntryAsync method to INatsKVStore interface * Remove obsolete GetAsyncNew method from KVBench.cs * Bench string * Tidy up --- sandbox/MicroBenchmark/KVBench.cs | 103 ++++++++++++++++++ sandbox/MicroBenchmark/MicroBenchmark.csproj | 3 +- src/NATS.Client.KeyValueStore/INatsKVStore.cs | 14 +++ .../NatsKVException.cs | 2 + src/NATS.Client.KeyValueStore/NatsKVStore.cs | 59 +++++++--- 5 files changed, 164 insertions(+), 17 deletions(-) create mode 100644 sandbox/MicroBenchmark/KVBench.cs diff --git a/sandbox/MicroBenchmark/KVBench.cs b/sandbox/MicroBenchmark/KVBench.cs new file mode 100644 index 000000000..60b981988 --- /dev/null +++ b/sandbox/MicroBenchmark/KVBench.cs @@ -0,0 +1,103 @@ +using BenchmarkDotNet.Attributes; +using NATS.Client.Core; +using NATS.Client.JetStream; +using NATS.Client.KeyValueStore; + +#pragma warning disable CS8618 + +namespace MicroBenchmark; + +[MemoryDiagnoser] +[PlainExporter] +public class KvBench +{ + private NatsConnection _nats; + private NatsJSContext _js; + private NatsKVContext _kv; + private NatsKVStore _store; + + [GlobalSetup] + public async Task SetupAsync() + { + _nats = new NatsConnection(); + _js = new NatsJSContext(_nats); + _kv = new NatsKVContext(_js); + _store = (NatsKVStore)(await _kv.CreateStoreAsync("benchmark")); + } + + [Benchmark] + public async ValueTask TryGetAsync() + { + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + { + return 1; + } + + return 0; + } + + [Benchmark] + public async ValueTask GetAsync() + { + try + { + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + return 1; + } + + return 0; + } + + [Benchmark] + public async ValueTask TryGetMultiAsync() + { + List tasks = new(); + for (var i = 0; i < 100; i++) + { + tasks.Add(Task.Run(async () => + { + var result = await _store.TryGetEntryAsync("does.not.exist"); + if (result is { Success: false, Error: NatsKVKeyNotFoundException }) + { + return 1; + } + + return 0; + })); + } + + await Task.WhenAll(tasks); + + return 0; + } + + [Benchmark] + public async ValueTask GetMultiAsync() + { + List tasks = new(); + for (var i = 0; i < 100; i++) + { + tasks.Add(Task.Run(async () => + { + try + { + await _store.GetEntryAsync("does.not.exist"); + } + catch (NatsKVKeyNotFoundException) + { + return 1; + } + + return 0; + })); + } + + await Task.WhenAll(tasks); + + return 0; + } +} diff --git a/sandbox/MicroBenchmark/MicroBenchmark.csproj b/sandbox/MicroBenchmark/MicroBenchmark.csproj index 96d4079ab..133cafa6b 100644 --- a/sandbox/MicroBenchmark/MicroBenchmark.csproj +++ b/sandbox/MicroBenchmark/MicroBenchmark.csproj @@ -10,7 +10,7 @@ - + @@ -20,6 +20,7 @@ + diff --git a/src/NATS.Client.KeyValueStore/INatsKVStore.cs b/src/NATS.Client.KeyValueStore/INatsKVStore.cs index 1c99a0a3c..ecbf0d072 100644 --- a/src/NATS.Client.KeyValueStore/INatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/INatsKVStore.cs @@ -77,6 +77,20 @@ public interface INatsKVStore /// There was an error with metadata ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// + /// Try to get an entry from the bucket using the key. + /// + /// Key of the entry + /// Revision to retrieve + /// Optional serialized to override the default + /// A used to cancel the API call. + /// Serialized value type + /// A NatsResult object representing the value or an error. + /// + /// Use this method to avoid exceptions when, for example, the key is not found. + /// + ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default); + /// /// Start a watcher for specific keys /// diff --git a/src/NATS.Client.KeyValueStore/NatsKVException.cs b/src/NATS.Client.KeyValueStore/NatsKVException.cs index d5d8ad36e..e89c73622 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVException.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVException.cs @@ -54,6 +54,8 @@ public NatsKVCreateException() public class NatsKVKeyNotFoundException : NatsKVException { + public static readonly NatsKVKeyNotFoundException Default = new(); + public NatsKVKeyNotFoundException() : base("Key not found") { diff --git a/src/NATS.Client.KeyValueStore/NatsKVStore.cs b/src/NATS.Client.KeyValueStore/NatsKVStore.cs index cd2a32fc9..cd2717fc3 100644 --- a/src/NATS.Client.KeyValueStore/NatsKVStore.cs +++ b/src/NATS.Client.KeyValueStore/NatsKVStore.cs @@ -44,13 +44,23 @@ public class NatsKVStore : INatsKVStore private const string NatsSequence = "Nats-Sequence"; private const string NatsTimeStamp = "Nats-Time-Stamp"; private static readonly Regex ValidKeyRegex = new(pattern: @"\A[-/_=\.a-zA-Z0-9]+\z", RegexOptions.Compiled); + private static readonly NatsKVException MissingSequenceHeaderException = new("Missing sequence header"); + private static readonly NatsKVException MissingTimestampHeaderException = new("Missing timestamp header"); + private static readonly NatsKVException MissingHeadersException = new("Missing headers"); + private static readonly NatsKVException UnexpectedSubjectException = new("Unexpected subject"); + private static readonly NatsKVException UnexpectedNumberOfOperationHeadersException = new("Unexpected number of operation headers"); + private static readonly NatsKVException InvalidSequenceException = new("Can't parse sequence header"); + private static readonly NatsKVException InvalidTimestampException = new("Can't parse timestamp header"); + private static readonly NatsKVException InvalidOperationException = new("Can't parse operation header"); private readonly INatsJSStream _stream; + private readonly string _kvBucket; internal NatsKVStore(string bucket, INatsJSContext context, INatsJSStream stream) { Bucket = bucket; JetStreamContext = context; _stream = stream; + _kvBucket = $"$KV.{Bucket}."; } /// @@ -166,13 +176,27 @@ public ValueTask PurgeAsync(string key, NatsKVDeleteOpts? opts = default, Cancel /// public async ValueTask> GetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) + { + var result = await TryGetEntryAsync(key, revision, serializer, cancellationToken); + if (!result.Success) + { + ThrowException(result.Error); + } + + return result.Value; + } + + /// +#if !NETSTANDARD + [AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder<>))] +#endif + public async ValueTask>> TryGetEntryAsync(string key, ulong revision = default, INatsDeserialize? serializer = default, CancellationToken cancellationToken = default) { ValidateKey(key); serializer ??= JetStreamContext.Connection.Opts.SerializerRegistry.GetDeserializer(); + var keySubject = _kvBucket + key; var request = new StreamMsgGetRequest(); - var keySubject = $"$KV.{Bucket}.{key}"; - if (revision == default) { request.LastBySubj = keySubject; @@ -190,44 +214,44 @@ public async ValueTask> GetEntryAsync(string key, ulong revisi if (direct is { Headers: { } headers } msg) { if (headers.Code == 404) - throw new NatsKVKeyNotFoundException(); + return NatsKVKeyNotFoundException.Default; if (!headers.TryGetLastValue(NatsSubject, out var subject)) - throw new NatsKVException("Missing sequence header"); + return MissingSequenceHeaderException; if (revision != default) { if (!string.Equals(subject, keySubject, StringComparison.Ordinal)) { - throw new NatsKVException("Unexpected subject"); + return UnexpectedSubjectException; } } if (!headers.TryGetLastValue(NatsSequence, out var sequenceValue)) - throw new NatsKVException("Missing sequence header"); + return MissingSequenceHeaderException; if (!ulong.TryParse(sequenceValue, out var sequence)) - throw new NatsKVException("Can't parse sequence header"); + return InvalidSequenceException; if (!headers.TryGetLastValue(NatsTimeStamp, out var timestampValue)) - throw new NatsKVException("Missing timestamp header"); + return MissingTimestampHeaderException; if (!DateTimeOffset.TryParse(timestampValue, out var timestamp)) - throw new NatsKVException("Can't parse timestamp header"); + return InvalidTimestampException; var operation = NatsKVOperation.Put; if (headers.TryGetValue(KVOperation, out var operationValues)) { if (operationValues.Count != 1) - throw new NatsKVException("Unexpected number of operation headers"); + return UnexpectedNumberOfOperationHeadersException; if (!Enum.TryParse(operationValues[0], ignoreCase: true, out operation)) - throw new NatsKVException("Can't parse operation header"); + return InvalidOperationException; } if (operation is NatsKVOperation.Del or NatsKVOperation.Purge) { - throw new NatsKVKeyDeletedException(sequence); + return new NatsKVKeyDeletedException(sequence); } return new NatsKVEntry(Bucket, key) @@ -245,7 +269,7 @@ public async ValueTask> GetEntryAsync(string key, ulong revisi } else { - throw new NatsKVException("Missing headers"); + return MissingHeadersException; } } else @@ -256,7 +280,7 @@ public async ValueTask> GetEntryAsync(string key, ulong revisi { if (string.Equals(response.Message.Subject, keySubject, StringComparison.Ordinal)) { - throw new NatsKVException("Unexpected subject"); + return UnexpectedSubjectException; } } @@ -452,12 +476,12 @@ internal async ValueTask> WatchInternalAsync(IEnumerable private static void ValidateKey(string key) { - if (string.IsNullOrWhiteSpace(key)) + if (string.IsNullOrWhiteSpace(key) || key.Length == 0) { ThrowNatsKVException("Key cannot be empty"); } - if (key.StartsWith(".") || key.EndsWith(".")) + if (key[0] == '.' || key[^1] == '.') { ThrowNatsKVException("Key cannot start or end with a period"); } @@ -470,6 +494,9 @@ private static void ValidateKey(string key) [MethodImpl(MethodImplOptions.NoInlining)] private static void ThrowNatsKVException(string message) => throw new NatsKVException(message); + + [MethodImpl(MethodImplOptions.NoInlining)] + private static void ThrowException(Exception exception) => throw exception; } public record NatsKVStatus(string Bucket, bool IsCompressed, StreamInfo Info);