Consumer
You can consume messages in two ways: with dedicated worker threads, as described on this page, or with a pool of threads shared between multiple queues (see ConsumerAsync).
You'll need the following:
- The transport you're connecting to, determined by the transport init module you specify when creating the container.
- The connection string to the transport
- The name of the queue
- A delegate that will be called for each message to process
    using (var queueContainer = new QueueContainer<QueueInit>())
    {
        var queueConnection = new QueueConnection(queueName, connectionString);
        using (var queue = queueContainer.CreateConsumer(queueConnection))
        {
            // Number of dedicated worker threads for this queue
            queue.Configuration.Worker.WorkerCount = 4;
            // Handle is called once for each received message
            queue.Start<SimpleMessage>(Handle);
            Console.WriteLine("Processing messages - press any key to stop");
            Console.ReadKey(true);
        }
    }

    private void Handle(IReceivedMessage<SimpleMessage> message, IWorkerNotification notifications)
    {
        //processing logic goes here
    }
When your delegate method finishes without throwing, the message is acknowledged and considered finished. However, there are two conditions you may want to handle in your logic:
- The queue shutting down. A cancel token is provided for this.
- If the transport supports rollback, you may throw an OperationCanceledException to requeue the message.
For example, here is how you can check whether cancellation has been requested and force a requeue. Note that we verify that the transport supports rollbacks first.
    if (notifications.TransportSupportsRollback && notifications.WorkerStopping.CancelWorkToken.IsCancellationRequested)
    {
        notifications.Log.Log(DotNetWorkQueue.Logging.LogLevel.Debug, () => "Cancel has been requested - aborting");
        notifications.WorkerStopping.CancelWorkToken.ThrowIfCancellationRequested();
    }
Alternatively, you can register a delegate with the cancel token, so that you don't have to keep checking the token throughout your code.
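As a rough illustration of the callback approach, the sketch below uses the standard .NET `CancellationToken.Register` method. It assumes the cancel token exposed by the worker notification is an ordinary `System.Threading.CancellationToken`, which matches the `IsCancellationRequested` and `ThrowIfCancellationRequested` calls used above. The `CancelCallbackSketch` class, the `Process` method, and the `work` parameter are hypothetical names used only for this example; they are not part of DotNetWorkQueue.

```csharp
using System;
using System.Threading;

public static class CancelCallbackSketch
{
    // Hypothetical stand-in for a message handler body. The token parameter
    // plays the role of notifications.WorkerStopping.CancelWorkToken.
    public static bool Process(CancellationToken cancelWorkToken, Action work)
    {
        var cancelRequested = false;

        // The callback runs when the token is canceled (or immediately if it
        // already is). Disposing the registration removes the callback once
        // processing has ended.
        using (cancelWorkToken.Register(() => cancelRequested = true))
        {
            work();
        }

        return cancelRequested;
    }

    public static void Main()
    {
        using (var source = new CancellationTokenSource())
        {
            // Simulate the queue shutting down while the work is running.
            var sawCancel = Process(source.Token, () => source.Cancel());
            Console.WriteLine(sawCancel); // prints "True"
        }
    }
}
```

In a real handler the callback would typically abort whatever long-running operation is in flight, rather than just set a flag.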
To stop the queue, call Dispose on it.
    queue.Dispose();
Calling Dispose on the queue container also disposes all queues created by that container.
Dispose is a blocking operation. Depending on your configuration settings and how quickly your message-consuming code responds to cancellation, it may take a while to return.
For any issues, please use the GitHub issue tracker.