From fd7cfdc946cb4bb86c19a8e56f2c0959f3cd47e9 Mon Sep 17 00:00:00 2001 From: Arkadiusz Palinski Date: Thu, 21 Dec 2023 08:49:29 +0100 Subject: [PATCH] RavenDB-21852 Fixing ArgumentOutOfRangeException throw by GetOngoingTaskInfoOperation() for OngoingTaskType.QueueSink --- .../GetOngoingTaskInfoOperation.cs | 5 ++- .../JsonDeserializationClient.cs | 2 + .../Documents/OngoingTasks/OngoingTasks.cs | 2 +- .../Documents/QueueSink/QueueSinkTestBase.cs | 34 +++++++++++++++++ .../Documents/QueueSink/RabbitMqSinkTests.cs | 33 ---------------- .../Documents/QueueSink/RavenDB_21852.cs | 38 +++++++++++++++++++ 6 files changed, 79 insertions(+), 35 deletions(-) create mode 100644 test/SlowTests/Server/Documents/QueueSink/RavenDB_21852.cs diff --git a/src/Raven.Client/Documents/Operations/OngoingTasks/GetOngoingTaskInfoOperation.cs b/src/Raven.Client/Documents/Operations/OngoingTasks/GetOngoingTaskInfoOperation.cs index d29e0d906d36..81fa70817700 100644 --- a/src/Raven.Client/Documents/Operations/OngoingTasks/GetOngoingTaskInfoOperation.cs +++ b/src/Raven.Client/Documents/Operations/OngoingTasks/GetOngoingTaskInfoOperation.cs @@ -107,8 +107,11 @@ public override void SetResponse(JsonOperationContext context, BlittableJsonRead case OngoingTaskType.PullReplicationAsHub: Result = JsonDeserializationClient.OngoingTaskPullReplicationAsHubResult(response); break; + case OngoingTaskType.QueueSink: + Result = JsonDeserializationClient.GetOngoingTaskQueueSinkResult(response); + break; default: - throw new ArgumentOutOfRangeException(); + throw new ArgumentOutOfRangeException(nameof(_type), _type, "Unknown task type"); } } } diff --git a/src/Raven.Client/Json/Serialization/JsonDeserializationClient.cs b/src/Raven.Client/Json/Serialization/JsonDeserializationClient.cs index 9fa7cc2de729..77c841543a29 100644 --- a/src/Raven.Client/Json/Serialization/JsonDeserializationClient.cs +++ b/src/Raven.Client/Json/Serialization/JsonDeserializationClient.cs @@ -310,5 +310,7 @@ internal sealed class JsonDeserializationClient : JsonDeserializationBase public static readonly Func OlapConnectionString = GenerateJsonDeserializationRoutine(); + public static readonly Func GetOngoingTaskQueueSinkResult = GenerateJsonDeserializationRoutine(); + } } diff --git a/src/Raven.Server/Documents/OngoingTasks/OngoingTasks.cs b/src/Raven.Server/Documents/OngoingTasks/OngoingTasks.cs index 4ab93b76f3c7..c29dc867165f 100644 --- a/src/Raven.Server/Documents/OngoingTasks/OngoingTasks.cs +++ b/src/Raven.Server/Documents/OngoingTasks/OngoingTasks.cs @@ -114,7 +114,7 @@ protected override OngoingTaskConnectionStatus GetQueueSinkTaskConnectionStatus( if (tag == _server.NodeTag) { - var process = _database.EtlLoader.Processes.FirstOrDefault(x => x.ConfigurationName == config.Name); + var process = _database.QueueSinkLoader.Processes.FirstOrDefault(x => x.Configuration.Name == config.Name); if (process != null) connectionStatus = process.GetConnectionStatus(); diff --git a/test/SlowTests/Server/Documents/QueueSink/QueueSinkTestBase.cs b/test/SlowTests/Server/Documents/QueueSink/QueueSinkTestBase.cs index c5bc7b885412..2817a994c568 100644 --- a/test/SlowTests/Server/Documents/QueueSink/QueueSinkTestBase.cs +++ b/test/SlowTests/Server/Documents/QueueSink/QueueSinkTestBase.cs @@ -13,6 +13,7 @@ using Raven.Server.NotificationCenter.Notifications; using Raven.Server.NotificationCenter.Notifications.Details; using Sparrow.Json; +using Tests.Infrastructure.ConnectionString; using Xunit; using Xunit.Abstractions; using QueueSinkConfiguration = Raven.Client.Documents.Operations.QueueSink.QueueSinkConfiguration; @@ -127,5 +128,38 @@ protected class User public string FullName { get; set; } } + + protected QueueSinkConfiguration SetupRabbitMqQueueSink(DocumentStore store, string script, List queues, + string configurationName = null, string transformationName = null) + { + var connectionStringName = $"RabbitMQ to {store.Database}"; + + QueueSinkScript queueSinkScript = new QueueSinkScript + { + Name = transformationName ?? $"Queue Sink : {connectionStringName}", + Queues = new List(queues), + Script = script, + }; + var config = new QueueSinkConfiguration + { + Name = configurationName ?? connectionStringName, + ConnectionStringName = connectionStringName, + Scripts = { queueSinkScript }, + BrokerType = QueueBrokerType.RabbitMq + }; + + AddQueueSink(store, config, + new QueueConnectionString + { + Name = connectionStringName, + BrokerType = QueueBrokerType.RabbitMq, + RabbitMqConnectionSettings = new RabbitMqConnectionSettings + { + ConnectionString = RabbitMqConnectionString.Instance.VerifiedConnectionString.Value + } + }); + + return config; + } } } diff --git a/test/SlowTests/Server/Documents/QueueSink/RabbitMqSinkTests.cs b/test/SlowTests/Server/Documents/QueueSink/RabbitMqSinkTests.cs index 4fdd824c250e..298c883d0c8d 100644 --- a/test/SlowTests/Server/Documents/QueueSink/RabbitMqSinkTests.cs +++ b/test/SlowTests/Server/Documents/QueueSink/RabbitMqSinkTests.cs @@ -242,39 +242,6 @@ public void Error_if_script_is_empty() Assert.Equal("Script 'test' must not be empty", errors[0]); } - private QueueSinkConfiguration SetupRabbitMqQueueSink(DocumentStore store, string script, List queues, - string configurationName = null, string transformationName = null) - { - var connectionStringName = $"{store.Database} to Kafka"; - - QueueSinkScript queueSinkScript = new QueueSinkScript - { - Name = transformationName ?? $"Queue Sink : {connectionStringName}", - Queues = new List(queues), - Script = script, - }; - var config = new QueueSinkConfiguration - { - Name = configurationName ?? connectionStringName, - ConnectionStringName = connectionStringName, - Scripts = { queueSinkScript }, - BrokerType = QueueBrokerType.RabbitMq - }; - - AddQueueSink(store, config, - new QueueConnectionString - { - Name = connectionStringName, - BrokerType = QueueBrokerType.RabbitMq, - RabbitMqConnectionSettings = new RabbitMqConnectionSettings - { - ConnectionString = RabbitMqConnectionString.Instance.VerifiedConnectionString.Value - } - }); - - return config; - } - private IModel CreateRabbitMqProducer() { var connectionFactory = new ConnectionFactory { Uri = new Uri(RabbitMqConnectionString.Instance.VerifiedConnectionString.Value) }; diff --git a/test/SlowTests/Server/Documents/QueueSink/RavenDB_21852.cs b/test/SlowTests/Server/Documents/QueueSink/RavenDB_21852.cs new file mode 100644 index 000000000000..c8d1138102b1 --- /dev/null +++ b/test/SlowTests/Server/Documents/QueueSink/RavenDB_21852.cs @@ -0,0 +1,38 @@ +using System; +using System.Collections.Generic; +using Raven.Client.Documents.Operations.ETL.Queue; +using Raven.Client.Documents.Operations.OngoingTasks; +using Tests.Infrastructure; +using Xunit; +using Xunit.Abstractions; + +namespace SlowTests.Server.Documents.QueueSink; + +public class RavenDB_21852 : QueueSinkTestBase +{ + public RavenDB_21852(ITestOutputHelper output) : base(output) + { + } + + [RavenFact(RavenTestCategory.ClientApi)] + public void CanGetQueueSinkTaskInfo() + { + using var store = GetDocumentStore(); + var config = SetupRabbitMqQueueSink(store, "this['@metadata']['@collection'] = 'Users'; put(this.Id, this)", + new List() {UsersQueueName}); + + var op = new GetOngoingTaskInfoOperation(config.Name, OngoingTaskType.QueueSink); + + var taskInfo = (OngoingTaskQueueSink)store.Maintenance.Send(op); + + Assert.NotNull(taskInfo); + Assert.Null(taskInfo.Error); + Assert.Equal(QueueBrokerType.RabbitMq, taskInfo.BrokerType); + + var nonExisting = new GetOngoingTaskInfoOperation("non-existing", OngoingTaskType.QueueSink); + + var nullTaskInfo = (OngoingTaskQueueSink)store.Maintenance.Send(nonExisting); + + Assert.Null(nullTaskInfo); + } +}