diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index 98111b5f..5d56e77a 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -22,7 +22,7 @@ public record SuperStreamConfig public record ProducerConfig : ReliableConfig { - private readonly TimeSpan _timeoutMessageAfter = TimeSpan.FromSeconds(3); + private readonly TimeSpan _timeoutMessageAfter = TimeSpan.FromSeconds(10); /// /// Reference used for deduplication. diff --git a/Tests/ClientTests.cs b/Tests/ClientTests.cs index a5a6bc7c..1b031783 100644 --- a/Tests/ClientTests.cs +++ b/Tests/ClientTests.cs @@ -332,7 +332,7 @@ await client.Publish(new Publish(publisherId, Assert.Equal(10, messageCount); await client.Unsubscribe(subId); - // await client.Close("done"); + await client.Close("done"); } [Fact] @@ -422,6 +422,6 @@ public async void ExchangeVersionCommandsShouldNotBeEmpty() Assert.Equal(ResponseCode.Ok, response.ResponseCode); Assert.True(response.Commands.Count > 0); await client.Close("done"); - } + } } } diff --git a/Tests/MultiThreadTests.cs b/Tests/MultiThreadTests.cs index c26f4a32..89246d7d 100644 --- a/Tests/MultiThreadTests.cs +++ b/Tests/MultiThreadTests.cs @@ -79,7 +79,8 @@ public async Task PublishMessagesInMultiThreads() Assert.Equal(TotalMessages * ThreadNumber, confirmed); Assert.Equal(TotalMessages * ThreadNumber, receivedTask.Task.Result); Assert.Equal(0, error); - await system.DeleteStream(stream); + await producer.Close().ConfigureAwait(false); + await SystemUtils.CleanUpStreamSystem(system, stream); } [Fact] @@ -128,7 +129,6 @@ public async Task CloseProducersConsumersInMultiThreads() SystemUtils.WaitUntil(() => consumers.TrueForAll(c => !c.IsOpen())); Assert.All(consumers, c => Assert.False(c.IsOpen())); - await system.DeleteStream(stream); - await system.Close(); + await SystemUtils.CleanUpStreamSystem(system, stream); } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index a432287f..87412b49 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -155,6 +155,7 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() new ProducerConfig(system, stream) { ClientProvidedName = clientProvidedName, + TimeoutMessageAfter = TimeSpan.FromSeconds(3), ConfirmationHandler = _ => { if (Interlocked.Increment(ref count) == 10) diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index 7b465798..80aece6f 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -211,7 +211,7 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished var system = await StreamSystem.Create(new StreamSystemConfig()); var publishToSuperStreamTask = SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper); - if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(10000)) != publishToSuperStreamTask) + if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(20000)) != publishToSuperStreamTask) { Assert.Fail("timeout waiting to publish messages"); } diff --git a/Tests/SystemTests.cs b/Tests/SystemTests.cs index bca7313e..da95a14c 100644 --- a/Tests/SystemTests.cs +++ b/Tests/SystemTests.cs @@ -319,6 +319,7 @@ public async void NumberOfPartitionsShouldBeAsDefinition() Assert.Contains(SystemUtils.InvoicesStream1, partitions); Assert.Contains(SystemUtils.InvoicesStream2, partitions); Assert.DoesNotContain(SystemUtils.InvoicesExchange, partitions); + await system.Close().ConfigureAwait(false); } } }