Skip to content

Commit

Permalink
feat: Dependency checking without external service (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo authored Apr 17, 2023
2 parents 71e6036 + 0405571 commit a503989
Show file tree
Hide file tree
Showing 47 changed files with 759 additions and 1,661 deletions.
4 changes: 0 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,6 @@ jobs:
path: ./Control/PartitionMetrics/src/Dockerfile
- img : dockerhubaneo/armonik_control
path: ./Control/Submitter/src/Dockerfile
- img : dockerhubaneo/armonik_control_dependency_checker
path: ./Control/DependencyChecker/src/Dockerfile
- img : dockerhubaneo/armonik_core_stream_test_worker
path: ./Tests/Stream/Server/Dockerfile
- img : dockerhubaneo/armonik_core_stream_test_client
Expand Down Expand Up @@ -271,8 +269,6 @@ jobs:
path: ./Control/PartitionMetrics/src/Dockerfile
- img : dockerhubaneo/armonik_control
path: ./Control/Submitter/src/Dockerfile
- img : dockerhubaneo/armonik_control_dependency_checker
path: ./Control/DependencyChecker/src/Dockerfile
- img : dockerhubaneo/armonik_core_stream_test_worker
path: ./Tests/Stream/Server/Dockerfile
- img : dockerhubaneo/armonik_core_stream_test_client
Expand Down
2 changes: 0 additions & 2 deletions .github/workflows/make-release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,6 @@ jobs:
path: ./Control/PartitionMetrics/src/Dockerfile
- img : dockerhubaneo/armonik_control
path: ./Control/Submitter/src/Dockerfile
- img : dockerhubaneo/armonik_control_dependency_checker
path: ./Control/DependencyChecker/src/Dockerfile
- img : dockerhubaneo/armonik_core_stream_test_worker
path: ./Tests/Stream/Server/Dockerfile
- img : dockerhubaneo/armonik_core_stream_test_client
Expand Down
32 changes: 31 additions & 1 deletion Adaptors/Memory/src/TaskTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,35 @@ public Task SetTaskSuccessAsync(string taskId,
return Task.CompletedTask;
}

public Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable<string> dependenciesToRemove)> dependencies,
CancellationToken cancellationToken = default)
{
using var _ = Logger.LogFunction();

foreach (var (taskId, dependenciesToRemove) in dependencies)
{
taskId2TaskData_.AddOrUpdate(taskId,
_ => throw new TaskNotFoundException("The task does not exist."),
(_,
data) =>
{
var remainingDep = data.RemainingDataDependencies.ToList();
foreach (var dep in dependenciesToRemove)
{
remainingDep.Remove(dep);
}
return data with
{
RemainingDataDependencies = remainingDep,
};
});
}

return Task.CompletedTask;
}

/// <inheritdoc />
public Task SetTaskCanceledAsync(string taskId,
CancellationToken cancellationToken = default)
Expand All @@ -437,7 +466,7 @@ public Task SetTaskCanceledAsync(string taskId,
}

