diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 93bf5861..80d64890 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -37,9 +37,9 @@ public record ClientParameters public IDictionary Properties { get; } = new Dictionary { - {"product", "RabbitMQ Stream"}, - {"version", Version.VersionString}, - {"platform", ".NET"}, + { "product", "RabbitMQ Stream" }, + { "version", Version.VersionString }, + { "platform", ".NET" }, { "copyright", "Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries." @@ -48,7 +48,7 @@ public record ClientParameters "information", "Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/" }, - {"connection_name", "Unknown"} + { "connection_name", "Unknown" } }; public string UserName { get; set; } = "guest"; @@ -77,6 +77,8 @@ public string ClientProvidedName public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain; + public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10); + internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate) { OnMetadataUpdate?.Invoke(metaDataUpdate); @@ -113,8 +115,6 @@ public class Client : IClient { private bool isClosed = true; - private readonly TimeSpan defaultTimeout = TimeSpan.FromSeconds(10); - private uint correlationId = 0; // allow for some pre-amble private Connection _connection; @@ -150,7 +150,6 @@ public class Client : IClient public int IncomingFrames => _connection.NumFrames; - //public int IncomingChannelCount => this.incoming.Reader.Count; private static readonly object Obj = new(); private readonly ILogger _logger; @@ -494,7 +493,7 @@ private async ValueTask Request(Func request, TimeSp var tcs = PooledTaskSource.Rent(); requests.TryAdd(corr, tcs); await Publish(request(corr)).ConfigureAwait(false); - using var cts = new CancellationTokenSource(timeout ?? defaultTimeout); + using var cts = new CancellationTokenSource(timeout ?? Parameters.RpcTimeOut); await using (cts.Token.Register( valueTaskSource => ((ManualResetValueTaskSource)valueTaskSource).SetException( @@ -973,7 +972,16 @@ public bool RunContinuationsAsynchronously public short Version => _logic.Version; public void Reset() => _logic.Reset(); public void SetResult(T result) => _logic.SetResult(result); - public void SetException(Exception error) => _logic.SetException(error); + + public void SetException(Exception error) + { + // https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/384 + // we need to check if the task is pending before setting the exception + if (_logic.GetStatus(_logic.Version) == ValueTaskSourceStatus.Pending) + { + _logic.SetException(error); + } + } void IValueTaskSource.GetResult(short token) => _logic.GetResult(token); T IValueTaskSource.GetResult(short token) => _logic.GetResult(token); diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index ca65ddd7..aeace579 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -54,6 +54,8 @@ RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Cli RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler +RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan +RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void RabbitMQ.Stream.Client.ConnectionItem RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool @@ -324,6 +326,8 @@ RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.C RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.get -> RabbitMQ.Stream.Client.ConnectionPoolConfig RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.set -> void +RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.get -> System.TimeSpan +RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.set -> void RabbitMQ.Stream.Client.SuperStreamSpec RabbitMQ.Stream.Client.SuperStreamSpec.Args.get -> System.Collections.Generic.IDictionary RabbitMQ.Stream.Client.SuperStreamSpec.LeaderLocator.set -> void diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index d6156558..cc7c8c8d 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -20,6 +20,11 @@ internal void Validate() { throw new ArgumentException("ConnectionPoolConfig can't be null"); } + + if (RpcTimeOut < TimeSpan.FromSeconds(1)) + { + throw new ArgumentException("RpcTimeOut must be at least 1 second"); + } } public string UserName { get; set; } = "guest"; @@ -44,6 +49,13 @@ internal void Validate() /// Configure the connection pool for producers and consumers. /// public ConnectionPoolConfig ConnectionPoolConfig { get; set; } = new(); + + /// + /// The timeout for RPC calls, like PeerProperties, QueryMetadata, etc. + /// Default value is 10 seconds and in most cases it should be enough. + /// Low value can cause false errors in the client. + /// + public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10); } public class StreamSystem @@ -85,7 +97,8 @@ public static async Task Create(StreamSystemConfig config, ILogger ClientProvidedName = config.ClientProvidedName, Heartbeat = config.Heartbeat, Endpoints = config.Endpoints, - AuthMechanism = config.AuthMechanism + AuthMechanism = config.AuthMechanism, + RpcTimeOut = config.RpcTimeOut }; // create the metadata client connection foreach (var endPoint in clientParams.Endpoints) diff --git a/Tests/SystemTests.cs b/Tests/SystemTests.cs index b22e6419..c3518ee1 100644 --- a/Tests/SystemTests.cs +++ b/Tests/SystemTests.cs @@ -274,6 +274,15 @@ await Assert.ThrowsAsync( ); } + [Fact] + public async void ValidateRpCtimeOut() + { + var config = new StreamSystemConfig() { RpcTimeOut = TimeSpan.FromMilliseconds(1) }; + await Assert.ThrowsAsync( + async () => { await StreamSystem.Create(config); } + ); + } + [Fact] public async void CloseProducerConsumerAfterForceCloseShouldNotRaiseError() { diff --git a/kubernetes/stream_cluster.yaml b/kubernetes/stream_cluster.yaml index 184e5e11..0d8c1a51 100644 --- a/kubernetes/stream_cluster.yaml +++ b/kubernetes/stream_cluster.yaml @@ -10,7 +10,7 @@ metadata: namespace: stream-clients-test spec: replicas: 3 - image: rabbitmq:3.13-rc-management + image: rabbitmq:3.13-management service: type: LoadBalancer # tls: