Skip to content

Commit

Permalink
Check the logic status before raising the event (#385)
Browse files Browse the repository at this point in the history
* Fixes: #384
  Check the logic status before raising the event
* Add RPC timeout configuration. 

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Jun 10, 2024
1 parent dbfcdc9 commit 14b6e2d
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 11 deletions.
26 changes: 17 additions & 9 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public record ClientParameters
public IDictionary<string, string> Properties { get; } =
new Dictionary<string, string>
{
{"product", "RabbitMQ Stream"},
{"version", Version.VersionString},
{"platform", ".NET"},
{ "product", "RabbitMQ Stream" },
{ "version", Version.VersionString },
{ "platform", ".NET" },
{
"copyright",
"Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries."
Expand All @@ -48,7 +48,7 @@ public record ClientParameters
"information",
"Licensed under the Apache 2.0 and MPL 2.0 licenses. See https://www.rabbitmq.com/"
},
{"connection_name", "Unknown"}
{ "connection_name", "Unknown" }
};

public string UserName { get; set; } = "guest";
Expand Down Expand Up @@ -77,6 +77,8 @@ public string ClientProvidedName

public AuthMechanism AuthMechanism { get; set; } = AuthMechanism.Plain;

public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10);

internal void FireMetadataUpdate(MetaDataUpdate metaDataUpdate)
{
OnMetadataUpdate?.Invoke(metaDataUpdate);
Expand Down Expand Up @@ -113,8 +115,6 @@ public class Client : IClient
{
private bool isClosed = true;

private readonly TimeSpan defaultTimeout = TimeSpan.FromSeconds(10);

private uint correlationId = 0; // allow for some pre-amble

private Connection _connection;
Expand Down Expand Up @@ -150,7 +150,6 @@ public class Client : IClient

public int IncomingFrames => _connection.NumFrames;

//public int IncomingChannelCount => this.incoming.Reader.Count;
private static readonly object Obj = new();

private readonly ILogger _logger;
Expand Down Expand Up @@ -494,7 +493,7 @@ private async ValueTask<TOut> Request<TIn, TOut>(Func<uint, TIn> request, TimeSp
var tcs = PooledTaskSource<TOut>.Rent();
requests.TryAdd(corr, tcs);
await Publish(request(corr)).ConfigureAwait(false);
using var cts = new CancellationTokenSource(timeout ?? defaultTimeout);
using var cts = new CancellationTokenSource(timeout ?? Parameters.RpcTimeOut);
await using (cts.Token.Register(
valueTaskSource =>
((ManualResetValueTaskSource<TOut>)valueTaskSource).SetException(
Expand Down Expand Up @@ -973,7 +972,16 @@ public bool RunContinuationsAsynchronously
public short Version => _logic.Version;
public void Reset() => _logic.Reset();
public void SetResult(T result) => _logic.SetResult(result);
public void SetException(Exception error) => _logic.SetException(error);

public void SetException(Exception error)
{
// https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/issues/384
// we need to check if the task is pending before setting the exception
if (_logic.GetStatus(_logic.Version) == ValueTaskSourceStatus.Pending)
{
_logic.SetException(error);
}
}

void IValueTaskSource.GetResult(short token) => _logic.GetResult(token);
T IValueTaskSource<T>.GetResult(short token) => _logic.GetResult(token);
Expand Down
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Cli
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
RabbitMQ.Stream.Client.ClientParameters.OnMetadataUpdate -> RabbitMQ.Stream.Client.ClientParameters.MetadataUpdateHandler
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.get -> System.TimeSpan
RabbitMQ.Stream.Client.ClientParameters.RpcTimeOut.set -> void
RabbitMQ.Stream.Client.Connection.UpdateCloseStatus(string reason) -> void
RabbitMQ.Stream.Client.ConnectionItem
RabbitMQ.Stream.Client.ConnectionItem.Available.get -> bool
Expand Down Expand Up @@ -324,6 +326,8 @@ RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.C
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.get -> RabbitMQ.Stream.Client.ConnectionPoolConfig
RabbitMQ.Stream.Client.StreamSystemConfig.ConnectionPoolConfig.set -> void
RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.get -> System.TimeSpan
RabbitMQ.Stream.Client.StreamSystemConfig.RpcTimeOut.set -> void
RabbitMQ.Stream.Client.SuperStreamSpec
RabbitMQ.Stream.Client.SuperStreamSpec.Args.get -> System.Collections.Generic.IDictionary<string, string>
RabbitMQ.Stream.Client.SuperStreamSpec.LeaderLocator.set -> void
Expand Down
15 changes: 14 additions & 1 deletion RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ internal void Validate()
{
throw new ArgumentException("ConnectionPoolConfig can't be null");
}

if (RpcTimeOut < TimeSpan.FromSeconds(1))
{
throw new ArgumentException("RpcTimeOut must be at least 1 second");
}
}

public string UserName { get; set; } = "guest";
Expand All @@ -44,6 +49,13 @@ internal void Validate()
/// Configure the connection pool for producers and consumers.
/// </summary>
public ConnectionPoolConfig ConnectionPoolConfig { get; set; } = new();

/// <summary>
/// The timeout for RPC calls, like PeerProperties, QueryMetadata, etc.
/// Default value is 10 seconds and in most cases it should be enough.
/// Low value can cause false errors in the client.
/// </summary>
public TimeSpan RpcTimeOut { get; set; } = TimeSpan.FromSeconds(10);
}

public class StreamSystem
Expand Down Expand Up @@ -85,7 +97,8 @@ public static async Task<StreamSystem> Create(StreamSystemConfig config, ILogger
ClientProvidedName = config.ClientProvidedName,
Heartbeat = config.Heartbeat,
Endpoints = config.Endpoints,
AuthMechanism = config.AuthMechanism
AuthMechanism = config.AuthMechanism,
RpcTimeOut = config.RpcTimeOut
};
// create the metadata client connection
foreach (var endPoint in clientParams.Endpoints)
Expand Down
9 changes: 9 additions & 0 deletions Tests/SystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,15 @@ await Assert.ThrowsAsync<AuthMechanismNotSupportedException>(
);
}

[Fact]
public async void ValidateRpCtimeOut()
{
var config = new StreamSystemConfig() { RpcTimeOut = TimeSpan.FromMilliseconds(1) };
await Assert.ThrowsAsync<ArgumentException>(
async () => { await StreamSystem.Create(config); }
);
}

[Fact]
public async void CloseProducerConsumerAfterForceCloseShouldNotRaiseError()
{
Expand Down
2 changes: 1 addition & 1 deletion kubernetes/stream_cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ metadata:
namespace: stream-clients-test
spec:
replicas: 3
image: rabbitmq:3.13-rc-management
image: rabbitmq:3.13-management
service:
type: LoadBalancer
# tls:
Expand Down

0 comments on commit 14b6e2d

Please sign in to comment.