Skip to content

Commit

Permalink
Allow the Outbox Sweeper to wait for clearing so that the lock can wo…
Browse files Browse the repository at this point in the history
…rk (#3334)

Task #3321
  • Loading branch information
preardon authored Oct 1, 2024
1 parent 69b00ff commit d7ffd74
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 4 deletions.
13 changes: 9 additions & 4 deletions src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,17 @@ public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Sweeper Service is starting.");

_timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));
_timer = new Timer(Callback, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));

return Task.CompletedTask;
}

private void DoWork(object state)
private async void Callback(object _)
{
await DoWorkAsync();
}

private async Task DoWorkAsync()
{
var lockId = _distributedLock.ObtainLockAsync(LockingResourceName, CancellationToken.None).Result;
if (lockId != null)
Expand All @@ -55,9 +60,9 @@ private void DoWork(object state)
_options.Args);

if (_options.UseBulk)
outBoxSweeper.SweepAsyncOutbox();
await outBoxSweeper.SweepAsyncOutboxAsync();
else
outBoxSweeper.Sweep();
await outBoxSweeper.SweepAsync();
}
catch (Exception e)
{
Expand Down
32 changes: 32 additions & 0 deletions src/Paramore.Brighter/CommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,19 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona
{
_bus.ClearOutbox(amountToClear, minimumAge, false, false, args);
}

/// <summary>
/// Flushes any outstanding message box message to the broker.
/// This will be run on a background task.
/// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ <see cref="DepositPostBox"/>
/// </summary>
/// <param name="amountToClear">The maximum number to clear.</param>
/// <param name="minimumAge">The minimum age to clear in milliseconds.</param>
/// <param name="args">Optional bag of arguments required by an outbox implementation to sweep</param>
public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary<string, object> args = null)
{
return _bus.ClearOutboxAsync(amountToClear, minimumAge, false, false, args);
}

/// <summary>
/// Flushes the message box message given by <param name="posts"> to the broker.
Expand Down Expand Up @@ -702,6 +715,25 @@ public void ClearAsyncOutbox(
{
_bus.ClearOutbox(amountToClear, minimumAge, true, useBulk, args);
}

/// <summary>
/// Flushes any outstanding message box message to the broker.
/// This will be run on a background task.
/// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ <see cref="DepositPostBoxAsync"/>
/// </summary>
/// <param name="amountToClear">The maximum number to clear.</param>
/// <param name="minimumAge">The minimum age to clear in milliseconds.</param>
/// <param name="useBulk">Use the bulk send on the producer.</param>
/// <param name="args">Optional bag of arguments required by an outbox implementation to sweep</param>
public Task ClearAsyncOutboxAsync(
int amountToClear = 100,
int minimumAge = 5000,
bool useBulk = false,
Dictionary<string, object> args = null
)
{
return _bus.ClearOutboxAsync(amountToClear, minimumAge, true, useBulk, args);
}

/// <summary>
/// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply.
Expand Down
33 changes: 33 additions & 0 deletions src/Paramore.Brighter/ExternalBusServices.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,39 @@ internal void ClearOutbox(int amountToClear, int minimumAge, bool useAsync, bool
Task.Run(() => BackgroundDispatchUsingSync(amountToClear, minimumAge, args));
}
}

/// <summary>
/// This is the clear outbox for implicit clearing of messages.
/// </summary>
/// <param name="amountToClear">Maximum number to clear.</param>
/// <param name="minimumAge">The minimum age of messages to be cleared in milliseconds.</param>
/// <param name="useAsync">Use the Async outbox and Producer</param>
/// <param name="useBulk">Use bulk sending capability of the message producer, this must be paired with useAsync.</param>
/// <param name="args">Optional bag of arguments required by an outbox implementation to sweep</param>
internal Task ClearOutboxAsync(int amountToClear, int minimumAge, bool useAsync, bool useBulk, Dictionary<string, object> args = null)
{
var span = Activity.Current;
span?.AddTag("amountToClear", amountToClear);
span?.AddTag("minimumAge", minimumAge);
span?.AddTag("async", useAsync);
span?.AddTag("bulk", useBulk);

if (useAsync)
{
if (!HasAsyncOutbox())
throw new InvalidOperationException("No async outbox defined.");

return BackgroundDispatchUsingAsync(amountToClear, minimumAge, useBulk, args);
}

else
{
if (!HasOutbox())
throw new InvalidOperationException("No outbox defined.");

return BackgroundDispatchUsingSync(amountToClear, minimumAge, args);
}
}

