diff --git a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs
index 2826c8179a..3fb1a269ee 100644
--- a/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs
+++ b/src/Paramore.Brighter.Extensions.Hosting/TimedOutboxSweeper.cs
@@ -30,12 +30,17 @@ public Task StartAsync(CancellationToken cancellationToken)
{
s_logger.LogInformation("Outbox Sweeper Service is starting.");
- _timer = new Timer(DoWork, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));
+ _timer = new Timer(Callback, null, TimeSpan.Zero, TimeSpan.FromSeconds(_options.TimerInterval));
return Task.CompletedTask;
}
- private void DoWork(object state)
+ private async void Callback(object _)
+ {
+ await DoWorkAsync();
+ }
+
+ private async Task DoWorkAsync()
{
var lockId = _distributedLock.ObtainLockAsync(LockingResourceName, CancellationToken.None).Result;
if (lockId != null)
@@ -55,9 +60,9 @@ private void DoWork(object state)
_options.Args);
if (_options.UseBulk)
- outBoxSweeper.SweepAsyncOutbox();
+ await outBoxSweeper.SweepAsyncOutboxAsync();
else
- outBoxSweeper.Sweep();
+ await outBoxSweeper.SweepAsync();
}
catch (Exception e)
{
diff --git a/src/Paramore.Brighter/CommandProcessor.cs b/src/Paramore.Brighter/CommandProcessor.cs
index f74e052749..5736be64a1 100644
--- a/src/Paramore.Brighter/CommandProcessor.cs
+++ b/src/Paramore.Brighter/CommandProcessor.cs
@@ -670,6 +670,19 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona
{
_bus.ClearOutbox(amountToClear, minimumAge, false, false, args);
}
+
+ ///
+ /// Flushes any outstanding message box message to the broker.
+ /// This will be run on a background task.
+ /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/
+ ///
+ /// The maximum number to clear.
+ /// The minimum age to clear in milliseconds.
+ /// Optional bag of arguments required by an outbox implementation to sweep
+ public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null)
+ {
+ return _bus.ClearOutboxAsync(amountToClear, minimumAge, false, false, args);
+ }
///
/// Flushes the message box message given by to the broker.
@@ -702,6 +715,25 @@ public void ClearAsyncOutbox(
{
_bus.ClearOutbox(amountToClear, minimumAge, true, useBulk, args);
}
+
+ ///
+ /// Flushes any outstanding message box message to the broker.
+ /// This will be run on a background task.
+ /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/
+ ///
+ /// The maximum number to clear.
+ /// The minimum age to clear in milliseconds.
+ /// Use the bulk send on the producer.
+ /// Optional bag of arguments required by an outbox implementation to sweep
+ public Task ClearAsyncOutboxAsync(
+ int amountToClear = 100,
+ int minimumAge = 5000,
+ bool useBulk = false,
+ Dictionary args = null
+ )
+ {
+ return _bus.ClearOutboxAsync(amountToClear, minimumAge, true, useBulk, args);
+ }
///
/// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply.
diff --git a/src/Paramore.Brighter/ExternalBusServices.cs b/src/Paramore.Brighter/ExternalBusServices.cs
index d6e7325edd..20ea6dfbfc 100644
--- a/src/Paramore.Brighter/ExternalBusServices.cs
+++ b/src/Paramore.Brighter/ExternalBusServices.cs
@@ -273,6 +273,39 @@ internal void ClearOutbox(int amountToClear, int minimumAge, bool useAsync, bool
Task.Run(() => BackgroundDispatchUsingSync(amountToClear, minimumAge, args));
}
}
+
+ ///
+ /// This is the clear outbox for implicit clearing of messages.
+ ///
+ /// Maximum number to clear.
+ /// The minimum age of messages to be cleared in milliseconds.
+ /// Use the Async outbox and Producer
+ /// Use bulk sending capability of the message producer, this must be paired with useAsync.
+ /// Optional bag of arguments required by an outbox implementation to sweep
+ internal Task ClearOutboxAsync(int amountToClear, int minimumAge, bool useAsync, bool useBulk, Dictionary args = null)
+ {
+ var span = Activity.Current;
+ span?.AddTag("amountToClear", amountToClear);
+ span?.AddTag("minimumAge", minimumAge);
+ span?.AddTag("async", useAsync);
+ span?.AddTag("bulk", useBulk);
+
+ if (useAsync)
+ {
+ if (!HasAsyncOutbox())
+ throw new InvalidOperationException("No async outbox defined.");
+
+ return BackgroundDispatchUsingAsync(amountToClear, minimumAge, useBulk, args);
+ }
+
+ else
+ {
+ if (!HasOutbox())
+ throw new InvalidOperationException("No outbox defined.");
+
+ return BackgroundDispatchUsingSync(amountToClear, minimumAge, args);
+ }
+ }
private async Task BackgroundDispatchUsingSync(int amountToClear, int minimumAge, Dictionary args)
{
diff --git a/src/Paramore.Brighter/IAmACommandProcessor.cs b/src/Paramore.Brighter/IAmACommandProcessor.cs
index 7f5cd5bb39..18a6ddbc6d 100644
--- a/src/Paramore.Brighter/IAmACommandProcessor.cs
+++ b/src/Paramore.Brighter/IAmACommandProcessor.cs
@@ -156,6 +156,15 @@ Task DepositPostAsync(IEnumerable requests, bool continueOnCapture
/// The minimum age to clear in milliseconds.
/// Optional bag of arguments required by an outbox implementation to sweep
public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null);
+
+ ///
+ /// Flushes any outstanding message box message to the broker.
+ /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/
+ ///
+ /// The maximum number to clear.
+ /// The minimum age to clear in milliseconds.
+ /// Optional bag of arguments required by an outbox implementation to sweep
+ public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null);
///
/// Flushes the message box message given by to the broker.
@@ -173,6 +182,16 @@ Task DepositPostAsync(IEnumerable requests, bool continueOnCapture
/// Use the bulk send on the producer.
/// Optional bag of arguments required by an outbox implementation to sweep
public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary args = null);
+
+ ///
+ /// Flushes any outstanding message box message to the broker.
+ /// Intended for use with the Outbox pattern: http://gistlabs.com/2014/05/the-outbox/
+ ///
+ /// The maximum number to clear.
+ /// The minimum age to clear in milliseconds.
+ /// Use the bulk send on the producer.
+ /// Optional bag of arguments required by an outbox implementation to sweep
+ public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary args = null);
///
/// Uses the Request-Reply messaging approach to send a message to another server and block awaiting a reply.
diff --git a/src/Paramore.Brighter/OutboxSweeper.cs b/src/Paramore.Brighter/OutboxSweeper.cs
index bb530b43cb..673f1d9890 100644
--- a/src/Paramore.Brighter/OutboxSweeper.cs
+++ b/src/Paramore.Brighter/OutboxSweeper.cs
@@ -1,5 +1,6 @@
using System.Collections.Generic;
using System.Diagnostics;
+using System.Threading.Tasks;
namespace Paramore.Brighter
{
@@ -52,5 +53,23 @@ public void SweepAsyncOutbox()
ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server);
_commandProcessor.ClearAsyncOutbox(_batchSize, _millisecondsSinceSent, _useBulk, _args);
}
+
+ ///
+ /// Dispatches the oldest un-dispatched messages from the outbox in a background thread.
+ ///
+ public Task SweepAsync()
+ {
+ ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server);
+ return _commandProcessor.ClearOutboxAsync(_batchSize, _millisecondsSinceSent, _args);
+ }
+
+ ///
+ /// Dispatches the oldest un-dispatched messages from the asynchronous outbox in a background thread.
+ ///
+ public Task SweepAsyncOutboxAsync()
+ {
+ ApplicationTelemetry.ActivitySource.StartActivity(IMPLICITCLEAROUTBOX, ActivityKind.Server);
+ return _commandProcessor.ClearAsyncOutboxAsync(_batchSize, _millisecondsSinceSent, _useBulk, _args);
+ }
}
}
diff --git a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs
index edeae3a976..9d21338577 100644
--- a/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs
+++ b/tests/Paramore.Brighter.Core.Tests/MessageDispatch/TestDoubles/SpyCommandProcessor.cs
@@ -171,6 +171,13 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona
ClearParamsList.Add(new ClearParams { AmountToClear = amountToClear, MinimumAge = minimumAge, Args = args });
}
+ public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000,
+ Dictionary args = null)
+ {
+ ClearOutbox(amountToClear, minimumAge, args);
+ return Task.CompletedTask;
+ }
+
public async Task ClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false,
CancellationToken cancellationToken = default)
{
@@ -187,6 +194,13 @@ public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, boo
ClearParamsList.Add(new ClearParams { AmountToClear = amountToClear, MinimumAge = minimumAge, Args = args });
}
+ public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false,
+ Dictionary args = null)
+ {
+ ClearAsyncOutbox(amountToClear, minimumAge, useBulk, args);
+ return Task.CompletedTask;
+ }
+
public Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false,
CancellationToken cancellationToken = default)
{
diff --git a/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs b/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs
index b7d5a82f85..d23f2a9bbf 100644
--- a/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs
+++ b/tests/Paramore.Brighter.InMemory.Tests/TestDoubles/FakeCommandProcessor.cs
@@ -136,6 +136,12 @@ public void ClearOutbox(int amountToClear = 100, int minimumAge = 5000, Dictiona
ClearOutbox(depositedMessages);
}
+
+ public Task ClearOutboxAsync(int amountToClear = 100, int minimumAge = 5000, Dictionary args = null)
+ {
+ ClearOutbox(amountToClear, minimumAge, args);
+ return Task.CompletedTask;
+ }
public Task ClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false, CancellationToken cancellationToken = default)
{
@@ -155,6 +161,12 @@ public void ClearAsyncOutbox(int amountToClear = 100, int minimumAge = 5000, boo
{
ClearOutbox(amountToClear, minimumAge);
}
+
+ public Task ClearAsyncOutboxAsync(int amountToClear = 100, int minimumAge = 5000, bool useBulk = false, Dictionary args = null)
+ {
+ ClearOutbox(amountToClear, minimumAge);
+ return Task.CompletedTask;
+ }
public Task BulkClearOutboxAsync(IEnumerable posts, bool continueOnCapturedContext = false,
CancellationToken cancellationToken = default)