taskId2TaskData_.AddOrUpdate(taskId,
_ => throw new InvalidOperationException("The task does not exist."),
_ => throw new TaskNotFoundException("The task does not exist."),
(_,
data) =>
{
Expand Down Expand Up @@ -602,6 +631,7 @@ public Task<string> RetryTask(TaskData taskData,
taskData.PayloadId,
taskData.ParentTaskIds,
taskData.DataDependencies,
taskData.RemainingDataDependencies,
taskData.ExpectedOutputIds,
taskData.InitialTaskId,
newTaskRetryOfIds,
Expand Down
6 changes: 6 additions & 0 deletions Adaptors/MongoDB/src/ResultTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ public async Task SetResult(string sessionId,
key);
var resultCollection = resultCollectionProvider_.Get();

Logger.LogInformation("Update result {resultId} to completed",
key);

var res = await resultCollection.UpdateOneAsync(Builders<Result>.Filter.Where(model => model.Id == Result.GenerateId(sessionId,
key) && model.OwnerTaskId == ownerTaskId),
Builders<Result>.Update.Set(model => model.Status,
Expand Down Expand Up @@ -282,6 +285,9 @@ public async Task SetResult(string sessionId,

var resultCollection = resultCollectionProvider_.Get();

Logger.LogInformation("Update result {resultId} to completed",
key);

var res = await resultCollection.UpdateOneAsync(Builders<Result>.Filter.Where(model => model.Id == Result.GenerateId(sessionId,
key) && model.OwnerTaskId == ownerTaskId),
Builders<Result>.Update.Set(model => model.Status,
Expand Down
4 changes: 4 additions & 0 deletions Adaptors/MongoDB/src/Table/DataModel/TaskDataModelMapping.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ static TaskDataModelMapping()
cm.MapProperty(nameof(TaskData.DataDependencies))
.SetIgnoreIfDefault(true)
.SetDefaultValue(Array.Empty<string>());
cm.MapProperty(nameof(TaskData.RemainingDataDependencies))
.SetIgnoreIfDefault(true)
.SetDefaultValue(Array.Empty<string>());
cm.MapProperty(nameof(TaskData.ExpectedOutputIds))
.SetIsRequired(true);
cm.MapProperty(nameof(TaskData.InitialTaskId))
Expand Down Expand Up @@ -85,6 +88,7 @@ static TaskDataModelMapping()
model.PayloadId,
model.ParentTaskIds,
model.DataDependencies,
model.RemainingDataDependencies,
model.ExpectedOutputIds,
model.InitialTaskId,
model.RetryOfIds,
Expand Down
29 changes: 25 additions & 4 deletions Adaptors/MongoDB/src/TaskTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,26 @@ public async Task SetTaskSuccessAsync(string taskId,
}
}

/// <inheritdoc />
public async Task RemoveRemainingDataDependenciesAsync(IEnumerable<(string taskId, IEnumerable<string> dependenciesToRemove)> dependencies,
CancellationToken cancellationToken = default)
{
using var activity = activitySource_.StartActivity($"{nameof(SetTaskCanceledAsync)}");
var taskCollection = taskCollectionProvider_.Get();
var sessionHandle = sessionProvider_.Get();


await taskCollection.BulkWriteAsync(sessionHandle,
dependencies.Select(tuple
=> new UpdateOneModel<TaskData>(new ExpressionFilterDefinition<TaskData>(data => data.TaskId == tuple
.taskId),
new UpdateDefinitionBuilder<TaskData>()
.PullAll(data => data.RemainingDataDependencies,
tuple.dependenciesToRemove))),
cancellationToken: cancellationToken)
.ConfigureAwait(false);
}

/// <inheritdoc />
public async Task SetTaskCanceledAsync(string taskId,
CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -580,10 +600,10 @@ public async Task<bool> SetTaskErrorAsync(string taskId,

var filter = new FilterDefinitionBuilder<TaskData>().Where(x => x.TaskId == taskId);

Logger.LogInformation("update task {taskId} to status {status} with {output}",
taskId,
TaskStatus.Error,
taskOutput);
Logger.LogDebug("update task {taskId} to status {status} with {output}",
taskId,
TaskStatus.Error,
taskOutput);
var task = await taskCollection.FindOneAndUpdateAsync(filter,
updateDefinition,
new FindOneAndUpdateOptions<TaskData>
Expand Down Expand Up @@ -818,6 +838,7 @@ public async Task<string> RetryTask(TaskData taskData,
taskData.PayloadId,
taskData.ParentTaskIds,
taskData.DataDependencies,
taskData.RemainingDataDependencies,
taskData.ExpectedOutputIds,
taskData.InitialTaskId,
newTaskRetryOfIds,
Expand Down
11 changes: 0 additions & 11 deletions ArmoniK.Core.sln
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Adapters.S3.Te
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Common.Tests.Client", "Tests\Common\Client\src\ArmoniK.Core.Common.Tests.Client.csproj", "{289EAA4A-15E5-4D8B-87B2-0CFFD9510207}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Control.DependencyChecker", "Control\DependencyChecker\src\ArmoniK.Core.Control.DependencyChecker.csproj", "{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Base", "Base\src\ArmoniK.Core.Base.csproj", "{150576C8-A80E-4FB1-9878-37D95A11FAF9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ArmoniK.Core.Adapters.QueueCommon", "Adaptors\QueueCommon\src\ArmoniK.Core.Adapters.QueueCommon.csproj", "{EF5DDE84-CB4B-48E0-A21B-455849835E66}"
Expand Down Expand Up @@ -339,14 +337,6 @@ Global
{289EAA4A-15E5-4D8B-87B2-0CFFD9510207}.Release|Any CPU.Build.0 = Release|Any CPU
{289EAA4A-15E5-4D8B-87B2-0CFFD9510207}.Release|x64.ActiveCfg = Release|Any CPU
{289EAA4A-15E5-4D8B-87B2-0CFFD9510207}.Release|x64.Build.0 = Release|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|x64.ActiveCfg = Debug|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Debug|x64.Build.0 = Debug|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|Any CPU.Build.0 = Release|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|x64.ActiveCfg = Release|Any CPU
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E}.Release|x64.Build.0 = Release|Any CPU
{150576C8-A80E-4FB1-9878-37D95A11FAF9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{150576C8-A80E-4FB1-9878-37D95A11FAF9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{150576C8-A80E-4FB1-9878-37D95A11FAF9}.Debug|x64.ActiveCfg = Debug|Any CPU
Expand Down Expand Up @@ -414,7 +404,6 @@ Global
{4979ADF2-4574-4FF5-BDF4-B5EC0EFAC803} = {1A9BCE53-79D0-4761-B3A2-6967B610FA94}
{35AC9746-51AC-4EB5-A6B9-AFC169868ABF} = {2D548C40-6260-4A4E-9C56-E8A80C639E13}
{289EAA4A-15E5-4D8B-87B2-0CFFD9510207} = {CD412C3D-63D0-4726-B4C3-FEF701E4DCAF}
{F9A98B7C-B32C-4D3C-99A8-3DD8EA610D3E} = {0E809C1E-CBF5-42E4-8624-2E244A52E422}
{150576C8-A80E-4FB1-9878-37D95A11FAF9} = {79DE0448-1BF2-48D2-8A33-243E402CFD27}
{EF5DDE84-CB4B-48E0-A21B-455849835E66} = {1A9BCE53-79D0-4761-B3A2-6967B610FA94}
{7B455427-859F-4DDA-B67B-510FE450CB35} = {79DE0448-1BF2-48D2-8A33-243E402CFD27}
Expand Down
Loading

0 comments on commit a503989

Please sign in to comment.