diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e3bfd19af..31f3a1f3f 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -222,8 +222,6 @@ jobs: path: ./Control/PartitionMetrics/src/Dockerfile - img : dockerhubaneo/armonik_control path: ./Control/Submitter/src/Dockerfile - - img : dockerhubaneo/armonik_control_dependency_checker - path: ./Control/DependencyChecker/src/Dockerfile - img : dockerhubaneo/armonik_core_stream_test_worker path: ./Tests/Stream/Server/Dockerfile - img : dockerhubaneo/armonik_core_stream_test_client @@ -271,8 +269,6 @@ jobs: path: ./Control/PartitionMetrics/src/Dockerfile - img : dockerhubaneo/armonik_control path: ./Control/Submitter/src/Dockerfile - - img : dockerhubaneo/armonik_control_dependency_checker - path: ./Control/DependencyChecker/src/Dockerfile - img : dockerhubaneo/armonik_core_stream_test_worker path: ./Tests/Stream/Server/Dockerfile - img : dockerhubaneo/armonik_core_stream_test_client diff --git a/.github/workflows/make-release.yml b/.github/workflows/make-release.yml index 19b8b05ec..208ca5b0c 100644 --- a/.github/workflows/make-release.yml +++ b/.github/workflows/make-release.yml @@ -78,8 +78,6 @@ jobs: path: ./Control/PartitionMetrics/src/Dockerfile - img : dockerhubaneo/armonik_control path: ./Control/Submitter/src/Dockerfile - - img : dockerhubaneo/armonik_control_dependency_checker - path: ./Control/DependencyChecker/src/Dockerfile - img : dockerhubaneo/armonik_core_stream_test_worker path: ./Tests/Stream/Server/Dockerfile - img : dockerhubaneo/armonik_core_stream_test_client diff --git a/Adaptors/Memory/src/TaskTable.cs b/Adaptors/Memory/src/TaskTable.cs index 7777d5d11..761c0994c 100644 --- a/Adaptors/Memory/src/TaskTable.cs +++ b/Adaptors/Memory/src/TaskTable.cs @@ -417,6 +417,35 @@ public Task SetTaskSuccessAsync(string taskId, return Task.CompletedTask; } + public Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable dependenciesToRemove)> dependencies, + CancellationToken cancellationToken = default) + { + using var _ = Logger.LogFunction(); + + foreach (var (taskId, dependenciesToRemove) in dependencies) + { + taskId2TaskData_.AddOrUpdate(taskId, + _ => throw new TaskNotFoundException("The task does not exist."), + (_, + data) => + { + var remainingDep = data.RemainingDataDependencies.ToList(); + + foreach (var dep in dependenciesToRemove) + { + remainingDep.Remove(dep); + } + + return data with + { + RemainingDataDependencies = remainingDep, + }; + }); + } + + return Task.CompletedTask; + } + /// public Task SetTaskCanceledAsync(string taskId, CancellationToken cancellationToken = default) @@ -437,7 +466,7 @@ public Task SetTaskCanceledAsync(string taskId, } taskId2TaskData_.AddOrUpdate(taskId, - _ => throw new InvalidOperationException("The task does not exist."), + _ => throw new TaskNotFoundException("The task does not exist."), (_, data) => { @@ -602,6 +631,7 @@ public Task RetryTask(TaskData taskData, taskData.PayloadId, taskData.ParentTaskIds, taskData.DataDependencies, + taskData.RemainingDataDependencies, taskData.ExpectedOutputIds, taskData.InitialTaskId, newTaskRetryOfIds, diff --git a/Adaptors/MongoDB/src/ResultTable.cs b/Adaptors/MongoDB/src/ResultTable.cs index bfa4dc40a..012f42ab9 100644 --- a/Adaptors/MongoDB/src/ResultTable.cs +++ b/Adaptors/MongoDB/src/ResultTable.cs @@ -252,6 +252,9 @@ public async Task SetResult(string sessionId, key); var resultCollection = resultCollectionProvider_.Get(); + Logger.LogInformation("Update result {resultId} to completed", + key); + var res = await resultCollection.UpdateOneAsync(Builders.Filter.Where(model => model.Id == Result.GenerateId(sessionId, key) && model.OwnerTaskId == ownerTaskId), Builders.Update.Set(model => model.Status, @@ -282,6 +285,9 @@ public async Task SetResult(string sessionId, var resultCollection = resultCollectionProvider_.Get(); + Logger.LogInformation("Update result {resultId} to completed", + key); + var res = await resultCollection.UpdateOneAsync(Builders.Filter.Where(model => model.Id == Result.GenerateId(sessionId, key) && model.OwnerTaskId == ownerTaskId), Builders.Update.Set(model => model.Status, diff --git a/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs b/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs index 584fb6769..618f82c24 100644 --- a/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs +++ b/Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs @@ -49,6 +49,9 @@ static TaskDataModelMapping() cm.MapProperty(nameof(TaskData.DataDependencies)) .SetIgnoreIfDefault(true) .SetDefaultValue(Array.Empty()); + cm.MapProperty(nameof(TaskData.RemainingDataDependencies)) + .SetIgnoreIfDefault(true) + .SetDefaultValue(Array.Empty()); cm.MapProperty(nameof(TaskData.ExpectedOutputIds)) .SetIsRequired(true); cm.MapProperty(nameof(TaskData.InitialTaskId)) @@ -85,6 +88,7 @@ static TaskDataModelMapping() model.PayloadId, model.ParentTaskIds, model.DataDependencies, + model.RemainingDataDependencies, model.ExpectedOutputIds, model.InitialTaskId, model.RetryOfIds, diff --git a/Adaptors/MongoDB/src/TaskTable.cs b/Adaptors/MongoDB/src/TaskTable.cs index 1de1df17d..a5d36a644 100644 --- a/Adaptors/MongoDB/src/TaskTable.cs +++ b/Adaptors/MongoDB/src/TaskTable.cs @@ -526,6 +526,26 @@ public async Task SetTaskSuccessAsync(string taskId, } } + /// + public async Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable dependenciesToRemove)> dependencies, + CancellationToken cancellationToken = default) + { + using var activity = activitySource_.StartActivity($"{nameof(SetTaskCanceledAsync)}"); + var taskCollection = taskCollectionProvider_.Get(); + var sessionHandle = sessionProvider_.Get(); + + + await taskCollection.BulkWriteAsync(sessionHandle, + dependencies.Select(tuple + => new UpdateOneModel(new ExpressionFilterDefinition(data => data.TaskId == tuple + .taskId), + new UpdateDefinitionBuilder() + .PullAll(data => data.RemainingDataDependencies, + tuple.dependenciesToRemove))), + cancellationToken: cancellationToken) + .ConfigureAwait(false); + } + /// public async Task SetTaskCanceledAsync(string taskId, CancellationToken cancellationToken = default) @@ -580,10 +600,10 @@ public async Task SetTaskErrorAsync(string taskId, var filter = new FilterDefinitionBuilder().Where(x => x.TaskId == taskId); - Logger.LogInformation("update task {taskId} to status {status} with {output}", - taskId, - TaskStatus.Error, - taskOutput); + Logger.LogDebug("update task {taskId} to status {status} with {output}", + taskId, + TaskStatus.Error, + taskOutput); var task = await taskCollection.FindOneAndUpdateAsync(filter, updateDefinition, new FindOneAndUpdateOptions @@ -818,6 +838,7 @@ public async Task RetryTask(TaskData taskData, taskData.PayloadId, taskData.ParentTaskIds, taskData.DataDependencies, + taskData.RemainingDataDependencies, taskData.ExpectedOutputIds, taskData.InitialTaskId, newTaskRetryOfIds, diff --git a/ArmoniK.Core.sln b/ArmoniK.Core.sln index 01ea27266..e6927cd4c 100644 --- a/ArmoniK.Core.sln +++ b/ArmoniK.Core.sln @@ -83,8 +83,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Adapters.S3.Te EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Common.Tests.Client", "Tests\Common\Client\src\ArmoniK.Core.Common.Tests.Client.csproj", "{289EAA4A-15E5-4D8B-87B2-0CFFD9510207}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Control.DependencyChecker", "Control\DependencyChecker\src\ArmoniK.Core.Control.DependencyChecker.csproj", "{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Base", "Base\src\ArmoniK.Core.Base.csproj", "{150576C8-A80E-4FB1-9878-37D95A11FAF9}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Adapters.QueueCommon", "Adaptors\QueueCommon\src\ArmoniK.Core.Adapters.QueueCommon.csproj", "{EF5DDE84-CB4B-48E0-A21B-455849835E66}" @@ -339,14 +337,6 @@ Global {289EAA4A-15E5-4D8B-87B2-0CFFD9510207}.Release|Any CPU.Build.0 = Release|Any CPU {289EAA4A-15E5-4D8B-87B2-0CFFD9510207}.Release|x64.ActiveCfg = Release|Any CPU {289EAA4A-15E5-4D8B-87B2-0CFFD9510207}.Release|x64.Build.0 = Release|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|Any CPU.Build.0 = Debug|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|x64.ActiveCfg = Debug|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|x64.Build.0 = Debug|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|Any CPU.ActiveCfg = Release|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|Any CPU.Build.0 = Release|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|x64.ActiveCfg = Release|Any CPU - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|x64.Build.0 = Release|Any CPU {150576C8-A80E-4FB1-9878-37D95A11FAF9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {150576C8-A80E-4FB1-9878-37D95A11FAF9}.Debug|Any CPU.Build.0 = Debug|Any CPU {150576C8-A80E-4FB1-9878-37D95A11FAF9}.Debug|x64.ActiveCfg = Debug|Any CPU @@ -414,7 +404,6 @@ Global {4979ADF2-4574-4FF5-BDF4-B5EC0EFAC803} = {1A9BCE53-79D0-4761-B3A2-6967B610FA94} {35AC9746-51AC-4EB5-A6B9-AFC169868ABF} = {2D548C40-6260-4A4E-9C56-E8A80C639E13} {289EAA4A-15E5-4D8B-87B2-0CFFD9510207} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF} - {F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E} = {0E809C1E-CBF5-42E4-8624-2E244A52E422} {150576C8-A80E-4FB1-9878-37D95A11FAF9} = {79DE0448-1BF2-48D2-8A33-243E402CFD27} {EF5DDE84-CB4B-48E0-A21B-455849835E66} = {1A9BCE53-79D0-4761-B3A2-6967B610FA94} {7B455427-859F-4DDA-B67B-510FE450CB35} = {79DE0448-1BF2-48D2-8A33-243E402CFD27} diff --git a/Common/src/DependencyResolver/DependencyResolver.cs b/Common/src/DependencyResolver/DependencyResolver.cs deleted file mode 100644 index 4fad21ca9..000000000 --- a/Common/src/DependencyResolver/DependencyResolver.cs +++ /dev/null @@ -1,256 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Api.Common.Utils; -using ArmoniK.Core.Base; -using ArmoniK.Core.Common.Storage; -using ArmoniK.Core.Utils; - -using Microsoft.Extensions.Diagnostics.HealthChecks; -using Microsoft.Extensions.Logging; - -using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; - -namespace ArmoniK.Core.Common.DependencyResolver; - -/// -/// Service for checking the status of the dependencies from the tasks in the queue -/// -public class DependencyResolver : IInitializable -{ - private readonly ILogger logger_; - private readonly Injection.Options.DependencyResolver options_; - private readonly IPullQueueStorage pullQueueStorage_; - private readonly IPushQueueStorage pushQueueStorage_; - private readonly IResultTable resultTable_; - private readonly ITaskTable taskTable_; - - /// - /// Initializes the - /// - /// Interface to get tasks from the queue - /// Interface to put tasks in the queue - /// Interface to manage task states - /// Interface to manage result states - /// Dependency checker configurations - /// Logger used to produce logs for this class - public DependencyResolver(IPullQueueStorage pullQueueStorage, - IPushQueueStorage pushQueueStorage, - ITaskTable taskTable, - IResultTable resultTable, - Injection.Options.DependencyResolver options, - ILogger logger) - { - pullQueueStorage_ = pullQueueStorage; - pushQueueStorage_ = pushQueueStorage; - taskTable_ = taskTable; - resultTable_ = resultTable; - options_ = options; - logger_ = logger; - } - - /// - public async Task Check(HealthCheckTag tag) - { - var checks = new List> - { - pullQueueStorage_.Check(tag), - pushQueueStorage_.Check(tag), - resultTable_.Check(tag), - taskTable_.Check(tag), - }; - - var exceptions = new List(); - var data = new Dictionary(); - var description = new StringBuilder(); - var worstStatus = HealthStatus.Healthy; - - foreach (var healthCheckResult in await checks.WhenAll() - .ConfigureAwait(false)) - { - if (healthCheckResult.Status == HealthStatus.Healthy) - { - continue; - } - - if (healthCheckResult.Exception is not null) - { - exceptions.Add(healthCheckResult.Exception); - } - - foreach (var (key, value) in healthCheckResult.Data) - { - data[key] = value; - } - - if (healthCheckResult.Description is not null) - { - description.AppendLine(healthCheckResult.Description); - } - - worstStatus = worstStatus < healthCheckResult.Status - ? worstStatus - : healthCheckResult.Status; - } - - return new HealthCheckResult(worstStatus, - description.ToString(), - new AggregateException(exceptions), - data); - } - - /// - public async Task Init(CancellationToken cancellationToken) - => await Task.WhenAll(pushQueueStorage_.Init(cancellationToken), - pullQueueStorage_.Init(cancellationToken), - resultTable_.Init(cancellationToken), - taskTable_.Init(cancellationToken)) - .ConfigureAwait(false); - - /// - /// Long running task that pulls message that represents tasks from queue, check their dependencies and if dependencies - /// are available, put them in the appropriate queue - /// - /// Token used to cancel the execution of the method - /// - /// Task representing the asynchronous execution of the method - /// - public async Task ExecuteAsync(CancellationToken stoppingToken) - { - using var logFunction = logger_.LogFunction(); - while (!stoppingToken.IsCancellationRequested) - { - try - { - var messages = pullQueueStorage_.PullMessagesAsync(options_.MessagesBatchSize, - stoppingToken); - - await foreach (var message in messages.WithCancellation(stoppingToken) - .ConfigureAwait(false)) - { - using var scopedLogger = logger_.BeginNamedScope("Prefetch messageHandler", - ("messageHandler", message.MessageId), - ("taskId", message.TaskId)); - - var taskData = await taskTable_.ReadTaskAsync(message.TaskId, - stoppingToken) - .ConfigureAwait(false); - - switch (taskData.Status) - { - case TaskStatus.Creating: - - var dependenciesStatus = await TaskLifeCycleHelper.CheckTaskDependencies(taskData, - resultTable_, - logger_, - stoppingToken) - .ConfigureAwait(false); - logger_.LogInformation("task dependencies : {resolved}", - dependenciesStatus); - - switch (dependenciesStatus) - { - case TaskLifeCycleHelper.DependenciesStatus.Aborted: - // not done means that another pod put this task in error so we do not need to do it a second time - // so nothing to do - if (await taskTable_.SetTaskErrorAsync(taskData.TaskId, - "One of the input data is aborted.", - stoppingToken) - .ConfigureAwait(false)) - { - await resultTable_.AbortTaskResults(taskData.SessionId, - taskData.TaskId, - stoppingToken) - .ConfigureAwait(false); - } - - message.Status = QueueMessageStatus.Cancelled; - break; - case TaskLifeCycleHelper.DependenciesStatus.Available: - await pushQueueStorage_.PushMessagesAsync(new[] - { - taskData.TaskId, - }, - taskData.Options.PartitionId, - taskData.Options.Priority, - stoppingToken) - .ConfigureAwait(false); - - await taskTable_.FinalizeTaskCreation(new[] - { - taskData.TaskId, - }, - stoppingToken) - .ConfigureAwait(false); - message.Status = QueueMessageStatus.Processed; - break; - case TaskLifeCycleHelper.DependenciesStatus.Processing: - message.Status = QueueMessageStatus.Processed; - break; - default: - throw new ArgumentOutOfRangeException(); - } - - break; - case TaskStatus.Cancelling: - logger_.LogInformation("Task is being cancelled"); - message.Status = QueueMessageStatus.Cancelled; - await taskTable_.SetTaskCanceledAsync(taskData.TaskId, - CancellationToken.None) - .ConfigureAwait(false); - await resultTable_.AbortTaskResults(taskData.SessionId, - taskData.TaskId, - CancellationToken.None) - .ConfigureAwait(false); - break; - case TaskStatus.Submitted: - case TaskStatus.Dispatched: - case TaskStatus.Completed: - case TaskStatus.Error: - case TaskStatus.Cancelled: - case TaskStatus.Timeout: - case TaskStatus.Processed: - case TaskStatus.Processing: - logger_.LogInformation("Task {status}, task will be removed from this queue", - taskData.Status); - message.Status = QueueMessageStatus.Processed; - break; - case TaskStatus.Unspecified: - default: - throw new ArgumentOutOfRangeException(); - } - - - await message.DisposeAsync() - .ConfigureAwait(false); - } - } - catch (Exception ex) - { - logger_.LogError(ex, - "Error during task processing"); - throw; - } - } - } -} diff --git a/Common/src/Injection/Options/DependencyResolver.cs b/Common/src/Injection/Options/DependencyResolver.cs deleted file mode 100644 index 1ffff4b9b..000000000 --- a/Common/src/Injection/Options/DependencyResolver.cs +++ /dev/null @@ -1,39 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -namespace ArmoniK.Core.Common.Injection.Options; - -/// -/// Configuration for . -/// -public class DependencyResolver -{ - /// - /// Path to the section containing the values in the configuration object - /// - public const string SettingSection = nameof(DependencyResolver); - - /// - /// Name of the queue that holds tasks with unresolved data dependencies - /// - public string UnresolvedDependenciesQueue { get; set; } = string.Empty; - - /// - /// Number of messages retrieved each time by the DependencyChecker from the queue - /// - public int MessagesBatchSize { get; set; } = 1; -} diff --git a/Common/src/Pollster/AgentHandler.cs b/Common/src/Pollster/AgentHandler.cs index b83b8e880..d5fdbacf2 100644 --- a/Common/src/Pollster/AgentHandler.cs +++ b/Common/src/Pollster/AgentHandler.cs @@ -44,43 +44,43 @@ namespace ArmoniK.Core.Common.Pollster; /// public class AgentHandler : IAgentHandler, IAsyncDisposable { - private readonly WebApplication app_; - private readonly ComputePlane computePlaneOptions_; - private readonly Injection.Options.DependencyResolver dependencyResolverOptions_; - private readonly ILogger logger_; - private readonly IObjectStorageFactory objectStorageFactory_; - private readonly IPushQueueStorage pushQueueStorage_; - private readonly IResultTable resultTable_; - private readonly GrpcAgentService service_; - private readonly ISubmitter submitter_; + private readonly WebApplication app_; + private readonly ComputePlane computePlaneOptions_; + private readonly ILogger logger_; + private readonly IObjectStorageFactory objectStorageFactory_; + private readonly IPushQueueStorage pushQueueStorage_; + private readonly IResultTable resultTable_; + private readonly GrpcAgentService service_; + private readonly ISubmitter submitter_; + private readonly ITaskTable taskTable_; /// /// Initializes a new instance /// /// Logger initializer used to configure the loggers needed by the worker /// Options needed for the creation of the servers - /// Configuration for DependencyResolver /// Interface to manage tasks /// Interface class to create object storage /// Interface to put tasks in the queue /// Interface to manage result states + /// Interface to manage task states /// Logger used to produce logs for this class - public AgentHandler(LoggerInit loggerInit, - ComputePlane computePlaneOptions, - Injection.Options.DependencyResolver dependencyResolverOptions, - ISubmitter submitter, - IObjectStorageFactory objectStorageFactory, - IPushQueueStorage pushQueueStorage, - IResultTable resultTable, - ILogger logger) + public AgentHandler(LoggerInit loggerInit, + ComputePlane computePlaneOptions, + ISubmitter submitter, + IObjectStorageFactory objectStorageFactory, + IPushQueueStorage pushQueueStorage, + IResultTable resultTable, + ITaskTable taskTable, + ILogger logger) { - computePlaneOptions_ = computePlaneOptions; - dependencyResolverOptions_ = dependencyResolverOptions; - submitter_ = submitter; - objectStorageFactory_ = objectStorageFactory; - pushQueueStorage_ = pushQueueStorage; - resultTable_ = resultTable; - logger_ = logger; + computePlaneOptions_ = computePlaneOptions; + submitter_ = submitter; + objectStorageFactory_ = objectStorageFactory; + pushQueueStorage_ = pushQueueStorage; + resultTable_ = resultTable; + taskTable_ = taskTable; + logger_ = logger; try { @@ -140,7 +140,7 @@ public async Task Start(string token, objectStorageFactory_, pushQueueStorage_, resultTable_, - dependencyResolverOptions_, + taskTable_, sessionData, taskData, token, diff --git a/Common/src/Pollster/TaskHandler.cs b/Common/src/Pollster/TaskHandler.cs index 170606153..de8cef40c 100644 --- a/Common/src/Pollster/TaskHandler.cs +++ b/Common/src/Pollster/TaskHandler.cs @@ -198,10 +198,12 @@ public async Task AcquireTask() await taskTable_.SetTaskCanceledAsync(messageHandler_.TaskId, CancellationToken.None) .ConfigureAwait(false); - await resultTable_.AbortTaskResults(taskData_.SessionId, - taskData_.TaskId, - CancellationToken.None) - .ConfigureAwait(false); + await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, + resultTable_, + messageHandler_.TaskId, + CancellationToken.None) + .ConfigureAwait(false); + return false; case TaskStatus.Completed: logger_.LogInformation("Task was already completed"); @@ -218,10 +220,11 @@ await resultTable_.AbortTaskResults(taskData_.SessionId, case TaskStatus.Error: logger_.LogInformation("Task was on error elsewhere ; task should have been resubmitted"); messageHandler_.Status = QueueMessageStatus.Cancelled; - await resultTable_.AbortTaskResults(taskData_.SessionId, - taskData_.TaskId, - CancellationToken.None) - .ConfigureAwait(false); + await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, + resultTable_, + messageHandler_.TaskId, + CancellationToken.None) + .ConfigureAwait(false); return false; case TaskStatus.Timeout: logger_.LogInformation("Task was timeout elsewhere ; taking over here"); @@ -229,10 +232,11 @@ await resultTable_.AbortTaskResults(taskData_.SessionId, case TaskStatus.Cancelled: logger_.LogInformation("Task has been cancelled"); messageHandler_.Status = QueueMessageStatus.Cancelled; - await resultTable_.AbortTaskResults(taskData_.SessionId, - taskData_.TaskId, - CancellationToken.None) - .ConfigureAwait(false); + await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, + resultTable_, + messageHandler_.TaskId, + CancellationToken.None) + .ConfigureAwait(false); return false; case TaskStatus.Processing: logger_.LogInformation("Task is processing elsewhere ; taking over here"); @@ -257,10 +261,13 @@ await resultTable_.AbortTaskResults(taskData_.SessionId, await taskTable_.SetTaskCanceledAsync(messageHandler_.TaskId, CancellationToken.None) .ConfigureAwait(false); - await resultTable_.AbortTaskResults(taskData_.SessionId, - taskData_.TaskId, - CancellationToken.None) - .ConfigureAwait(false); + + await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, + resultTable_, + messageHandler_.TaskId, + CancellationToken.None) + .ConfigureAwait(false); + return false; } @@ -349,10 +356,11 @@ await submitter_.CompleteTaskAsync(taskData_, await taskTable_.SetTaskCanceledAsync(messageHandler_.TaskId, CancellationToken.None) .ConfigureAwait(false); - await resultTable_.AbortTaskResults(taskData_.SessionId, - taskData_.TaskId, - CancellationToken.None) - .ConfigureAwait(false); + await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, + resultTable_, + messageHandler_.TaskId, + CancellationToken.None) + .ConfigureAwait(false); return false; } } diff --git a/Common/src/Storage/ITaskTable.cs b/Common/src/Storage/ITaskTable.cs index cf2f518f7..003ff090a 100644 --- a/Common/src/Storage/ITaskTable.cs +++ b/Common/src/Storage/ITaskTable.cs @@ -275,6 +275,17 @@ Task> FindTasksAsync(Expression> filter, Task SetTaskSuccessAsync(string taskId, CancellationToken cancellationToken = default); + /// + /// Remove data dependencies from remaining data dependencies + /// + /// Tuples representing the dependencies to remove from each tasks + /// Token used to cancel the execution of the method + /// + /// Task representing the asynchronous execution of the method + /// + Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable dependenciesToRemove)> dependencies, + CancellationToken cancellationToken = default); + /// /// Change the status of the task to canceled /// diff --git a/Common/src/Storage/ResultLifeCycleHelper.cs b/Common/src/Storage/ResultLifeCycleHelper.cs new file mode 100644 index 000000000..22caeadda --- /dev/null +++ b/Common/src/Storage/ResultLifeCycleHelper.cs @@ -0,0 +1,103 @@ +// This file is part of the ArmoniK project +// +// Copyright (C) ANEO, 2021-2023. All rights reserved. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published +// by the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY, without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +using ArmoniK.Api.gRPC.V1; + +using Microsoft.Extensions.Logging; + +using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; + +namespace ArmoniK.Core.Common.Storage; + +/// +/// Helper to manage Results +/// +public static class ResultLifeCycleHelper +{ + /// + /// Recursively abort results and put task in error when their dependencies are aborted + /// + /// Interface to manage task states + /// Interface to manage result states + /// The id of the task to process + /// Token used to cancel the execution of the method + /// + /// Task representing the asynchronous execution of the method + /// + public static async Task AbortTaskAndResults(ITaskTable taskTable, + IResultTable resultTable, + string taskId, + CancellationToken cancellationToken) + { + var taskData = await taskTable.ReadTaskAsync(taskId, + cancellationToken) + .ConfigureAwait(false); + + if (taskData.Status is not (TaskStatus.Creating or TaskStatus.Cancelled or TaskStatus.Cancelling or TaskStatus.Error)) + { + return; + } + + if (taskData.Status is TaskStatus.Creating) + { + await taskTable.SetTaskErrorAsync(taskId, + "One of the input data is aborted.", + cancellationToken) + .ConfigureAwait(false); + } + + taskTable.Logger.LogInformation("Abort results from {taskId}", + taskData.TaskId); + + var creatingResults = await resultTable.GetResults(taskData.SessionId, + taskData.ExpectedOutputIds, + cancellationToken) + .Where(result => result.Status == ResultStatus.Created && result.OwnerTaskId == taskId) + .Select(result => result.Name) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + + await resultTable.AbortTaskResults(taskData.SessionId, + taskData.TaskId, + cancellationToken) + .ConfigureAwait(false); + + var dependentTasks = await resultTable.GetResults(taskData.SessionId, + creatingResults, + cancellationToken) + .Where(result => result.Status == ResultStatus.Aborted) + .SelectMany(result => result.DependentTasks.ToAsyncEnumerable()) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + + foreach (var task in (await taskTable.GetTaskStatus(dependentTasks, + cancellationToken) + .ConfigureAwait(false)).Where(status => status.Status != TaskStatus.Error) + .Select(status => status.TaskId)) + { + await AbortTaskAndResults(taskTable, + resultTable, + task, + cancellationToken) + .ConfigureAwait(false); + } + } +} diff --git a/Common/src/Storage/TaskData.cs b/Common/src/Storage/TaskData.cs index 604b5ed57..7de830687 100644 --- a/Common/src/Storage/TaskData.cs +++ b/Common/src/Storage/TaskData.cs @@ -37,6 +37,7 @@ namespace ArmoniK.Core.Common.Storage; /// represents a submission from the client /// /// Unique identifiers of the results the task depends on +/// Copy of data dependencies used for dependency resolution /// /// Identifiers of the outputs the task should produce or should transmit the /// responsibility to produce @@ -61,6 +62,7 @@ public record TaskData(string SessionId, string PayloadId, IList ParentTaskIds, IList DataDependencies, + IList RemainingDataDependencies, IList ExpectedOutputIds, string InitialTaskId, IList RetryOfIds, @@ -116,6 +118,7 @@ public TaskData(string sessionId, payloadId, parentTaskIds, dataDependencies, + dataDependencies, expectedOutputIds, taskId, retryOfIds, diff --git a/Common/src/Storage/TaskLifeCycleHelper.cs b/Common/src/Storage/TaskLifeCycleHelper.cs deleted file mode 100644 index c5d8d8662..000000000 --- a/Common/src/Storage/TaskLifeCycleHelper.cs +++ /dev/null @@ -1,95 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Api.gRPC.V1; - -using Microsoft.Extensions.Logging; - -namespace ArmoniK.Core.Common.Storage; - -/// -/// Helper to manage Tasks lifecycle -/// -public static class TaskLifeCycleHelper -{ - /// - /// Represents the status of the dependencies of a task - /// - public enum DependenciesStatus - { - /// - /// One of the dependency is aborted - /// - Aborted, - - /// - /// All the dependencies are available - /// - Available, - - /// - /// Some of the dependencies are still being created - /// - Processing, - } - - /// - /// Check the status of the dependencies from a task - /// - /// The metadata of the task for which to check dependencies - /// Interface to manage result states - /// Logger used to produce logs for this class - /// Token used to cancel the execution of the method - /// - /// The status of the dependencies - /// - public static async Task CheckTaskDependencies(TaskData taskData, - IResultTable resultTable, - ILogger logger, - CancellationToken cancellationToken) - { - if (!taskData.DataDependencies.Any()) - { - return DependenciesStatus.Available; - } - - var dependencies = await resultTable.AreResultsAvailableAsync(taskData.SessionId, - taskData.DataDependencies, - cancellationToken) - .ConfigureAwait(false); - - var dictionary = dependencies.GroupBy(resultStatusCount => resultStatusCount.Status) - .ToDictionary(counts => counts.Key, - counts => counts.Sum(count => count.Count)); - - if (dictionary.GetValueOrDefault(ResultStatus.Completed, - 0) == taskData.DataDependencies.Count) - { - return DependenciesStatus.Available; - } - - return dictionary.GetValueOrDefault(ResultStatus.Aborted, - 0) > 0 - ? DependenciesStatus.Aborted - : DependenciesStatus.Processing; - } -} diff --git a/Common/src/gRPC/Services/Agent.cs b/Common/src/gRPC/Services/Agent.cs index d3641cc51..b54c0e6ec 100644 --- a/Common/src/gRPC/Services/Agent.cs +++ b/Common/src/gRPC/Services/Agent.cs @@ -37,6 +37,7 @@ using Microsoft.Extensions.Logging; using Result = ArmoniK.Api.gRPC.V1.Agent.Result; +using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; namespace ArmoniK.Core.Common.gRPC.Services; @@ -46,7 +47,6 @@ namespace ArmoniK.Core.Common.gRPC.Services; public class Agent : IAgent { private readonly List<(IEnumerable requests, int priority, string partitionId)> createdTasks_; - private readonly Injection.Options.DependencyResolver dependencyResolverOptions_; private readonly ILogger logger_; private readonly IPushQueueStorage pushQueueStorage_; private readonly IObjectStorage resourcesStorage_; @@ -55,6 +55,7 @@ public class Agent : IAgent private readonly SessionData sessionData_; private readonly ISubmitter submitter_; private readonly TaskData taskData_; + private readonly ITaskTable taskTable_; private readonly string token_; /// @@ -64,32 +65,32 @@ public class Agent : IAgent /// Interface class to create object storage /// Interface to put tasks in the queue /// Interface to manage result states - /// Configuration for the Dependency Resolver + /// Interface to manage task states /// Data of the session /// Data of the task /// Token send to the worker to identify the running task /// Logger used to produce logs for this class - public Agent(ISubmitter submitter, - IObjectStorageFactory objectStorageFactory, - IPushQueueStorage pushQueueStorage, - IResultTable resultTable, - Injection.Options.DependencyResolver dependencyResolverOptions, - SessionData sessionData, - TaskData taskData, - string token, - ILogger logger) + public Agent(ISubmitter submitter, + IObjectStorageFactory objectStorageFactory, + IPushQueueStorage pushQueueStorage, + IResultTable resultTable, + ITaskTable taskTable, + SessionData sessionData, + TaskData taskData, + string token, + ILogger logger) { - submitter_ = submitter; - pushQueueStorage_ = pushQueueStorage; - resultTable_ = resultTable; - dependencyResolverOptions_ = dependencyResolverOptions; - logger_ = logger; - resourcesStorage_ = objectStorageFactory.CreateResourcesStorage(); - createdTasks_ = new List<(IEnumerable requests, int priority, string partitionId)>(); - sentResults_ = new List(); - sessionData_ = sessionData; - taskData_ = taskData; - token_ = token; + submitter_ = submitter; + pushQueueStorage_ = pushQueueStorage; + resultTable_ = resultTable; + taskTable_ = taskTable; + logger_ = logger; + resourcesStorage_ = objectStorageFactory.CreateResourcesStorage(); + createdTasks_ = new List<(IEnumerable requests, int priority, string partitionId)>(); + sentResults_ = new List(); + sessionData_ = sessionData; + taskData_ = taskData; + token_ = token; } /// @@ -113,20 +114,77 @@ await submitter_.FinalizeTaskCreation(createdTask.requests, .ConfigureAwait(false); } - logger_.LogDebug("Send tasks which new data are available in the dependency checker queue"); + logger_.LogDebug("Submit tasks which new data are available"); var dependentTasks = await resultTable_.GetResults(sessionData_.SessionId, sentResults_, cancellationToken) - .SelectMany(result => result.DependentTasks.ToAsyncEnumerable()) - .ToHashSetAsync(cancellationToken) + .Select(result => (result.Name, result.DependentTasks)) + .ToListAsync(cancellationToken) .ConfigureAwait(false); - await pushQueueStorage_.PushMessagesAsync(dependentTasks, - dependencyResolverOptions_.UnresolvedDependenciesQueue, - taskData_.Options.Priority, - cancellationToken) - .ConfigureAwait(false); + if (!dependentTasks.Any()) + { + return; + } + + var toUpdate = new Dictionary>(); + + foreach (var (name, tasks) in dependentTasks) + { + foreach (var task in tasks) + { + if (toUpdate.TryGetValue(task, + out var dicTasks)) + { + dicTasks.Add(name); + } + else + { + toUpdate.TryAdd(task, + new List + { + name, + }); + } + } + } + + if (logger_.IsEnabled(LogLevel.Debug)) + { + logger_.LogDebug("Dependent Tasks Dictionary {@dependents}", + toUpdate); + } + + + if (!toUpdate.Any()) + { + return; + } + + await taskTable_.RemoveRemainingDataDependenciesAsync(toUpdate.Select(pair => (pair.Key, pair.Value.AsEnumerable())), + cancellationToken) + .ConfigureAwait(false); + + var groups = (await taskTable_.FindTasksAsync(data => toUpdate.Keys.Contains(data.TaskId), + _ => _, + cancellationToken)).Where(data => data.Status == TaskStatus.Creating && !data.RemainingDataDependencies.Any()) + .GroupBy(data => (data.Options.PartitionId, data.Options.Priority)); + + foreach (var group in groups) + { + var ids = group.Select(data => data.TaskId) + .ToList(); + await pushQueueStorage_.PushMessagesAsync(ids, + group.Key.PartitionId, + group.Key.Priority, + cancellationToken) + .ConfigureAwait(false); + + await taskTable_.FinalizeTaskCreation(ids, + cancellationToken) + .ConfigureAwait(false); + } } /// diff --git a/Common/src/gRPC/Services/Submitter.cs b/Common/src/gRPC/Services/Submitter.cs index 9fcd5877f..4226c5725 100644 --- a/Common/src/gRPC/Services/Submitter.cs +++ b/Common/src/gRPC/Services/Submitter.cs @@ -46,39 +46,36 @@ namespace ArmoniK.Core.Common.gRPC.Services; public class Submitter : ISubmitter { - private readonly ActivitySource activitySource_; - private readonly Injection.Options.DependencyResolver dependencyResolverOptions_; - private readonly ILogger logger_; - private readonly IObjectStorageFactory objectStorageFactory_; - private readonly IPartitionTable partitionTable_; - private readonly IPushQueueStorage pushQueueStorage_; - private readonly IResultTable resultTable_; - private readonly ISessionTable sessionTable_; - private readonly Injection.Options.Submitter submitterOptions_; - private readonly ITaskTable taskTable_; + private readonly ActivitySource activitySource_; + private readonly ILogger logger_; + private readonly IObjectStorageFactory objectStorageFactory_; + private readonly IPartitionTable partitionTable_; + private readonly IPushQueueStorage pushQueueStorage_; + private readonly IResultTable resultTable_; + private readonly ISessionTable sessionTable_; + private readonly Injection.Options.Submitter submitterOptions_; + private readonly ITaskTable taskTable_; [UsedImplicitly] - public Submitter(IPushQueueStorage pushQueueStorage, - IObjectStorageFactory objectStorageFactory, - ILogger logger, - ISessionTable sessionTable, - ITaskTable taskTable, - IResultTable resultTable, - IPartitionTable partitionTable, - Injection.Options.Submitter submitterOptions, - Injection.Options.DependencyResolver dependencyResolverOptions, - ActivitySource activitySource) + public Submitter(IPushQueueStorage pushQueueStorage, + IObjectStorageFactory objectStorageFactory, + ILogger logger, + ISessionTable sessionTable, + ITaskTable taskTable, + IResultTable resultTable, + IPartitionTable partitionTable, + Injection.Options.Submitter submitterOptions, + ActivitySource activitySource) { - objectStorageFactory_ = objectStorageFactory; - logger_ = logger; - sessionTable_ = sessionTable; - taskTable_ = taskTable; - resultTable_ = resultTable; - partitionTable_ = partitionTable; - submitterOptions_ = submitterOptions; - dependencyResolverOptions_ = dependencyResolverOptions; - activitySource_ = activitySource; - pushQueueStorage_ = pushQueueStorage; + objectStorageFactory_ = objectStorageFactory; + logger_ = logger; + sessionTable_ = sessionTable; + taskTable_ = taskTable; + resultTable_ = resultTable; + partitionTable_ = partitionTable; + submitterOptions_ = submitterOptions; + activitySource_ = activitySource; + pushQueueStorage_ = pushQueueStorage; } /// @@ -132,16 +129,9 @@ await ChangeResultOwnership(sessionId, foreach (var request in taskRequests) { var dependencies = request.DataDependencies.ToList(); - if (dependencies.Any()) - { - dependencies = await resultTable_.GetResults(sessionId, - dependencies, - cancellationToken) - .Where(result => result.Status != ResultStatus.Completed) - .Select(result => result.Name) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); - } + + logger_.LogDebug("Process task request {request}", + request); if (dependencies.Any()) { @@ -160,16 +150,27 @@ await resultTable_.AddTaskDependency(sessionId, // This is a typical case of TOCTOU issues (Time Of Check, Time Of Use). // The second check ensures that the result completion will be visible, // even if it happens in parallel to the task submission. - dependencies = await resultTable_.GetResults(sessionId, - dependencies, - cancellationToken) - .Where(result => result.Status != ResultStatus.Completed) - .Select(result => result.Name) - .ToListAsync(cancellationToken) - .ConfigureAwait(false); - } + var completedDependencies = await resultTable_.GetResults(sessionId, + dependencies, + cancellationToken) + .Where(result => result.Status == ResultStatus.Completed) + .Select(result => result.Name) + .ToListAsync(cancellationToken) + .ConfigureAwait(false); + + await taskTable_.RemoveRemainingDataDependenciesAsync(new List<(string taskId, IEnumerable dependenciesToRemove)> + { + (request.Id, completedDependencies), + }, + cancellationToken) + .ConfigureAwait(false); - if (!dependencies.Any()) + if (dependencies.Count == completedDependencies.Count) + { + readyTasks.Add(request.Id); + } + } + else { readyTasks.Add(request.Id); } @@ -488,10 +489,11 @@ await FinalizeTaskCreation(new List } else { - await resultTable_.AbortTaskResults(taskData.SessionId, - taskData.TaskId, - cancellationToken) - .ConfigureAwait(false); + await ResultLifeCycleHelper.AbortTaskAndResults(taskTable_, + resultTable_, + taskData.TaskId, + CancellationToken.None) + .ConfigureAwait(false); } } } diff --git a/Common/tests/DependencyResolverTest.cs b/Common/tests/DependencyResolverTest.cs deleted file mode 100644 index 4736d8c21..000000000 --- a/Common/tests/DependencyResolverTest.cs +++ /dev/null @@ -1,500 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Collections.Generic; -using System.Linq; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Api.gRPC.V1; -using ArmoniK.Core.Base; -using ArmoniK.Core.Common.Storage; -using ArmoniK.Core.Common.Tests.Helpers; -using ArmoniK.Core.Utils; - -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; - -using NUnit.Framework; - -using Output = ArmoniK.Core.Common.Storage.Output; -using TaskOptions = ArmoniK.Core.Common.Storage.TaskOptions; -using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; - -namespace ArmoniK.Core.Common.Tests; - -[TestFixture] -public class DependencyResolverTest -{ - [SetUp] - public void SetUp() - { - } - - [TearDown] - public void TearDown() - { - } - - private const string TaskCreatingWithDependencies = nameof(TaskCreatingWithDependencies); - private const string TaskCreatingWithAvailableDependencies = nameof(TaskCreatingWithAvailableDependencies); - private const string TaskCreatingWithAbortedDependencies = nameof(TaskCreatingWithAbortedDependencies); - private const string CreatedData1 = nameof(CreatedData1); - private const string AbortedData1 = nameof(AbortedData1); - private const string AvailableData1 = nameof(AvailableData1); - private const string AvailableData2 = nameof(AvailableData2); - private const string TaskCreatingNoDependencies = nameof(TaskCreatingNoDependencies); - private const string TaskSubmitted = nameof(TaskSubmitted); - private const string TaskCancelling = nameof(TaskCancelling); - private const string TaskCancelled = nameof(TaskCancelled); - - private static async Task Populate() - { - var pushQueueStorage = new SimplePushQueueStorage(); - var pullQueueStorage = new SimplePullQueueStorage(); - - var provider = new TestDatabaseProvider(collection => - { - var configuration = new ConfigurationManager(); - - collection.AddInitializedOption(configuration, - Injection.Options.DependencyResolver.SettingSection); - collection.AddSingleton(); - collection.AddSingleton(pushQueueStorage); - collection.AddSingleton(pullQueueStorage); - collection.AddSingleton(pullQueueStorage); - collection.AddSingleton(pushQueueStorage); - }); - - var resultTable = provider.GetRequiredService(); - var taskTable = provider.GetRequiredService(); - - TaskOptions options = new(new Dictionary - { - { - "key1", "val1" - }, - { - "key2", "val2" - }, - }, - TimeSpan.MaxValue, - 5, - 1, - "part1", - "applicationName", - "applicationVersion", - "applicationNamespace", - "applicationService", - "engineType"); - - await resultTable.Create(new[] - { - new Result("SessionId", - AvailableData1, - TaskCreatingWithAvailableDependencies, - ResultStatus.Completed, - new List(), - DateTime.UtcNow, - Array.Empty()), - new Result("SessionId", - AvailableData2, - TaskCreatingWithAvailableDependencies, - ResultStatus.Completed, - new List(), - DateTime.UtcNow, - Array.Empty()), - new Result("SessionId", - CreatedData1, - TaskCreatingWithDependencies, - ResultStatus.Created, - new List(), - DateTime.UtcNow, - Array.Empty()), - new Result("SessionId", - AbortedData1, - TaskCreatingWithAbortedDependencies, - ResultStatus.Aborted, - new List(), - DateTime.UtcNow, - Array.Empty()), - }) - .ConfigureAwait(false); - - await taskTable.CreateTasks(new[] - { - new TaskData("SessionId", - "TaskCompletedId", - "OwnerPodId", - "OwnerPodName", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - CreatedData1, - }, - new[] - { - "output1", - "output2", - }, - Array.Empty(), - TaskStatus.Completed, - options, - new Output(true, - "")), - new TaskData("SessionId", - TaskSubmitted, - "OwnerPodId", - "OwnerPodName", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - CreatedData1, - }, - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Submitted, - options, - new Output(false, - "")), - new TaskData("SessionId", - TaskCreatingWithAvailableDependencies, - "OwnerPodId", - "OwnerPodName", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - AvailableData1, - AvailableData2, - }, - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Creating, - options, - new Output(false, - "")), - new TaskData("SessionId", - TaskCreatingWithAbortedDependencies, - "OwnerPodId", - "OwnerPodName", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - AbortedData1, - }, - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Creating, - options, - new Output(false, - "")), - new TaskData("SessionId", - "TaskProcessingId", - "OwnerPodId", - "OwnerPodName", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - CreatedData1, - }, - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Processing, - options, - new Output(false, - "")), - new TaskData("SessionId", - "TaskAnotherProcessingId", - "OwnerPodId", - "OwnerPodName", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - CreatedData1, - }, - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Processing, - options, - new Output(false, - "")), - new TaskData("SessionId", - TaskCreatingNoDependencies, - "", - "", - "PayloadId", - new[] - { - "parent1", - }, - new List(), - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Creating, - options with - { - PartitionId = "part2", - }, - new Output(false, - "")), - new TaskData("SessionId", - TaskCreatingWithDependencies, - "", - "", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - CreatedData1, - }, - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Creating, - options with - { - PartitionId = "part2", - }, - new Output(false, - "")), - new TaskData("SessionId", - TaskCancelled, - "", - "", - "PayloadId", - new[] - { - "parent1", - }, - Array.Empty(), - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Cancelled, - options with - { - PartitionId = "part2", - }, - new Output(false, - "")), - new TaskData("SessionId", - TaskCancelling, - "", - "", - "PayloadId", - new[] - { - "parent1", - }, - Array.Empty(), - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Cancelling, - options with - { - PartitionId = "part2", - }, - new Output(false, - "")), - new TaskData("SessionId", - "TaskFailedId", - "OwnerPodId", - "OwnerPodName", - "PayloadId", - new[] - { - "parent1", - }, - new[] - { - CreatedData1, - }, - new[] - { - "output1", - }, - Array.Empty(), - TaskStatus.Error, - options, - new Output(false, - "sad task")), - }) - .ConfigureAwait(false); - - return provider; - } - - - [Test] - public async Task InitShouldSucceed() - { - using var provider = await Populate() - .ConfigureAwait(false); - var dp = provider.GetRequiredService(); - Assert.DoesNotThrowAsync(() => dp.Init(CancellationToken.None)); - } - - - [Test] - [TestCase(TaskCreatingNoDependencies, - ExpectedResult = TaskLifeCycleHelper.DependenciesStatus.Available)] - [TestCase(TaskCreatingWithAbortedDependencies, - ExpectedResult = TaskLifeCycleHelper.DependenciesStatus.Aborted)] - [TestCase(TaskCreatingWithAvailableDependencies, - ExpectedResult = TaskLifeCycleHelper.DependenciesStatus.Available)] - [TestCase(TaskCreatingWithDependencies, - ExpectedResult = TaskLifeCycleHelper.DependenciesStatus.Processing)] - public async Task ValidateDependenciesStatus(string taskId) - { - using var provider = await Populate() - .ConfigureAwait(false); - - var logger = provider.GetRequiredService(); - var resultTable = provider.GetRequiredService(); - var taskTable = provider.GetRequiredService(); - - var taskData = await taskTable.ReadTaskAsync(taskId, - CancellationToken.None) - .ConfigureAwait(false); - - return await TaskLifeCycleHelper.CheckTaskDependencies(taskData, - resultTable, - logger, - CancellationToken.None) - .ConfigureAwait(false); - } - - - [Test] - public async Task CheckForDependenciesShouldSucceed() - { - using var provider = await Populate() - .ConfigureAwait(false); - var dp = provider.GetRequiredService(); - await dp.Init(CancellationToken.None) - .ConfigureAwait(false); - - var pullQueueStorage = provider.GetRequiredService(); - pullQueueStorage.Messages.Add(TaskCreatingNoDependencies); - pullQueueStorage.Messages.Add(TaskCreatingWithDependencies); - pullQueueStorage.Messages.Add(TaskCreatingWithAbortedDependencies); - pullQueueStorage.Messages.Add(TaskCreatingWithAvailableDependencies); - pullQueueStorage.Messages.Add(TaskSubmitted); - pullQueueStorage.Messages.Add(TaskCancelled); - pullQueueStorage.Messages.Add(TaskCancelling); - - var cts = new CancellationTokenSource(); - cts.CancelAfter(TimeSpan.FromSeconds(2)); - - await dp.ExecuteAsync(cts.Token) - .ConfigureAwait(false); - - await Task.Delay(TimeSpan.FromSeconds(1)) - .ConfigureAwait(false); - - var pushQueueStorage = provider.GetRequiredService(); - Assert.AreEqual(2, - pushQueueStorage.Messages.Count); - Assert.Contains(TaskCreatingWithAvailableDependencies, - pushQueueStorage.Messages); - Assert.Contains(TaskCreatingNoDependencies, - pushQueueStorage.Messages); - Assert.IsFalse(pushQueueStorage.Messages.Contains(TaskSubmitted)); - Assert.IsFalse(pushQueueStorage.Messages.Contains(TaskCancelled)); - Assert.IsFalse(pushQueueStorage.Messages.Contains(TaskCancelling)); - - var taskTable = provider.GetRequiredService(); - Assert.AreEqual(TaskStatus.Error, - taskTable.ReadTaskAsync(TaskCreatingWithAbortedDependencies, - CancellationToken.None) - .Result.Status); - Assert.AreEqual(TaskStatus.Submitted, - taskTable.ReadTaskAsync(TaskCreatingNoDependencies, - CancellationToken.None) - .Result.Status); - Assert.AreEqual(TaskStatus.Submitted, - taskTable.ReadTaskAsync(TaskCreatingWithAvailableDependencies, - CancellationToken.None) - .Result.Status); - Assert.AreEqual(TaskStatus.Creating, - taskTable.ReadTaskAsync(TaskCreatingWithDependencies, - CancellationToken.None) - .Result.Status); - Assert.AreEqual(TaskStatus.Cancelled, - taskTable.ReadTaskAsync(TaskCancelled, - CancellationToken.None) - .Result.Status); - Assert.AreEqual(TaskStatus.Cancelled, - taskTable.ReadTaskAsync(TaskCancelling, - CancellationToken.None) - .Result.Status); - } -} diff --git a/Common/tests/Helpers/SimpleTaskTable.cs b/Common/tests/Helpers/SimpleTaskTable.cs index f875b8872..4227d79d3 100644 --- a/Common/tests/Helpers/SimpleTaskTable.cs +++ b/Common/tests/Helpers/SimpleTaskTable.cs @@ -236,6 +236,10 @@ public Task SetTaskSuccessAsync(string taskId, CancellationToken cancellationToken = default) => Task.CompletedTask; + public Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable dependenciesToRemove)> dependencies, + CancellationToken cancellationToken = default) + => Task.CompletedTask; + public Task SetTaskCanceledAsync(string taskId, CancellationToken cancellationToken = default) => Task.CompletedTask; diff --git a/Common/tests/Helpers/TestPollsterProvider.cs b/Common/tests/Helpers/TestPollsterProvider.cs index 84405f9cd..b0e8fafb0 100644 --- a/Common/tests/Helpers/TestPollsterProvider.cs +++ b/Common/tests/Helpers/TestPollsterProvider.cs @@ -106,10 +106,6 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, { $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.GraceDelay)}", "00:00:02" }, - { - $"{Injection.Options.DependencyResolver.SettingSection}:{nameof(Injection.Options.DependencyResolver.UnresolvedDependenciesQueue)}", - nameof(Injection.Options.DependencyResolver.UnresolvedDependenciesQueue) - }, }; Console.WriteLine(minimalConfig.ToJson()); @@ -128,8 +124,6 @@ public TestPollsterProvider(IWorkerStreamHandler workerStreamHandler, .AddSingleton() .AddOption(builder.Configuration, Injection.Options.Submitter.SettingSection) - .AddOption(builder.Configuration, - Injection.Options.DependencyResolver.SettingSection) .AddSingleton() .AddSingleton("ownerpodid") .AddSingleton() diff --git a/Common/tests/Helpers/TestTaskHandlerProvider.cs b/Common/tests/Helpers/TestTaskHandlerProvider.cs index 360cee3e7..7fa5e928b 100644 --- a/Common/tests/Helpers/TestTaskHandlerProvider.cs +++ b/Common/tests/Helpers/TestTaskHandlerProvider.cs @@ -53,7 +53,7 @@ public class TestTaskHandlerProvider : IDisposable private readonly LoggerFactory loggerFactory_; private readonly IObjectStorageFactory objectStorageFactory_; public readonly IPartitionTable PartitionTable; - private readonly IResultTable resultTable_; + public readonly IResultTable ResultTable; private readonly IMongoRunner runner_; public readonly ISessionTable SessionTable; public readonly ISubmitter Submitter; @@ -111,10 +111,6 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler, { $"{Injection.Options.Pollster.SettingSection}:{nameof(Injection.Options.Pollster.GraceDelay)}", "00:00:02" }, - { - $"{Injection.Options.DependencyResolver.SettingSection}:{nameof(Injection.Options.DependencyResolver.UnresolvedDependenciesQueue)}", - nameof(Injection.Options.DependencyResolver.UnresolvedDependenciesQueue) - }, }; Console.WriteLine(minimalConfig.ToJson()); @@ -137,8 +133,6 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler, Injection.Options.Submitter.SettingSection) .AddOption(builder.Configuration, Injection.Options.Pollster.SettingSection) - .AddOption(builder.Configuration, - Injection.Options.DependencyResolver.SettingSection) .AddSingleton(cancellationTokenSource) .AddSingleton() .AddSingleton("ownerpodid") @@ -164,7 +158,7 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler, app_ = builder.Build(); - resultTable_ = app_.Services.GetRequiredService(); + ResultTable = app_.Services.GetRequiredService(); TaskTable = app_.Services.GetRequiredService(); PartitionTable = app_.Services.GetRequiredService(); SessionTable = app_.Services.GetRequiredService(); @@ -172,8 +166,8 @@ public TestTaskHandlerProvider(IWorkerStreamHandler workerStreamHandler, TaskHandler = app_.Services.GetRequiredService(); objectStorageFactory_ = app_.Services.GetRequiredService(); - resultTable_.Init(CancellationToken.None) - .Wait(); + ResultTable.Init(CancellationToken.None) + .Wait(); TaskTable.Init(CancellationToken.None) .Wait(); PartitionTable.Init(CancellationToken.None) diff --git a/Common/tests/Pollster/AgentTest.cs b/Common/tests/Pollster/AgentTest.cs index 7e1590cc2..6e1be1c01 100644 --- a/Common/tests/Pollster/AgentTest.cs +++ b/Common/tests/Pollster/AgentTest.cs @@ -40,7 +40,7 @@ using NUnit.Framework; using Agent = ArmoniK.Core.Common.gRPC.Services.Agent; -using Result = ArmoniK.Api.gRPC.V1.Agent.Result; +using Result = ArmoniK.Core.Common.Storage.Result; using TaskOptions = ArmoniK.Core.Common.Storage.TaskOptions; using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; @@ -63,6 +63,8 @@ public virtual void TearDown() private const string Partition = "PartitionId"; private const string ExpectedOutput1 = "ExpectedOutput1"; private const string ExpectedOutput2 = "ExpectedOutput2"; + private const string DataDependency1 = "Task1DD"; + private const string DataDependency2 = "Task2DD"; private static readonly TaskOptions Options = new(new Dictionary @@ -90,12 +92,6 @@ public virtual void TearDown() MaxErrorAllowed = -1, }; - private static readonly Injection.Options.DependencyResolver DependencyResolverOptions = new() - { - UnresolvedDependenciesQueue = - nameof(DependencyResolverOptions.UnresolvedDependenciesQueue), - }; - public class MyPushQueueStorage : IPushQueueStorage { public ConcurrentDictionary> Messages = new(); @@ -144,7 +140,6 @@ public AgentHolder() QueueStorage = new MyPushQueueStorage(); prov_ = new TestDatabaseProvider(collection => collection.AddSingleton() .AddSingleton(SubmitterOptions) - .AddSingleton(DependencyResolverOptions) .AddSingleton(QueueStorage)); ResultTable = prov_.GetRequiredService(); @@ -168,6 +163,25 @@ public AgentHolder() CancellationToken.None) .Result; + ResultTable.Create(new[] + { + new Result(sessionData.SessionId, + DataDependency1, + "", + ResultStatus.Completed, + new List(), + DateTime.UtcNow, + Array.Empty()), + new Result(sessionData.SessionId, + DataDependency2, + "", + ResultStatus.Completed, + new List(), + DateTime.UtcNow, + Array.Empty()), + }) + .Wait(); + var createdTasks = submitter.CreateTasks(Session, Session, Options, @@ -257,7 +271,7 @@ public AgentHolder() objectStorageFactory, QueueStorage, ResultTable, - DependencyResolverOptions, + TaskTable, sessionData, TaskData, Token, @@ -291,13 +305,13 @@ public async Task WrongTokens(string token) Assert.AreEqual(CreateTaskReply.ResponseOneofCase.Error, createTaskReply.ResponseCase); - var resultReply = await holder.Agent.SendResult(new TestHelperAsyncStreamReader(new[] - { - new Result - { - CommunicationToken = token, - }, - }), + var resultReply = await holder.Agent.SendResult(new TestHelperAsyncStreamReader(new[] + { + new Api.gRPC.V1.Agent.Result + { + CommunicationToken = token, + }, + }), CancellationToken.None) .ConfigureAwait(false); @@ -413,54 +427,53 @@ public async Task SendResultShouldSucceed() { using var holder = new AgentHolder(); - Assert.IsFalse(holder.QueueStorage.Messages.Keys.Contains(DependencyResolverOptions.UnresolvedDependenciesQueue)); Assert.AreEqual(0, holder.QueueStorage.Messages.SelectMany(pair => pair.Value) .Count()); - var resultReply = await holder.Agent.SendResult(new TestHelperAsyncStreamReader(new[] - { - new Result - { - CommunicationToken = holder.Token, - Init = new InitKeyedDataStream - { - Key = ExpectedOutput1, - }, - }, - new Result - { - CommunicationToken = holder.Token, - Data = new DataChunk - { - Data = ByteString.CopyFromUtf8("Data1"), - }, - }, - new Result - { - CommunicationToken = holder.Token, - Data = new DataChunk - { - Data = ByteString.CopyFromUtf8("Data2"), - }, - }, - new Result - { - CommunicationToken = holder.Token, - Data = new DataChunk - { - DataComplete = true, - }, - }, - new Result - { - CommunicationToken = holder.Token, - Init = new InitKeyedDataStream - { - LastResult = true, - }, - }, - }), + var resultReply = await holder.Agent.SendResult(new TestHelperAsyncStreamReader(new[] + { + new Api.gRPC.V1.Agent.Result + { + CommunicationToken = holder.Token, + Init = new InitKeyedDataStream + { + Key = ExpectedOutput1, + }, + }, + new Api.gRPC.V1.Agent.Result + { + CommunicationToken = holder.Token, + Data = new DataChunk + { + Data = ByteString.CopyFromUtf8("Data1"), + }, + }, + new Api.gRPC.V1.Agent.Result + { + CommunicationToken = holder.Token, + Data = new DataChunk + { + Data = ByteString.CopyFromUtf8("Data2"), + }, + }, + new Api.gRPC.V1.Agent.Result + { + CommunicationToken = holder.Token, + Data = new DataChunk + { + DataComplete = true, + }, + }, + new Api.gRPC.V1.Agent.Result + { + CommunicationToken = holder.Token, + Init = new InitKeyedDataStream + { + LastResult = true, + }, + }, + }), CancellationToken.None) .ConfigureAwait(false); @@ -494,12 +507,32 @@ await holder.Agent.FinalizeTaskCreation(CancellationToken.None) dependents); Assert.Contains(holder.TaskWithDependencies1, - holder.QueueStorage.Messages[DependencyResolverOptions.UnresolvedDependenciesQueue]); + holder.QueueStorage.Messages[Partition]); Assert.Contains(holder.TaskWithDependencies2, - holder.QueueStorage.Messages[DependencyResolverOptions.UnresolvedDependenciesQueue]); + holder.QueueStorage.Messages[Partition]); Assert.AreEqual(2, - holder.QueueStorage.Messages[DependencyResolverOptions.UnresolvedDependenciesQueue] + holder.QueueStorage.Messages[Partition] .Count); + + var taskData1 = await holder.TaskTable.ReadTaskAsync(holder.TaskWithDependencies1, + CancellationToken.None) + .ConfigureAwait(false); + + Assert.Contains(ExpectedOutput1, + taskData1.DataDependencies.ToList()); + Assert.IsEmpty(taskData1.RemainingDataDependencies); + Assert.AreEqual(TaskStatus.Submitted, + taskData1.Status); + + var taskData2 = await holder.TaskTable.ReadTaskAsync(holder.TaskWithDependencies2, + CancellationToken.None) + .ConfigureAwait(false); + + Assert.Contains(ExpectedOutput1, + taskData2.DataDependencies.ToList()); + Assert.IsEmpty(taskData2.RemainingDataDependencies); + Assert.AreEqual(TaskStatus.Submitted, + taskData2.Status); } [Test] @@ -526,7 +559,7 @@ public async Task CreateTasksShouldSucceed() { DataDependencies = { - "Task1DD", + DataDependency1, }, ExpectedOutputKeys = { @@ -568,7 +601,7 @@ public async Task CreateTasksShouldSucceed() { DataDependencies = { - "Task1DD", + DataDependency1, }, ExpectedOutputKeys = { @@ -645,7 +678,7 @@ public async Task CreateTasksShouldSucceed() { DataDependencies = { - "Task1DD", + DataDependency1, }, ExpectedOutputKeys = { diff --git a/Common/tests/Pollster/TaskHandlerTest.cs b/Common/tests/Pollster/TaskHandlerTest.cs index 4893929e1..d5f8023e4 100644 --- a/Common/tests/Pollster/TaskHandlerTest.cs +++ b/Common/tests/Pollster/TaskHandlerTest.cs @@ -47,7 +47,7 @@ using NUnit.Framework; using Output = ArmoniK.Core.Common.Storage.Output; -using Result = ArmoniK.Api.gRPC.V1.Agent.Result; +using Result = ArmoniK.Core.Common.Storage.Result; using TaskOptions = ArmoniK.Core.Common.Storage.TaskOptions; using TaskRequest = ArmoniK.Core.Common.gRPC.Services.TaskRequest; using TaskStatus = ArmoniK.Api.gRPC.V1.TaskStatus; @@ -172,6 +172,18 @@ await testServiceProvider.PartitionTable.CreatePartitionsAsync(new[] CancellationToken.None) .ConfigureAwait(false)).SessionId; + await testServiceProvider.ResultTable.Create(new[] + { + new Result(sessionId, + "DataDep", + "", + ResultStatus.Created, + new List(), + DateTime.UtcNow, + Array.Empty()), + }) + .ConfigureAwait(false); + var (requestsIEnumerable, priority, whichPartitionId) = await testServiceProvider.Submitter.CreateTasks(sessionId, sessionId, new Api.gRPC.V1.TaskOptions @@ -354,6 +366,7 @@ public async Task AcquireStatusShouldFail(TaskStatus status) new List(), new List(), new List(), + new List(), "init", new List(), status, @@ -445,6 +458,7 @@ await Task.Delay(delay_) new List(), new List(), new List(), + new List(), "taskId", new List(), TaskStatus.Submitted, @@ -544,6 +558,10 @@ public Task SetTaskSuccessAsync(string taskId, CancellationToken cancellationToken) => throw new NotImplementedException(); + public Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable dependenciesToRemove)> dependencies, + CancellationToken cancellationToken = default) + => Task.CompletedTask; + public Task SetTaskCanceledAsync(string taskId, CancellationToken cancellationToken) => throw new NotImplementedException(); @@ -577,6 +595,7 @@ await Task.Delay(delay_) new List(), new List(), new List(), + new List(), "taskId", new List(), TaskStatus.Dispatched, @@ -613,6 +632,7 @@ public Task ReleaseTask(string taskId, new List(), new List(), new List(), + new List(), "taskId", new List(), TaskStatus.Submitted, @@ -839,7 +859,7 @@ public async Task AcquireNotReadyTaskShouldFail() var acquired = await testServiceProvider.TaskHandler.AcquireTask() .ConfigureAwait(false); - Assert.IsTrue(acquired); + Assert.IsFalse(acquired); } public class ExceptionStartWorkerStreamHandler : IWorkerStreamHandler @@ -1194,10 +1214,10 @@ await agentHandler.Agent.CreateTask(taskStreamReader, .ConfigureAwait(false); - var resultStreamReader = new TestHelperAsyncStreamReader(new[] - { - new Result(), - }); + var resultStreamReader = new TestHelperAsyncStreamReader(new[] + { + new Api.gRPC.V1.Agent.Result(), + }); await agentHandler.Agent.SendResult(resultStreamReader, CancellationToken.None) .ConfigureAwait(false); diff --git a/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs b/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs index 90f85486e..1c2b2a3a4 100644 --- a/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs +++ b/Common/tests/Submitter/IntegrationGrpcSubmitterServiceTest.cs @@ -926,6 +926,10 @@ public Task SetTaskSuccessAsync(string taskId, CancellationToken cancellationToken = default) => throw new T(); + public Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable dependenciesToRemove)> dependencies, + CancellationToken cancellationToken = default) + => throw new T(); + public Task SetTaskCanceledAsync(string taskId, CancellationToken cancellationToken = default) => throw new T(); diff --git a/Common/tests/Submitter/SubmitterTests.cs b/Common/tests/Submitter/SubmitterTests.cs index 1f4da8b57..49f28107e 100644 --- a/Common/tests/Submitter/SubmitterTests.cs +++ b/Common/tests/Submitter/SubmitterTests.cs @@ -103,33 +103,27 @@ public async Task SetUp() $"{Injection.Options.Submitter.SettingSection}:{nameof(Injection.Options.Submitter.DefaultPartition)}", DefaultPartition }, - { - $"{Injection.Options.DependencyResolver.SettingSection}:{nameof(Injection.Options.DependencyResolver.UnresolvedDependenciesQueue)}", - nameof(Injection.Options.DependencyResolver.UnresolvedDependenciesQueue) - }, }; Console.WriteLine(minimalConfig.ToJson()); - var loggerFactory = new LoggerFactory(); - loggerFactory.AddProvider(new ConsoleForwardingLoggerProvider()); + var loggerProvider = new ConsoleForwardingLoggerProvider(); + var loggerFactory = new LoggerFactory(); + loggerFactory.AddProvider(loggerProvider); var configuration = new ConfigurationManager(); configuration.AddInMemoryCollection(minimalConfig); var services = new ServiceCollection(); - services.AddMongoStorages(configuration, logger) .AddSingleton(ActivitySource) .AddSingleton(client_) - .AddLogging() + .AddLogging(builder => builder.AddProvider(loggerProvider)) .AddSingleton() .AddOption(configuration, Injection.Options.Submitter.SettingSection) - .AddOption(configuration, - Injection.Options.DependencyResolver.SettingSection) .AddSingleton(); var provider = services.BuildServiceProvider(new ServiceProviderOptions @@ -187,24 +181,26 @@ public virtual void TearDown() private static readonly string ExpectedOutput1 = "ExpectedOutput1"; private static readonly string ExpectedOutput2 = "ExpectedOutput2"; private static readonly string ExpectedOutput3 = "ExpectedOutput3"; + private static readonly string ExpectedOutput4 = "ExpectedOutput4"; + private static readonly string ExpectedOutput5 = "ExpectedOutput5"; private static readonly string DefaultPartition = "DefaultPartition"; private static readonly ActivitySource ActivitySource = new("ArmoniK.Core.Common.Tests.Submitter"); private ISessionTable? sessionTable_; private ITaskTable? taskTable_; private IPartitionTable? partitionTable_; + public static readonly TaskOptions DefaultTaskOptionsPart1 = new() + { + MaxDuration = Duration.FromTimeSpan(TimeSpan.FromSeconds(2)), + MaxRetries = 2, + Priority = 1, + PartitionId = "part1", + }; + private static async Task<(string sessionId, string taskCreating, string taskSubmitted)> InitSubmitter(ISubmitter submitter, IPartitionTable partitionTable, CancellationToken token) { - var defaultTaskOptions = new TaskOptions - { - MaxDuration = Duration.FromTimeSpan(TimeSpan.FromSeconds(2)), - MaxRetries = 2, - Priority = 1, - PartitionId = "part1", - }; - await partitionTable.CreatePartitionsAsync(new[] { new PartitionData("part1", @@ -230,13 +226,13 @@ await partitionTable.CreatePartitionsAsync(new[] "part1", "part2", }, - defaultTaskOptions, + DefaultTaskOptionsPart1, token) .ConfigureAwait(false)).SessionId; var taskCreating = (await submitter.CreateTasks(sessionId, sessionId, - defaultTaskOptions, + DefaultTaskOptionsPart1, new List { new(new[] @@ -255,7 +251,7 @@ await partitionTable.CreatePartitionsAsync(new[] var tuple = await submitter.CreateTasks(sessionId, sessionId, - defaultTaskOptions, + DefaultTaskOptionsPart1, new List { new(new[] @@ -806,4 +802,90 @@ await InitSubmitterCompleteTask(submitter_!, 1), result[2]); } + + [Test] + public async Task SetTaskError() + { + var (sessionId, _, _) = await InitSubmitter(submitter_!, + partitionTable_!, + CancellationToken.None) + .ConfigureAwait(false); + + var tuple = await submitter_!.CreateTasks(sessionId, + sessionId, + DefaultTaskOptionsPart1, + new List + { + new(new[] + { + ExpectedOutput4, + }, + new List(), + new List> + { + new(Encoding.ASCII.GetBytes("AAAA")), + }.ToAsyncEnumerable()), + new(new[] + { + ExpectedOutput5, + }, + new[] + { + ExpectedOutput4, + }, + new List> + { + new(Encoding.ASCII.GetBytes("AAAA")), + }.ToAsyncEnumerable()), + }.ToAsyncEnumerable(), + CancellationToken.None) + .ConfigureAwait(false); + + var abortedTask = tuple.requests.First() + .Id; + var taskWithDependencies = tuple.requests.Last() + .Id; + + await submitter_.FinalizeTaskCreation(tuple.requests, + DefaultTaskOptionsPart1.Priority, + DefaultTaskOptionsPart1.PartitionId, + sessionId, + sessionId, + CancellationToken.None) + .ConfigureAwait(false); + + var taskData = await taskTable_!.ReadTaskAsync(abortedTask, + CancellationToken.None) + .ConfigureAwait(false); + + Assert.AreEqual(TaskStatus.Submitted, + taskData.Status); + + await submitter_.CompleteTaskAsync(taskData, + false, + new Api.gRPC.V1.Output + { + Error = new Api.gRPC.V1.Output.Types.Error + { + Details = "This error should be propagated to other tasks", + }, + }) + .ConfigureAwait(false); + + taskData = await taskTable_.ReadTaskAsync(abortedTask, + CancellationToken.None) + .ConfigureAwait(false); + + Assert.AreEqual(TaskStatus.Error, + taskData.Status); + + taskData = await taskTable_.ReadTaskAsync(taskWithDependencies, + CancellationToken.None) + .ConfigureAwait(false); + + Assert.AreEqual(TaskStatus.Error, + taskData.Status); + Assert.AreEqual("One of the input data is aborted.", + taskData.Output.Error); + } } diff --git a/Common/tests/TestBase/TaskTableTestBase.cs b/Common/tests/TestBase/TaskTableTestBase.cs index b6f18ebda..df56cf5ad 100644 --- a/Common/tests/TestBase/TaskTableTestBase.cs +++ b/Common/tests/TestBase/TaskTableTestBase.cs @@ -1582,4 +1582,115 @@ public async Task FindTasksAsyncContainsShouldSucceed() .Count(s => s == "dependency1")); } } + + [Test] + public async Task RemoveRemainingDataDependenciesShouldSucceed() + { + if (RunTests) + { + var taskId = Guid.NewGuid() + .ToString(); + var dd1 = "dependency1"; + var dd2 = "dependency2"; + + await TaskTable!.CreateTasks(new[] + { + new TaskData("SessionId", + taskId, + "OwnerPodId", + "OwnerPodName", + "PayloadId", + new[] + { + "parent1", + }, + new[] + { + dd1, + dd2, + }, + new[] + { + "output1", + "output2", + }, + Array.Empty(), + TaskStatus.Creating, + options_, + new Output(true, + "")), + }) + .ConfigureAwait(false); + + await TaskTable.RemoveRemainingDataDependenciesAsync(new List<(string taskId, IEnumerable dependenciesToRemove)> + { + (taskId, new List + { + dd1, + dd2, + }), + }, + CancellationToken.None) + .ConfigureAwait(false); + + var taskData = await TaskTable.ReadTaskAsync(taskId, + CancellationToken.None) + .ConfigureAwait(false); + + Assert.IsEmpty(taskData.RemainingDataDependencies); + } + } + + [Test] + public async Task RemoveRemainingDataDependenciesDepDoesNotExistShouldSucceed() + { + if (RunTests) + { + var taskId = Guid.NewGuid() + .ToString(); + var dd1 = "dependency1"; + + await TaskTable!.CreateTasks(new[] + { + new TaskData("SessionId", + taskId, + "OwnerPodId", + "OwnerPodName", + "PayloadId", + new[] + { + "parent1", + }, + new List(), + new[] + { + "output1", + "output2", + }, + Array.Empty(), + TaskStatus.Creating, + options_, + new Output(true, + "")), + }) + .ConfigureAwait(false); + + await TaskTable.RemoveRemainingDataDependenciesAsync(new List<(string taskId, IEnumerable dependenciesToRemove)> + { + (taskId, new List + { + dd1, + }), + }, + CancellationToken.None) + .ConfigureAwait(false); + + var taskData = await TaskTable.ReadTaskAsync(taskId, + CancellationToken.None) + .ConfigureAwait(false); + + Assert.IsEmpty(taskData.RemainingDataDependencies); + Assert.IsEmpty(taskData.DataDependencies); + } + } } diff --git a/Compute/PollingAgent/src/Program.cs b/Compute/PollingAgent/src/Program.cs index 772c5002b..98ec4aee1 100644 --- a/Compute/PollingAgent/src/Program.cs +++ b/Compute/PollingAgent/src/Program.cs @@ -28,7 +28,6 @@ using ArmoniK.Core.Base; using ArmoniK.Core.Common.gRPC.Services; using ArmoniK.Core.Common.Injection; -using ArmoniK.Core.Common.Injection.Options; using ArmoniK.Core.Common.Pollster; using ArmoniK.Core.Common.Pollster.TaskProcessingChecker; using ArmoniK.Core.Common.Utils; @@ -93,8 +92,6 @@ public static async Task Main(string[] args) .AddSingleton() .AddInitializedOption(builder.Configuration, Submitter.SettingSection) - .AddInitializedOption(builder.Configuration, - DependencyResolver.SettingSection) .AddSingleton(pollsterOptions) .AddSingleton() .AddSingleton() diff --git a/Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj b/Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj deleted file mode 100644 index 56a97e8d6..000000000 --- a/Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj +++ /dev/null @@ -1,48 +0,0 @@ - - - - - net6.0 - ANEO - Copyright (C) ANEO, 2021-2021 - AGPL-3.0-or-later - True - 61626152-69fe-468e-b033-201f8c9716ad - Linux - ..\..\.. - True - true - win-x64;linux-x64 - enable - - - - Embedded - true - DEBUG;TRACE - - - - true - true - snupkg - - - - - - - - - - - - - - - - - - - - diff --git a/Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj.DotSettings b/Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj.DotSettings deleted file mode 100644 index 89316e414..000000000 --- a/Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj.DotSettings +++ /dev/null @@ -1,2 +0,0 @@ - - Library \ No newline at end of file diff --git a/Control/DependencyChecker/src/Dockerfile b/Control/DependencyChecker/src/Dockerfile deleted file mode 100644 index 442facccb..000000000 --- a/Control/DependencyChecker/src/Dockerfile +++ /dev/null @@ -1,58 +0,0 @@ -#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging. - -FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base -WORKDIR /app - -FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build -WORKDIR /src -COPY ["Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj", "Control/DependencyChecker/src/"] -COPY ["Utils/src/ArmoniK.Core.Utils.csproj", "Utils/src/"] -COPY ["Base/src/ArmoniK.Core.Base.csproj", "Base/src/"] -COPY ["Common/src/ArmoniK.Core.Common.csproj", "Common/src/"] -COPY ["Adaptors/MongoDB/src/ArmoniK.Core.Adapters.MongoDB.csproj", "Adaptors/MongoDB/src/"] -RUN dotnet restore "Control/DependencyChecker/src/ArmoniK.Core.Control.DependencyChecker.csproj" -COPY . . -WORKDIR "/src/Control/DependencyChecker/src" -RUN dotnet build "ArmoniK.Core.Control.DependencyChecker.csproj" -c Release -o /app/build - -FROM mcr.microsoft.com/dotnet/sdk:6.0 AS adapters -WORKDIR /src -COPY ["Utils/src/ArmoniK.Core.Utils.csproj", "Utils/src/"] -COPY ["Base/src/ArmoniK.Core.Base.csproj", "Base/src/"] -COPY ["Adaptors/RabbitMQ/src/ArmoniK.Core.Adapters.RabbitMQ.csproj", "Adaptors/RabbitMQ/src/"] -COPY ["Adaptors/Amqp/src/ArmoniK.Core.Adapters.Amqp.csproj", "Adaptors/Amqp/src/"] -COPY ["Adaptors/QueueCommon/src/ArmoniK.Core.Adapters.QueueCommon.csproj", "Adaptors/QueueCommon/src/"] -RUN dotnet restore "Adaptors/RabbitMQ/src/ArmoniK.Core.Adapters.RabbitMQ.csproj" -RUN dotnet restore "Adaptors/Amqp/src/ArmoniK.Core.Adapters.Amqp.csproj" -COPY . . -WORKDIR "/src/Adaptors/RabbitMQ/src" -RUN dotnet build "ArmoniK.Core.Adapters.RabbitMQ.csproj" -c Release -o /app/build/rabbit -WORKDIR "/src/Adaptors/Amqp/src" -RUN dotnet build "ArmoniK.Core.Adapters.Amqp.csproj" -c Release -o /app/build/amq - -FROM build AS publish -RUN dotnet publish "ArmoniK.Core.Control.DependencyChecker.csproj" -c Release -o /app/publish - -FROM adapters AS publisha -WORKDIR "/src/Adaptors/RabbitMQ/src" -RUN dotnet publish "ArmoniK.Core.Adapters.RabbitMQ.csproj" -c Release -o /app/publish/rabbit /p:UseAppHost=false -WORKDIR "/src/Adaptors/Amqp/src" -RUN dotnet publish "ArmoniK.Core.Adapters.Amqp.csproj" -c Release -o /app/publish/amqp /p:UseAppHost=false - - -FROM base AS final -WORKDIR /adapters/queue/amqp -COPY --from=publisha /app/publish/amqp . -WORKDIR /adapters/queue/rabbit -COPY --from=publisha /app/publish/rabbit . -WORKDIR /app -COPY --from=publish /app/publish . -RUN groupadd --gid 5000 armonikuser && useradd --home-dir /home/armonikuser --create-home --uid 5000 --gid 5000 --shell /bin/sh --skel /dev/null armonikuser -RUN mkdir /local_storage && chown armonikuser: /local_storage -USER armonikuser - -ENV ASPNETCORE_URLS http://+:1080, http://+:1081 -EXPOSE 1080 -EXPOSE 1081 - -ENTRYPOINT ["dotnet", "ArmoniK.Core.Control.DependencyChecker.dll"] diff --git a/Control/DependencyChecker/src/Program.cs b/Control/DependencyChecker/src/Program.cs deleted file mode 100644 index 427b7db1f..000000000 --- a/Control/DependencyChecker/src/Program.cs +++ /dev/null @@ -1,161 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System; -using System.Diagnostics; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Core.Adapters.MongoDB; -using ArmoniK.Core.Adapters.MongoDB.Common; -using ArmoniK.Core.Base; -using ArmoniK.Core.Common.DependencyResolver; -using ArmoniK.Core.Common.Injection; -using ArmoniK.Core.Common.Utils; -using ArmoniK.Core.Utils; - -using Microsoft.AspNetCore.Builder; -using Microsoft.AspNetCore.Diagnostics.HealthChecks; -using Microsoft.AspNetCore.Hosting; -using Microsoft.AspNetCore.Server.Kestrel.Core; -using Microsoft.Extensions.Configuration; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; - -using OpenTelemetry.Trace; - -using Serilog; - -namespace ArmoniK.Core.Control.DependencyChecker; - -public static class Program -{ - private static readonly ActivitySource ActivitySource = new("ArmoniK.Core.Control.Submitter"); - - public static async Task Main(string[] args) - { - var builder = WebApplication.CreateBuilder(args); - - builder.Configuration.SetBasePath(Directory.GetCurrentDirectory()) - .AddJsonFile("appsettings.json", - true, - false) - .AddEnvironmentVariables() - .AddCommandLine(args); - - var logger = new LoggerInit(builder.Configuration); - - try - { - builder.Host.UseSerilog(logger.GetSerilogConf()) - .ConfigureHostOptions(options => options.BackgroundServiceExceptionBehavior = BackgroundServiceExceptionBehavior.Ignore); - - builder.Services.AddLogging(logger.Configure) - .AddMongoComponents(builder.Configuration, - logger.GetLogger()) - .AddQueue(builder.Configuration, - logger.GetLogger()) - .AddInitializedOption(builder.Configuration, - Common.Injection.Options.DependencyResolver.SettingSection) - .AddSingletonWithHealthCheck(nameof(DependencyResolver)) - .AddHostedService(); - - builder.Services.AddHealthChecks(); - - if (!string.IsNullOrEmpty(builder.Configuration["Zipkin:Uri"])) - { - ActivitySource.AddActivityListener(new ActivityListener - { - ShouldListenTo = _ => true, - //Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllDataAndRecorded, - ActivityStopped = activity => - { - foreach (var (key, value) in activity.Baggage) - { - activity.AddTag(key, - value); - } - }, - }); - - builder.Services.AddSingleton(ActivitySource) - .AddOpenTelemetry() - .WithTracing(b => - { - b.AddSource(ActivitySource.Name); - b.AddAspNetCoreInstrumentation(); - b.AddZipkinExporter(options => options.Endpoint = - new Uri(builder.Configuration["Zipkin:Uri"] ?? - throw new InvalidOperationException("Zipkin uri should not be null"))); - }); - } - - builder.WebHost.UseKestrel(options => options.ListenAnyIP(1081, - listenOptions => listenOptions.Protocols = HttpProtocols.Http1)); - - var app = builder.Build(); - - if (app.Environment.IsDevelopment()) - { - app.UseDeveloperExceptionPage(); - } - - app.UseRouting(); - app.UseSerilogRequestLogging(); - - app.UseHealthChecks("/startup", - 1081, - new HealthCheckOptions - { - Predicate = check => check.Tags.Contains(nameof(HealthCheckTag.Startup)), - }); - - app.UseHealthChecks("/liveness", - 1081, - new HealthCheckOptions - { - Predicate = check => check.Tags.Contains(nameof(HealthCheckTag.Liveness)), - }); - - var sessionProvider = app.Services.GetRequiredService(); - var dependencyChecker = app.Services.GetRequiredService(); - - await sessionProvider.Init(CancellationToken.None) - .ConfigureAwait(false); - await dependencyChecker.Init(CancellationToken.None) - .ConfigureAwait(false); - - await app.RunAsync() - .ConfigureAwait(false); - - return 0; - } - catch (Exception ex) - { - logger.GetLogger() - .LogCritical(ex, - "Host terminated unexpectedly"); - return 1; - } - finally - { - Log.CloseAndFlush(); - } - } -} diff --git a/Control/DependencyChecker/src/Worker.cs b/Control/DependencyChecker/src/Worker.cs deleted file mode 100644 index 3acd65e38..000000000 --- a/Control/DependencyChecker/src/Worker.cs +++ /dev/null @@ -1,36 +0,0 @@ -// This file is part of the ArmoniK project -// -// Copyright (C) ANEO, 2021-2023. All rights reserved. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as published -// by the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY, without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -using System.Threading; -using System.Threading.Tasks; - -using ArmoniK.Core.Common.DependencyResolver; - -using Microsoft.Extensions.Hosting; - -namespace ArmoniK.Core.Control.DependencyChecker; - -public class Worker : BackgroundService -{ - private readonly DependencyResolver pollster_; - - public Worker(DependencyResolver pollster) - => pollster_ = pollster; - - protected override Task ExecuteAsync(CancellationToken stoppingToken) - => pollster_.ExecuteAsync(stoppingToken); -} diff --git a/Control/DependencyChecker/src/appsettings.json b/Control/DependencyChecker/src/appsettings.json deleted file mode 100644 index 4e46464c1..000000000 --- a/Control/DependencyChecker/src/appsettings.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "Logging": { - "LogLevel": { - "Default": "Information", - "Grpc": "Information", - "Microsoft": "Warning", - "Microsoft.Hosting.Lifetime": "Information" - } - }, - "AllowedHosts": "*", - "Serilog": { - "Properties": { - "Application": "ArmoniK.Control.DependencyChecker" - } - }, - "Components": { - "AuthenticationStorage": "ArmoniK.Adapters.MongoDB.AuthenticationTable", - "TableStorage": "ArmoniK.Adapters.MongoDB.TableStorage", - "QueueStorage": "ArmoniK.Adapters.MongoDB.LockedQueueStorage", - "ObjectStorage": "ArmoniK.Adapters.MongoDB.ObjectStorage" - }, - "MongoDB": { - "Host": "database", - "Port": "27017", - "DatabaseName": "database", - "DataRetention": "10.00:00:00", - "TableStorage": { - "PollingDelay": "00:00:10", - "DispatchAcquisitionPeriod": "00:00:10", - "DispatchTimeToLive": "00:00:20" - }, - "ObjectStorage": { - "ChunkSize": "100000" - } - }, - "Amqp": { - "MaxRetries": "10", - "LinkCredit": "2" - } -} diff --git a/Control/Submitter/src/Program.cs b/Control/Submitter/src/Program.cs index b7de6711e..f1a92d752 100644 --- a/Control/Submitter/src/Program.cs +++ b/Control/Submitter/src/Program.cs @@ -31,7 +31,6 @@ using ArmoniK.Core.Common.gRPC; using ArmoniK.Core.Common.gRPC.Services; using ArmoniK.Core.Common.Injection; -using ArmoniK.Core.Common.Injection.Options; using ArmoniK.Core.Common.Storage; using ArmoniK.Core.Common.Utils; using ArmoniK.Core.Utils; @@ -88,8 +87,6 @@ public static async Task Main(string[] args) .AddSingletonWithHealthCheck(nameof(ExceptionInterceptor)) .AddOption(builder.Configuration, Common.Injection.Options.Submitter.SettingSection) - .AddOption(builder.Configuration, - DependencyResolver.SettingSection) .AddGrpcReflection() .ValidateGrpcRequests(); diff --git a/Tests/HtcMock/Client/src/SessionClient.cs b/Tests/HtcMock/Client/src/SessionClient.cs index 9910c6b84..6c2ac6506 100644 --- a/Tests/HtcMock/Client/src/SessionClient.cs +++ b/Tests/HtcMock/Client/src/SessionClient.cs @@ -153,6 +153,9 @@ public IEnumerable SubmitTasksWithDependencies(IEnumerable status.TaskInfo.TaskId) + .Single()); return taskRequests.Select(request => request.ExpectedOutputKeys.Single()); case CreateTaskReply.ResponseOneofCase.Error: throw new Exception("Error : " + createTaskReply.Error); diff --git a/justfile b/justfile index f2082397c..edc7f9d2a 100644 --- a/justfile +++ b/justfile @@ -66,7 +66,6 @@ export ARMONIK_METRICS := "dockerhubaneo/armonik_control_metrics:" + export ARMONIK_PARTITIONMETRICS := "dockerhubaneo/armonik_control_partition_metrics:" + tag export ARMONIK_SUBMITTER := "dockerhubaneo/armonik_control:" + tag export ARMONIK_POLLINGAGENT := "dockerhubaneo/armonik_pollingagent:" + tag -export ARMONIK_DEPENDENCYCHECKER := "dockerhubaneo/armonik_control_dependency_checker:" + tag # Environment variables used to build client images of htcmock, stream and bench export HTCMOCK_CLIENT_IMAGE := "dockerhubaneo/armonik_core_htcmock_test_client:" + tag @@ -216,18 +215,15 @@ buildStreamClient: (build STREAM_CLIENT_IMAGE "./Tests/Stream/Client/Dockerfile # Build Bench Client buildBenchClient: (build BENCH_CLIENT_IMAGE "./Tests/Bench/Client/src/Dockerfile") -# Build Dependency Checker -buildDependencyChecker: (build ARMONIK_DEPENDENCYCHECKER "./Control/DependencyChecker/src/Dockerfile") - # Build all images necessary for the deployment -build-all: buildWorker buildMetrics buildPartitionMetrics buildSubmimtter buildPollingAgent buildDependencyChecker +build-all: buildWorker buildMetrics buildPartitionMetrics buildSubmimtter buildPollingAgent # Build and Deploy ArmoniK Core; this recipe should only be used with local_images=false build-deploy: build-all deploy # Custom command to restore a deployment after restarting a given service -restoreDeployment serviceName: (restart serviceName) (restart "armonik.control.submitter") (restart "armonik.control.dependency_checker") +restoreDeployment serviceName: (restart serviceName) (restart "armonik.control.submitter") #!/usr/bin/env bash set -euo pipefail for (( i=0; i<{{replicas}}; i++ )); do @@ -252,7 +248,3 @@ healthChecks: echo -e "\nHealth Checking Submitter" echo -n " startup: " && curl -sSL localhost:5011/startup echo -n " liveness: " && curl -sSL localhost:5011/liveness - - echo -e "\nHealth Checking DependencyChecker" - echo -n " startup: " && curl -sSL localhost:5012/startup - echo -n " liveness: " && curl -sSL localhost:5012/liveness \ No newline at end of file diff --git a/terraform/main.tf b/terraform/main.tf index 6a03d1a8e..c9875120c 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -86,47 +86,32 @@ module "queue_artemis" { } module "submitter" { - source = "./modules/submitter" - container_name = local.submitter.name - core_tag = local.submitter.tag - use_local_image = var.use_local_image - docker_image = local.submitter.image - network = docker_network.armonik.name - generated_env_vars = local.environment - zipkin_uri = module.zipkin.zipkin_uri - log_driver = module.fluenbit.log_driver - volumes = local.volumes - unresolved_dependencies_queue = var.unresolved_dependencies_queue -} - -module "dependency_checker" { - source = "./modules/dependency_checker" - container_name = local.dependency_checker.name - core_tag = local.dependency_checker.tag - use_local_image = var.use_local_image - docker_image = local.dependency_checker.image - network = docker_network.armonik.name - generated_env_vars = local.environment - zipkin_uri = module.zipkin.zipkin_uri - log_driver = module.fluenbit.log_driver - unresolved_dependencies_queue = var.unresolved_dependencies_queue + source = "./modules/submitter" + container_name = local.submitter.name + core_tag = local.submitter.tag + use_local_image = var.use_local_image + docker_image = local.submitter.image + network = docker_network.armonik.name + generated_env_vars = local.environment + zipkin_uri = module.zipkin.zipkin_uri + log_driver = module.fluenbit.log_driver + volumes = local.volumes } module "compute_plane" { - source = "./modules/compute_plane" - for_each = local.replicas - replica_counter = each.key - num_partitions = var.num_partitions - core_tag = local.compute_plane.tag - use_local_image = var.use_local_image - polling_agent = local.compute_plane.polling_agent - worker = local.compute_plane.worker - generated_env_vars = local.environment - volumes = local.volumes - network = docker_network.armonik.name - zipkin_uri = module.zipkin.zipkin_uri - log_driver = module.fluenbit.log_driver - unresolved_dependencies_queue = var.unresolved_dependencies_queue + source = "./modules/compute_plane" + for_each = local.replicas + replica_counter = each.key + num_partitions = var.num_partitions + core_tag = local.compute_plane.tag + use_local_image = var.use_local_image + polling_agent = local.compute_plane.polling_agent + worker = local.compute_plane.worker + generated_env_vars = local.environment + volumes = local.volumes + network = docker_network.armonik.name + zipkin_uri = module.zipkin.zipkin_uri + log_driver = module.fluenbit.log_driver } module "metrics_exporter" { diff --git a/terraform/modules/compute_plane/inputs.tf b/terraform/modules/compute_plane/inputs.tf index 3526c394b..88d352a79 100644 --- a/terraform/modules/compute_plane/inputs.tf +++ b/terraform/modules/compute_plane/inputs.tf @@ -58,7 +58,3 @@ variable "log_driver" { address = string, }) } - -variable "unresolved_dependencies_queue" { - type = string -} diff --git a/terraform/modules/compute_plane/locals.tf b/terraform/modules/compute_plane/locals.tf index 28da6522c..f651c0523 100644 --- a/terraform/modules/compute_plane/locals.tf +++ b/terraform/modules/compute_plane/locals.tf @@ -11,7 +11,6 @@ locals { "InitWorker__WorkerCheckDelay=${var.polling_agent.worker_check_delay}", "Zipkin__Uri=${var.zipkin_uri}", "Amqp__PartitionId=TestPartition${local.partition_chooser}", - "DependencyResolver__UnresolvedDependenciesQueue=${var.unresolved_dependencies_queue}", ] gen_env = [for k, v in var.generated_env_vars : "${k}=${v}"] } diff --git a/terraform/modules/dependency_checker/inputs.tf b/terraform/modules/dependency_checker/inputs.tf deleted file mode 100644 index 7cf135b8b..000000000 --- a/terraform/modules/dependency_checker/inputs.tf +++ /dev/null @@ -1,39 +0,0 @@ -variable "core_tag" { - type = string -} - -variable "container_name" { - type = string -} - -variable "docker_image" { - type = string -} - -variable "use_local_image" { - type = bool - default = false -} - -variable "network" { - type = string -} - -variable "zipkin_uri" { - type = string -} - -variable "generated_env_vars" { - type = map(string) -} - -variable "log_driver" { - type = object({ - name = string, - address = string, - }) -} - -variable "unresolved_dependencies_queue" { - type = string -} diff --git a/terraform/modules/dependency_checker/locals.tf b/terraform/modules/dependency_checker/locals.tf deleted file mode 100644 index 0d3324e89..000000000 --- a/terraform/modules/dependency_checker/locals.tf +++ /dev/null @@ -1,7 +0,0 @@ -locals { - env = [ - "Zipkin__Uri=${var.zipkin_uri}", - "Amqp__PartitionId=${var.unresolved_dependencies_queue}", - ] - gen_env = [for k, v in var.generated_env_vars : "${k}=${v}"] -} \ No newline at end of file diff --git a/terraform/modules/dependency_checker/main.tf b/terraform/modules/dependency_checker/main.tf deleted file mode 100644 index 2ddbb3fae..000000000 --- a/terraform/modules/dependency_checker/main.tf +++ /dev/null @@ -1,44 +0,0 @@ -resource "docker_image" "dependency_checker" { - count = var.use_local_image ? 0 : 1 - name = "${var.docker_image}:${var.core_tag}" - keep_locally = true -} - -module "dependency_checker_local" { - count = var.use_local_image ? 1 : 0 - source = "../build_image" - use_local_image = var.use_local_image - image_name = "dependency_checker_local" - context_path = "${path.root}/../" - dockerfile_path = "${path.root}/../Control/DependencyChecker/src/" -} - -resource "docker_container" "dependency_checker" { - name = var.container_name - image = one(concat(module.dependency_checker_local, docker_image.dependency_checker)).image_id - - networks_advanced { - name = var.network - } - - env = concat(local.env, local.gen_env) - - log_driver = var.log_driver.name - - log_opts = { - fluentd-address = var.log_driver.address - } - - ports { - internal = 1081 - external = 5012 - } - - healthcheck { - test = ["CMD", "bash", "-c", "exec 3<>\"/dev/tcp/localhost/1081\" && echo -en \"GET /liveness HTTP/1.1\r\nHost: localhost:1081\r\nConnection: close\r\n\r\n\">&3 && grep Healthy <&3 &>/dev/null || exit 1"] - interval = "5s" - timeout = "3s" - start_period = "20s" - retries = 5 - } -} \ No newline at end of file diff --git a/terraform/modules/dependency_checker/outputs.tf b/terraform/modules/dependency_checker/outputs.tf deleted file mode 100644 index e69de29bb..000000000 diff --git a/terraform/modules/dependency_checker/versions.tf b/terraform/modules/dependency_checker/versions.tf deleted file mode 100644 index 76b04b359..000000000 --- a/terraform/modules/dependency_checker/versions.tf +++ /dev/null @@ -1,8 +0,0 @@ -terraform { - required_providers { - docker = { - source = "kreuzwerker/docker" - version = ">= 3.0.1" - } - } -} \ No newline at end of file diff --git a/terraform/modules/submitter/inputs.tf b/terraform/modules/submitter/inputs.tf index 51fc2dd35..1a5e786a9 100644 --- a/terraform/modules/submitter/inputs.tf +++ b/terraform/modules/submitter/inputs.tf @@ -38,6 +38,3 @@ variable "log_driver" { }) } -variable "unresolved_dependencies_queue" { - type = string -} diff --git a/terraform/modules/submitter/locals.tf b/terraform/modules/submitter/locals.tf index d8b88ee6e..60a7c98d1 100644 --- a/terraform/modules/submitter/locals.tf +++ b/terraform/modules/submitter/locals.tf @@ -2,7 +2,6 @@ locals { env = [ "Submitter__DefaultPartition=TestPartition0", "Zipkin__Uri=${var.zipkin_uri}", - "DependencyResolver__UnresolvedDependenciesQueue=${var.unresolved_dependencies_queue}", ] gen_env = [for k, v in var.generated_env_vars : "${k}=${v}"] } diff --git a/terraform/variables.tf b/terraform/variables.tf index df6b38222..33142309b 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -18,11 +18,6 @@ variable "num_partitions" { default = "3" } -variable "unresolved_dependencies_queue" { - type = string - default = "UnresolvedDependenciesQueueDefault" -} - variable "mongodb_params" { type = object({ max_connection_pool_size = optional(string, "500")