Skip to content

Commit

Permalink
RavenDB-21852 Fixing ArgumentOutOfRangeException throw by GetOngoingT…
Browse files Browse the repository at this point in the history
…askInfoOperation() for OngoingTaskType.QueueSink
  • Loading branch information
arekpalinski committed Dec 21, 2023
1 parent 2d246d4 commit fd7cfdc
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,11 @@ public override void SetResponse(JsonOperationContext context, BlittableJsonRead
case OngoingTaskType.PullReplicationAsHub:
Result = JsonDeserializationClient.OngoingTaskPullReplicationAsHubResult(response);
break;
case OngoingTaskType.QueueSink:
Result = JsonDeserializationClient.GetOngoingTaskQueueSinkResult(response);
break;
default:
throw new ArgumentOutOfRangeException();
throw new ArgumentOutOfRangeException(nameof(_type), _type, "Unknown task type");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,5 +310,7 @@ internal sealed class JsonDeserializationClient : JsonDeserializationBase

public static readonly Func<BlittableJsonReaderObject, OlapConnectionString> OlapConnectionString = GenerateJsonDeserializationRoutine<OlapConnectionString>();

public static readonly Func<BlittableJsonReaderObject, OngoingTaskQueueSink> GetOngoingTaskQueueSinkResult = GenerateJsonDeserializationRoutine<OngoingTaskQueueSink>();

}
}
2 changes: 1 addition & 1 deletion src/Raven.Server/Documents/OngoingTasks/OngoingTasks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ protected override OngoingTaskConnectionStatus GetQueueSinkTaskConnectionStatus(

if (tag == _server.NodeTag)
{
var process = _database.EtlLoader.Processes.FirstOrDefault(x => x.ConfigurationName == config.Name);
var process = _database.QueueSinkLoader.Processes.FirstOrDefault(x => x.Configuration.Name == config.Name);

if (process != null)
connectionStatus = process.GetConnectionStatus();
Expand Down
34 changes: 34 additions & 0 deletions test/SlowTests/Server/Documents/QueueSink/QueueSinkTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Raven.Server.NotificationCenter.Notifications;
using Raven.Server.NotificationCenter.Notifications.Details;
using Sparrow.Json;
using Tests.Infrastructure.ConnectionString;
using Xunit;
using Xunit.Abstractions;
using QueueSinkConfiguration = Raven.Client.Documents.Operations.QueueSink.QueueSinkConfiguration;
Expand Down Expand Up @@ -127,5 +128,38 @@ protected class User

public string FullName { get; set; }
}

protected QueueSinkConfiguration SetupRabbitMqQueueSink(DocumentStore store, string script, List<string> queues,
string configurationName = null, string transformationName = null)
{
var connectionStringName = $"RabbitMQ to {store.Database}";

QueueSinkScript queueSinkScript = new QueueSinkScript
{
Name = transformationName ?? $"Queue Sink : {connectionStringName}",
Queues = new List<string>(queues),
Script = script,
};
var config = new QueueSinkConfiguration
{
Name = configurationName ?? connectionStringName,
ConnectionStringName = connectionStringName,
Scripts = { queueSinkScript },
BrokerType = QueueBrokerType.RabbitMq
};

AddQueueSink(store, config,
new QueueConnectionString
{
Name = connectionStringName,
BrokerType = QueueBrokerType.RabbitMq,
RabbitMqConnectionSettings = new RabbitMqConnectionSettings
{
ConnectionString = RabbitMqConnectionString.Instance.VerifiedConnectionString.Value
}
});

return config;
}
}
}
33 changes: 0 additions & 33 deletions test/SlowTests/Server/Documents/QueueSink/RabbitMqSinkTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,39 +242,6 @@ public void Error_if_script_is_empty()
Assert.Equal("Script 'test' must not be empty", errors[0]);
}

private QueueSinkConfiguration SetupRabbitMqQueueSink(DocumentStore store, string script, List<string> queues,
string configurationName = null, string transformationName = null)
{
var connectionStringName = $"{store.Database} to Kafka";

QueueSinkScript queueSinkScript = new QueueSinkScript
{
Name = transformationName ?? $"Queue Sink : {connectionStringName}",
Queues = new List<string>(queues),
Script = script,
};
var config = new QueueSinkConfiguration
{
Name = configurationName ?? connectionStringName,
ConnectionStringName = connectionStringName,
Scripts = { queueSinkScript },
BrokerType = QueueBrokerType.RabbitMq
};

AddQueueSink(store, config,
new QueueConnectionString
{
Name = connectionStringName,
BrokerType = QueueBrokerType.RabbitMq,
RabbitMqConnectionSettings = new RabbitMqConnectionSettings
{
ConnectionString = RabbitMqConnectionString.Instance.VerifiedConnectionString.Value
}
});

return config;
}

private IModel CreateRabbitMqProducer()
{
var connectionFactory = new ConnectionFactory { Uri = new Uri(RabbitMqConnectionString.Instance.VerifiedConnectionString.Value) };
Expand Down
38 changes: 38 additions & 0 deletions test/SlowTests/Server/Documents/QueueSink/RavenDB_21852.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
using System;
using System.Collections.Generic;
using Raven.Client.Documents.Operations.ETL.Queue;
using Raven.Client.Documents.Operations.OngoingTasks;
using Tests.Infrastructure;
using Xunit;
using Xunit.Abstractions;

namespace SlowTests.Server.Documents.QueueSink;

public class RavenDB_21852 : QueueSinkTestBase
{
public RavenDB_21852(ITestOutputHelper output) : base(output)
{
}

[RavenFact(RavenTestCategory.ClientApi)]
public void CanGetQueueSinkTaskInfo()
{
using var store = GetDocumentStore();
var config = SetupRabbitMqQueueSink(store, "this['@metadata']['@collection'] = 'Users'; put(this.Id, this)",
new List<string>() {UsersQueueName});

var op = new GetOngoingTaskInfoOperation(config.Name, OngoingTaskType.QueueSink);

var taskInfo = (OngoingTaskQueueSink)store.Maintenance.Send(op);

Assert.NotNull(taskInfo);
Assert.Null(taskInfo.Error);
Assert.Equal(QueueBrokerType.RabbitMq, taskInfo.BrokerType);

var nonExisting = new GetOngoingTaskInfoOperation("non-existing", OngoingTaskType.QueueSink);

var nullTaskInfo = (OngoingTaskQueueSink)store.Maintenance.Send(nonExisting);

Assert.Null(nullTaskInfo);
}
}

0 comments on commit fd7cfdc

Please sign in to comment.