Skip to content

how it works channels

Shaylen Reddy edited this page Oct 8, 2023 · 3 revisions

Channels

This [according to the docs] allows you to asynchronously pass data from producers to consumers

The way I take advantage of this feature is to offload processing from a request to a hosted [background] service

The benefit here is a quicker response to the user who has no clue what's going on, while improving the user experience

There's two places where this is implemented:

  • To send a receipt to a customer upon a successfully placed order [SendReceiptChannel]
  • To offload the publishing of an update from a request to a background service [PublishUpdateChannel]

And one place where it isn't:

  • To send a password reset token to the customer

Flowchart

The idea in terms of a flowchart is pretty simple

flowchart LR
    Producer --> |Data| Channel --> |Data| Consumer

Loading

Example

Here, I'll showcase the code for the PublishUpdateChannel

Firstly, it's added as a Singleton as part of adding unpublished updates management, this will be discussed in a later wiki page

services.AddSingleton<PublishUpdateChannel>();

Here's the code for creating the Channel

channel =
    Channel.CreateBounded<(BaseMessage, string)>(
        new BoundedChannelOptions(capacity: 250)
        {
            SingleWriter = false,
            SingleReader = true
        });

The data passed through is a tuple with the update [BaseMessage] and destination [string] (BaseMessage, string)

Here's the method to write data through the channel

public async Task<bool> WriteToChannelAsync(BaseMessage message, string destination)
{
    while (await channel.Writer.WaitToWriteAsync())
    {
        if (channel.Writer.TryWrite((message, destination)) is true)
        {
            logger.LogInformation("Channel => Update has been written to the channel for publishing");

            return true;
        }
    }

    return false;
}

As to why you have to wait to write, is because of the channel's capacity

If it reaches capacity, there will be added back pressure and only when the channel has capacity, the data is written

Here's the code for reading from the channel

public IAsyncEnumerable<(BaseMessage, string)> ReadAllFromChannelAsync() =>
    channel.Reader.ReadAllAsync();

Since data is sent asynchronously, we have no idea how much or how little data is going to be written to the channel

So one after the other the data is read

Here's an example of a producer writing to the channel

var baseMessage = new BaseMessage
{
    TraceId = Activity.Current!.TraceId.ToString(),
    SpanId = Activity.Current!.SpanId.ToString(),
    AccessToken = HttpContext.Request.Headers.Authorization[0]!.Replace("Bearer ", ""),
    SerializedModel = JsonSerializer.SerializeToUtf8Bytes(updateAccountModel),
    IdOfEntityToUpdate = id
};

var configurationKeyForDestination = environment.IsDevelopment()
                                   ? "RabbitMQ:Exchanges:UpdateAccount"
                                   : "AzureServiceBus:Topics:UpdateAccount";

await publishUpdateChannel.WriteToChannelAsync(baseMessage, configuration[configurationKeyForDestination]!);

When it comes to reading from the channel, a background service has to be created, in this case it's the PublishUpdateChannelReaderBackgroundService [properly named 😂]

Since the reader is returning an IAsyncEnumerable, an awaited foreach loop is used

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await foreach (var (message, destination) in publishUpdateChannel.ReadAllFromChannelAsync())
    {
        // Process data
    }
}
Clone this wiki locally