Skip to content

Commit

Permalink
refactor: Clean most warning and messages (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
lemaitre-aneo authored Aug 11, 2023
2 parents a9bd09a + e582dc0 commit fa68172
Show file tree
Hide file tree
Showing 121 changed files with 1,592 additions and 1,551 deletions.
42 changes: 17 additions & 25 deletions Adaptors/Amqp/src/ConnectionAmqp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,18 @@ public ConnectionAmqp(QueueCommon.Amqp options,
public Connection? Connection { get; private set; }

public Task<HealthCheckResult> Check(HealthCheckTag tag)
{
switch (tag)
{
case HealthCheckTag.Startup:
case HealthCheckTag.Readiness:
return Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy($"{nameof(ConnectionAmqp)} is not yet initialized."));
case HealthCheckTag.Liveness:
return Task.FromResult(isInitialized_ && Connection is not null && Connection.ConnectionState == ConnectionState.Opened
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy($"{nameof(ConnectionAmqp)} not initialized or connection dropped."));
default:
throw new ArgumentOutOfRangeException(nameof(tag),
tag,
null);
}
}
=> tag switch
{
HealthCheckTag.Startup or HealthCheckTag.Readiness => Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy($"{nameof(ConnectionAmqp)} is not yet initialized.")),
HealthCheckTag.Liveness => Task.FromResult(isInitialized_ && Connection is not null && Connection.ConnectionState == ConnectionState.Opened
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy($"{nameof(ConnectionAmqp)} not initialized or connection dropped.")),
_ => throw new ArgumentOutOfRangeException(nameof(tag),
tag,
null),
};

public async Task Init(CancellationToken cancellationToken = default)
=> await connectionTask_;
Expand Down Expand Up @@ -113,9 +107,8 @@ private static async Task InitTask(ConnectionAmqp conn,
{
conn.Connection = await connectionFactory.CreateAsync(address)
.ConfigureAwait(false);
conn.Connection.AddClosedCallback((x,
e) => OnCloseConnection(x,
e,
conn.Connection.AddClosedCallback((_,
e) => OnCloseConnection(e,
conn.logger_));
break;
}
Expand All @@ -137,17 +130,16 @@ await Task.Delay(1000 * retry,
conn.isInitialized_ = true;
}

private static void OnCloseConnection(IAmqpObject sender,
Error? error,
ILogger logger)
private static void OnCloseConnection(Error? error,
ILogger logger)
{
if (error == null)
{
logger.LogInformation("AMQP Connection closed with no error");
}
else
{
logger.LogWarning("AMQP Connection closed with error: {0}",
logger.LogWarning("AMQP Connection closed with error: {error}",
error.ToString());
}
}
Expand Down
9 changes: 4 additions & 5 deletions Adaptors/Amqp/src/PullQueueStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ await ConnectionAmqp.Init(cancellationToken)
$"{Options.PartitionId}###q{i}");
/* linkCredit_: the maximum number of messages the
* remote peer can send to the receiver.
* With the goal of minimizing/deactivating
* prefetching, a value of 1 gave us the desired
* behavior. We pick a default value of 2 to have "some cache". */
* remote peer can send to the receiver.
* With the goal of minimizing/deactivating
* prefetching, a value of 1 gave us the desired
* behavior. We pick a default value of 2 to have "some cache". */
rl.SetCredit(Options.LinkCredit);
return rl;
}))
Expand Down Expand Up @@ -141,7 +141,6 @@ public async IAsyncEnumerable<IQueueMessageHandler> PullMessagesAsync(int
sender,
receiver,
Encoding.UTF8.GetString(message.Body as byte[] ?? throw new InvalidOperationException("Error while deserializing message")),
logger_,
cancellationToken);

break;
Expand Down
5 changes: 0 additions & 5 deletions Adaptors/Amqp/src/QueueMessageHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,10 @@

using ArmoniK.Core.Base;

using Microsoft.Extensions.Logging;

namespace ArmoniK.Core.Adapters.Amqp;