private async Task BackgroundDispatchUsingSync(int amountToClear, int minimumAge, Dictionary<string, object> args)
{
Expand Down
19 changes: 19 additions & 0 deletions src/Paramore.Brighter/IAmACommandProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,15 @@ Task<Guid[]> DepositPostAsync<T>(IEnumerable<T> requests, bool continueOnCapture
/// <param name="minimumAge">The minimum age to clear in milliseconds.</param>
/// <param name="args">Optional bag of arguments required by an outbox implementation to sweep</param>
public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictionary<string, object> args = null);

/// <summary>
/// Flushes any outstanding message box message to the broker.
/// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ <see cref="DepositPostBox"/>
/// </summary>
/// <param name="amountToClear">The maximum number to clear.</param>
/// <param name="minimumAge">The minimum age to clear in milliseconds.</param>
/// <param name="args">Optional bag of arguments required by an outbox implementation to sweep</param>
public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary<string, object> args = null);

/// <summary>
/// Flushes the message box message given by <param name="posts"> to the broker.
Expand All @@ -173,6 +182,16 @@ Task<Guid[]> DepositPostAsync<T>(IEnumerable<T> requests, bool continueOnCapture
/// <param name="useBulk">Use the bulk send on the producer.</param>
/// <param name="args">Optional bag of arguments required by an outbox implementation to sweep</param>
public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary<string, object> args = null);

/// <summary>
/// Flushes any outstanding message box message to the broker.
/// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/ <see cref="DepositPostBoxAsync"/>
/// </summary>
/// <param name="amountToClear">The maximum number to clear.</param>
/// <param name="minimumAge">The minimum age to clear in milliseconds.</param>
/// <param name="useBulk">Use the bulk send on the producer.</param>
/// <param name="args">Optional bag of arguments required by an outbox implementation to sweep</param>
public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary<string, object> args = null);

/// <summary>
/// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply.
Expand Down
19 changes: 19 additions & 0 deletions src/Paramore.Brighter/OutboxSweeper.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;

namespace Paramore.Brighter
{
Expand Down Expand Up @@ -52,5 +53,23 @@ public void SweepAsyncOutbox()
ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server);
_commandProcessor.ClearAsyncOutbox(_batchSize, _millisecondsSinceSent, _useBulk, _args);
}

/// <summary>
/// Dispatches the oldest un-dispatched messages from the outbox in a background thread.
/// </summary>
public Task SweepAsync()
{
ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server);
return _commandProcessor.ClearOutboxAsync(_batchSize, _millisecondsSinceSent, _args);
}

/// <summary>
/// Dispatches the oldest un-dispatched messages from the asynchronous outbox in a background thread.
/// </summary>
public Task SweepAsyncOutboxAsync()
{
ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server);
return _commandProcessor.ClearAsyncOutboxAsync(_batchSize, _millisecondsSinceSent, _useBulk, _args);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,13 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona
ClearParamsList.Add(new ClearParams { AmountToClear = amountToClear, MinimumAge = minimumAge, Args = args });
}

public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000,
Dictionary<string, object> args = null)
{
ClearOutbox(amountToClear, minimumAge, args);
return Task.CompletedTask;
}

public async Task ClearOutboxAsync(IEnumerable<Guid> posts, bool continueOnCapturedContext = false,
CancellationToken cancellationToken = default)
{
Expand All @@ -187,6 +194,13 @@ public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, boo
ClearParamsList.Add(new ClearParams { AmountToClear = amountToClear, MinimumAge = minimumAge, Args = args });
}

public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false,
Dictionary<string, object> args = null)
{
ClearAsyncOutbox(amountToClear, minimumAge, useBulk, args);
return Task.CompletedTask;
}

public Task BulkClearOutboxAsync(IEnumerable<Guid> posts, bool continueOnCapturedContext = false,
CancellationToken cancellationToken = default)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona

ClearOutbox(depositedMessages);
}

public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary<string, object> args = null)
{
ClearOutbox(amountToClear, minimumAge, args);
return Task.CompletedTask;
}

public Task ClearOutboxAsync(IEnumerable<Guid> posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default)
{
Expand All @@ -155,6 +161,12 @@ public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, boo
{
ClearOutbox(amountToClear, minimumAge);
}

public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary<string, object> args = null)
{
ClearOutbox(amountToClear, minimumAge);
return Task.CompletedTask;
}

public Task BulkClearOutboxAsync(IEnumerable<Guid> posts, bool continueOnCapturedContext = false,
CancellationToken cancellationToken = default)
Expand Down

0 comments on commit d7ffd74

Please sign in to comment.