public class QueueMessageHandler : IQueueMessageHandler
{
private readonly ILogger logger_;
private readonly Message message_;
private readonly IReceiverLink receiver_;
private readonly ISenderLink sender_;
Expand All @@ -39,13 +36,11 @@ public QueueMessageHandler(Message message,
ISenderLink sender,
IReceiverLink receiver,
string taskId,
ILogger logger,
CancellationToken cancellationToken)
{
message_ = message;
sender_ = sender;
receiver_ = receiver;
logger_ = logger;
TaskId = taskId;
CancellationToken = cancellationToken;
ReceptionDateTime = DateTime.UtcNow;
Expand Down
42 changes: 21 additions & 21 deletions Adaptors/Amqp/tests/ArmoniK.Core.Adapters.Amqp.Tests.csproj
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<IsPackable>false</IsPackable>
<Nullable>enable</Nullable>
</PropertyGroup>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<DebugType>Embedded</DebugType>
<IncludeSymbols>true</IncludeSymbols>
<DefineConstants>DEBUG;TRACE</DefineConstants>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<DebugType>Embedded</DebugType>
<IncludeSymbols>true</IncludeSymbols>
<DefineConstants>DEBUG;TRACE</DefineConstants>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.3" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.3" />
</ItemGroup>

<ItemGroup>
<AssemblyAttribute Include="System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverageAttribute" />
</ItemGroup>
<ItemGroup>
<AssemblyAttribute Include="System.Diagnostics.CodeAnalysis.ExcludeFromCodeCoverageAttribute" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Common\tests\ArmoniK.Core.Common.Tests.csproj" />
<ProjectReference Include="..\src\ArmoniK.Core.Adapters.Amqp.csproj" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\Common\tests\ArmoniK.Core.Common.Tests.csproj" />
<ProjectReference Include="..\src\ArmoniK.Core.Adapters.Amqp.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\Common\src\ArmoniK.Core.Common.csproj"/>
<ProjectReference Include="..\..\..\Common\src\ArmoniK.Core.Common.csproj" />
</ItemGroup>

</Project>
30 changes: 12 additions & 18 deletions Adaptors/LocalStorage/src/ObjectStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,18 @@ public Task Init(CancellationToken cancellationToken)

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
{
switch (tag)
{
case HealthCheckTag.Startup:
case HealthCheckTag.Readiness:
return Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("Local storage not initialized yet."));
case HealthCheckTag.Liveness:
return Task.FromResult(isInitialized_ && Directory.Exists(path_)
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("Local storage not initialized or folder has been deleted."));
default:
throw new ArgumentOutOfRangeException(nameof(tag),
tag,
null);
}
}
=> tag switch
{
HealthCheckTag.Startup or HealthCheckTag.Readiness => Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("Local storage not initialized yet.")),
HealthCheckTag.Liveness => Task.FromResult(isInitialized_ && Directory.Exists(path_)
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy("Local storage not initialized or folder has been deleted.")),
_ => throw new ArgumentOutOfRangeException(nameof(tag),
tag,
null),
};

/// <inheritdoc />
public async Task AddOrUpdateAsync(string key,
Expand Down
2 changes: 1 addition & 1 deletion Adaptors/LocalStorage/tests/ObjectStorageTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public override void TearDown()
RunTests = false;
}

public override void GetObjectStorageInstance()
protected override void GetObjectStorageInstance()
{
var rootPath = Path.Combine(Path.GetTempPath(),
$"ArmoniK.{Environment.ProcessId}");
Expand Down
8 changes: 2 additions & 6 deletions Adaptors/Memory/src/PushQueueStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,12 @@ public async Task PushMessagesAsync(IEnumerable<MessageData> messages,
{
var priorityGroups = messages.GroupBy(msgData => msgData.Options.Priority);
await Task.WhenAll(priorityGroups.Select(group => PushMessagesAsync(group,
partitionId,
group.Key,
cancellationToken)))
group.Key)))
.ConfigureAwait(false);
}

private Task PushMessagesAsync(IEnumerable<MessageData> messages,
string partitionId,
int priority = 1,
CancellationToken cancellationToken = default)
int priority = 1)
{
var messageHandlers = messages.Select(message => new MessageHandler
{
Expand Down
20 changes: 9 additions & 11 deletions Adaptors/Memory/src/QueueStorage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

namespace ArmoniK.Core.Adapters.Memory;

public class QueueStorage : IQueueStorage
public class QueueStorage : IPullQueueStorage, IPushQueueStorage
{
private static readonly MessageHandler DefaultMessage = new();

Expand All @@ -41,10 +41,6 @@ public class QueueStorage : IQueueStorage

private readonly SortedList<MessageHandler, MessageHandler> queues_ = new(MessageComparer.Instance);

/// <inheritdoc />
public string PartitionId
=> "";

/// <inheritdoc />
public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(HealthCheckResult.Healthy());
Expand All @@ -57,6 +53,7 @@ public Task Init(CancellationToken cancellationToken)
public int MaxPriority
=> 100;


/// <inheritdoc />
public async IAsyncEnumerable<IQueueMessageHandler> PullMessagesAsync(int nbMessages,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -96,16 +93,17 @@ await message.Semaphore.WaitAsync(CancellationToken.None)
}

/// <inheritdoc />
public Task PushMessagesAsync(IEnumerable<string> messages,
string partitionId,
int priority = 1,
CancellationToken cancellationToken = default)
public Task PushMessagesAsync(IEnumerable<MessageData> messages,
string _,
CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

var messageHandlers = messages.Select(message => new MessageHandler
{
IsVisible = true,
Priority = priority,
TaskId = message,
Priority = message.Options.Priority,
TaskId = message.TaskId,
CancellationToken = CancellationToken.None,
Status = QueueMessageStatus.Waiting,
Queues = queues_,
Expand Down
10 changes: 5 additions & 5 deletions Adaptors/Memory/src/ResultTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -302,18 +302,18 @@ public Task SetTaskOwnership(string sessi
throw new SessionNotFoundException($"Session '{session}' not found");
}

foreach (var req in requests)
foreach (var (resultId, taskId) in requests)
{
if (!session.TryGetValue(req.resultId,
if (!session.TryGetValue(resultId,
out var result))
{
throw new ResultNotFoundException($"Key '{req.resultId}' not found");
throw new ResultNotFoundException($"Key '{resultId}' not found");
}

session.TryUpdate(req.resultId,
session.TryUpdate(resultId,
result with
{
OwnerTaskId = req.taskId,
OwnerTaskId = taskId,
},
result);
}
Expand Down
41 changes: 7 additions & 34 deletions Adaptors/Memory/src/TaskTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,10 @@ public Task CreateTasks(IEnumerable<TaskData> tasks,
public Task<TaskData> ReadTaskAsync(string taskId,
CancellationToken cancellationToken = default)
{
if (taskId2TaskData_.ContainsKey(taskId))
if (taskId2TaskData_.TryGetValue(taskId,
out var value))
{
return Task.FromResult(taskId2TaskData_[taskId]);
return Task.FromResult(value);
}

throw new TaskNotFoundException($"Key '{taskId}' not found");
Expand All @@ -93,26 +94,21 @@ public Task<TaskData> ReadTaskAsync(string taskId,
public Task<bool> IsTaskCancelledAsync(string taskId,
CancellationToken cancellationToken = default)
{
if (!taskId2TaskData_.ContainsKey(taskId))
if (!taskId2TaskData_.TryGetValue(taskId,
out var value))
{
throw new TaskNotFoundException($"Key '{taskId}' not found");
}

return Task.FromResult(taskId2TaskData_[taskId]
.Status is TaskStatus.Cancelling or TaskStatus.Cancelled);
return Task.FromResult(value.Status is TaskStatus.Cancelling or TaskStatus.Cancelled);
}

/// <inheritdoc />
public Task StartTask(TaskData taskData,
CancellationToken cancellationToken = default)
{
if (!taskId2TaskData_.ContainsKey(taskData.TaskId))
{
throw new TaskNotFoundException($"Key '{taskData.TaskId}' not found");
}

taskId2TaskData_.AddOrUpdate(taskData.TaskId,
_ => throw new InvalidOperationException("The task does not exist."),
_ => throw new TaskNotFoundException($"Key '{taskData.TaskId}' not found"),
(_,
data) => data with
{
Expand Down Expand Up @@ -504,27 +500,4 @@ public Task<HealthCheckResult> Check(HealthCheckTag tag)
=> Task.FromResult(isInitialized_
? HealthCheckResult.Healthy()
: HealthCheckResult.Unhealthy());

private bool UpdateTaskToSubmitted(string id)
{
var updated = false;
taskId2TaskData_.AddOrUpdate(id,
_ => throw new InvalidOperationException("The task does not exist."),
(_,
data) =>
{
if (data.Status != TaskStatus.Creating)
{
return data;
}
updated = true;
return data with
{
Status = TaskStatus.Submitted,
SubmittedDate = DateTime.UtcNow,
};
});
return updated;
}
}
Loading

0 comments on commit fa68172

Please sign in to comment.