

Shaylen Reddy edited this page Feb 18, 2024 · 3 revisions

Unpublished Updates Management

Though the original commit that added this, as well as the code comments, explains it well, it still deserves a topic in the wiki

How This Solution Came About

The idea for this solution was born from a little bit of laziness

Laziness ??

Yes, LAZINESS, to start up the services provided through Docker such as Elasticsearch, Kibana, and RabbitMQ

And to this day, whenever I run the solution, I never ever start that up 😂

So what would happen?

The message broker is obviously a mandatory dependency for any microservice that publishes updates, as well as for those subscribing to them

Whenever an update was made that required publishing, a 500 Internal Server Error would be returned

Not the best thing to happen, and also, updates do get lost that way

One [actually an entire universe] can claim that error handling can occur right there within the request flow

This does cause a slight problem though: a slower response from the API and, with it, a worse user experience

This was when the idea of passing it through a channel to be handled elsewhere came up

Now the first piece of the solution, the PublishUpdateChannel

Since channels are used, and reading data from a channel means consuming an IAsyncEnumerable, a background service is created for that purpose

Enter the second piece of the solution, the PublishUpdateChannelReaderBackgroundService
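The PublishUpdateChannel class itself isn't shown on this page, so here's a minimal sketch of what such a wrapper around System.Threading.Channels could look like. The method names WriteToChannelAsync and ReadAllFromChannelAsync come from the calls on this page; everything else (the Sketch class name, the plain string message standing in for BaseMessage, the unbounded channel, the Complete method) is an assumption made purely for illustration

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = new PublishUpdateChannelSketch();

// Consumer: plays the role of the reader background service
var reader = Task.Run(async () =>
{
    await foreach (var (message, destination) in channel.ReadAllFromChannelAsync())
    {
        Console.WriteLine($"Publishing '{message}' to '{destination}'");
    }
});

// Producer: the request handler only enqueues the update and moves on
await channel.WriteToChannelAsync("update-account #1", "UpdateAccount");

// Only needed in this demo so the await foreach loop can finish
channel.Complete();
await reader;

// Hypothetical approximation of PublishUpdateChannel; the real class may differ
public sealed class PublishUpdateChannelSketch
{
    private readonly Channel<(string Message, string Destination)> _channel =
        Channel.CreateUnbounded<(string Message, string Destination)>();

    public ValueTask WriteToChannelAsync(
        string message, string destination, CancellationToken cancellationToken = default) =>
        _channel.Writer.WriteAsync((message, destination), cancellationToken);

    public IAsyncEnumerable<(string Message, string Destination)> ReadAllFromChannelAsync(
        CancellationToken cancellationToken = default) =>
        _channel.Reader.ReadAllAsync(cancellationToken);

    public void Complete() => _channel.Writer.Complete();
}
```

The point of the design is that the write returns as soon as the item is queued, so the API response never waits on the message broker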

Now, the third piece, the publisher interface IMessagePublisher

The method signature of AddUnpublishedUpdatesManagement is this:

public static IServiceCollection AddUnpublishedUpdatesManagement<TMessagePublisherImplementation>(
    this IServiceCollection services,
    string databaseConnectionString) where TMessagePublisherImplementation : class, IMessagePublisher
{
    // ...
}

Since TMessagePublisherImplementation is passed in as a type parameter, any number of implementations of the interface is possible; two are used in this solution:

  • RabbitMQ [RabbitMQPublisher]
  • Azure Service Bus [AzureServiceBusPublisher]
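The interface isn't listed on this page, but its shape can be guessed from the PublishMessageAsync calls in the code here. The sketch below is that guess and nothing more: the Sketch names are hypothetical, a plain string stands in for BaseMessage, and the in-memory implementation exists only to show how a third implementation would slot in next to the two real ones

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

IMessagePublisherSketch publisher = new InMemoryPublisherSketch();
await publisher.PublishMessageAsync("update-account #1", "UpdateAccount");

// Assumed shape, inferred from how PublishMessageAsync is called on this page
public interface IMessagePublisherSketch
{
    Task PublishMessageAsync(string message, string destination);
}

// Hypothetical implementation that just remembers what was published
public sealed class InMemoryPublisherSketch : IMessagePublisherSketch
{
    public List<(string Message, string Destination)> Published { get; } = new();

    public Task PublishMessageAsync(string message, string destination)
    {
        Published.Add((message, destination));
        Console.WriteLine($"Published '{message}' to '{destination}'");
        return Task.CompletedTask;
    }
}
```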

Moving to the fourth piece of this solution: where exactly will the updates be stored if they are not published the first time ??

Database

The UnpublishedUpdateDbContext contains only one DbSet, UnpublishedUpdate

The update is serialized to JSON, Base64Url-encoded, and stored as the EncodedUpdate

Here's the code snippet for that:

var unpublishedUpdate = new UnpublishedUpdate
{
    EncodedUpdate = Base64UrlEncoder.Encode(JsonSerializer.SerializeToUtf8Bytes(message)),
    Destination = destination
};
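To see that nothing is lost on the way in and out of the database, here's a small round trip of the encode and decode steps. It's a sketch under assumptions: DemoMessage is a made-up stand-in for BaseMessage, and Base64UrlSketch is a hand-rolled approximation of Base64UrlEncoder built on Convert, used only so the example has no package dependencies

```csharp
using System;
using System.Text.Json;

var message = new DemoMessage("trace-123", 42);

// Encode: JSON bytes first, then a URL-safe Base64 string for the database column
string encodedUpdate = Base64UrlSketch.Encode(JsonSerializer.SerializeToUtf8Bytes(message));

// Decode: the reverse, as the retry worker does before publishing again
DemoMessage? decoded = JsonSerializer.Deserialize<DemoMessage>(Base64UrlSketch.Decode(encodedUpdate));

Console.WriteLine(decoded == message); // True, records compare by value

// Hypothetical stand-in for Base64UrlEncoder
public static class Base64UrlSketch
{
    public static string Encode(byte[] bytes) =>
        Convert.ToBase64String(bytes).TrimEnd('=').Replace('+', '-').Replace('/', '_');

    public static byte[] Decode(string text)
    {
        string base64 = text.Replace('-', '+').Replace('_', '/');
        // Restore the padding that was trimmed during encoding
        base64 = base64.PadRight(base64.Length + (4 - base64.Length % 4) % 4, '=');
        return Convert.FromBase64String(base64);
    }
}

// Hypothetical stand-in for BaseMessage with just two properties
public record DemoMessage(string TraceId, int IdOfEntityToUpdate);
```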

Now, with every DbContext comes a repository: the fifth piece of this solution is the IUnpublishedUpdateRepository, with an implementation, UnpublishedUpdateRepository

And finally, something needs to periodically check the database for unpublished updates and retry them

The sixth piece, RetryUnpublishedUpdatesWorker, is added as a background service

This is the full solution added to the services collection

public static IServiceCollection AddUnpublishedUpdatesManagement<TMessagePublisherImplementation>(
    this IServiceCollection services,
    string databaseConnectionString) where TMessagePublisherImplementation : class, IMessagePublisher
{
    services.AddDbContext<UnpublishedUpdateDbContext>(options =>
        options.UseSqlServer(databaseConnectionString, sqlServerOptions =>
        {
            sqlServerOptions.MigrationsAssembly(typeof(UnpublishedUpdateDbContext).Assembly.GetName().Name);
            sqlServerOptions.EnableRetryOnFailure(maxRetryCount: 5);
        }));

    services.AddScoped<IUnpublishedUpdateRepository, UnpublishedUpdateRepository>();

    services.AddSingleton<IMessagePublisher, TMessagePublisherImplementation>();

    services.AddSingleton<PublishUpdateChannel>();
    services.AddHostedService<PublishUpdateChannelReaderBackgroundService>();
    services.AddHostedService<RetryUnpublishedUpdatesWorker>();

    return services;
}

Exploring The Flow In Code

From the producer's side, the BaseMessage is formed and sent through the channel

var baseMessage = new BaseMessage
{
    TraceId = Activity.Current!.TraceId.ToString(),
    SpanId = Activity.Current!.SpanId.ToString(),
    AccessToken = HttpContext.Request.Headers.Authorization[0]!.Replace("Bearer ", ""),
    SerializedModel = JsonSerializer.SerializeToUtf8Bytes(updateAccountModel),
    IdOfEntityToUpdate = id
};

var configurationKeyForDestination = environment.IsDevelopment()
                                   ? "RabbitMQ:Exchanges:UpdateAccount"
                                   : "AzureServiceBus:Topics:UpdateAccount";

await publishUpdateChannel.WriteToChannelAsync(baseMessage, configuration[configurationKeyForDestination]!);

On the consumer end of the channel, the message is received and a publish is attempted. If an exception is thrown, it's caught and, regardless of the error, the unpublished update is encoded and written to the database

Here's that code

// Shortened for brevity
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    await foreach (var (message, destination) in publishUpdateChannel.ReadAllFromChannelAsync())
    {
        message.StartANewActivity("Attempting to publish update");

        try
        {
            await messagingServicePublisher.PublishMessageAsync(message, destination);
        }
        catch (Exception ex)
        {
            logger.LogError(
                ex,
                "{announcement}: Attempt to publish update to {publishDestination} destination was unsuccessful, writing to the database to try again later",
                "FAILED", destination);

            using var scope = serviceScopeFactory.CreateScope();

            var unpublishedUpdateRepository = scope.ServiceProvider.GetRequiredService<IUnpublishedUpdateRepository>();

            var unpublishedUpdate = new UnpublishedUpdate
            {
                EncodedUpdate = Base64UrlEncoder.Encode(JsonSerializer.SerializeToUtf8Bytes(message)),
                Destination = destination
            };

            await unpublishedUpdateRepository.CreateAsync(unpublishedUpdate);

            await unpublishedUpdateRepository.SaveChangesAsync();
        }
    }
}

Since a HostedService is a singleton and the IUnpublishedUpdateRepository is a scoped service, the IServiceScopeFactory is used to create a scope that lives only in - and is disposed of after - the catch block

For the retry worker, here's that code

// Shortened for brevity
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (!stoppingToken.IsCancellationRequested)
    {
        using var scope = serviceScopeFactory.CreateScope();

        var unpublishedUpdateRepository = scope.ServiceProvider.GetRequiredService<IUnpublishedUpdateRepository>();

        var unpublishedUpdates = await unpublishedUpdateRepository.RetrieveAllAsync();

        foreach (var unpublishedUpdate in unpublishedUpdates)
        {
            unpublishedUpdate.Retries++;

            var message = JsonSerializer.Deserialize<BaseMessage>(Base64UrlEncoder.DecodeBytes(unpublishedUpdate.EncodedUpdate))
                       ?? throw new InvalidOperationException("message cannot be null");

            message.StartANewActivity("Retrying to publish update");

            try
            {
                // Awaited so that a failed publish lands in the catch block instead of being lost
                await messagingServicePublisher.PublishMessageAsync(message, unpublishedUpdate.Destination);

                logger.LogInformation(
                    "Worker => Unpublished update was published successfully to {destination} after {retries} retries",
                    unpublishedUpdate.Destination, unpublishedUpdate.Retries);

                await unpublishedUpdateRepository.DeleteAsync(unpublishedUpdate);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "The message bus is unavailable");
            }
        }

        await unpublishedUpdateRepository.SaveChangesAsync();

        // Task.Delay frees the thread while waiting and stops early on shutdown
        await Task.Delay(TimeSpan.FromMinutes(10), stoppingToken);
    }
}

The flow here is:

  • Use the IServiceScopeFactory to create a scope
  • Get the IUnpublishedUpdateRepository
  • Retrieve all the unpublished updates
  • Go through each update and increment the number of retries [this is tracked in the EF Core change tracker]
  • Decode the update
  • Retry publishing the update; if it succeeds, it's deleted from the database [tracked via the EF Core change tracker], and if it doesn't, the error is logged and the update stays in the database for the next run
  • Any changes in the EF Core change tracker are then persisted to the database
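The steps above can be tried out without a database or a message bus. The following is only a sketch with in-memory stand-ins: the list plays the role of the UnpublishedUpdate table, PublishAsync is a made-up publisher that fails on its first call, and the RetryPass and UnpublishedUpdateSketch names are hypothetical

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// In-memory stand-in for the UnpublishedUpdate table
var store = new List<UnpublishedUpdateSketch>
{
    new() { Destination = "UpdateAccount", Payload = "update #1" },
    new() { Destination = "UpdateAccount", Payload = "update #2" }
};

// Hypothetical publisher: the first call fails, every later call succeeds
var attempts = 0;
Task PublishAsync(string payload, string destination) =>
    ++attempts == 1
        ? Task.FromException(new InvalidOperationException("The message bus is unavailable"))
        : Task.CompletedTask;

await RetryPass.RunAsync(store, PublishAsync);
Console.WriteLine($"After first pass: {store.Count} left");  // After first pass: 1 left

await RetryPass.RunAsync(store, PublishAsync);
Console.WriteLine($"After second pass: {store.Count} left"); // After second pass: 0 left

public static class RetryPass
{
    public static async Task RunAsync(
        List<UnpublishedUpdateSketch> store, Func<string, string, Task> publishAsync)
    {
        // Iterate over a copy so published updates can be removed from the store
        foreach (var update in store.ToArray())
        {
            update.Retries++;

            try
            {
                // Awaited, so a failed publish ends up in the catch block
                await publishAsync(update.Payload, update.Destination);
                store.Remove(update);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Retry {update.Retries} failed: {ex.Message}");
            }
        }
    }
}

public sealed class UnpublishedUpdateSketch
{
    public string Destination { get; init; } = "";
    public string Payload { get; init; } = "";
    public int Retries { get; set; }
}
```

The update that failed simply stays in the list with its retry count bumped, which mirrors how a failed row stays in the database until the next run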

Almost forgot: the UnpublishedUpdateDbContext is migrated every time the application starts up. That's NOT a good practice, but it makes things a little easier

await app.MigrateUnpublishedUpdatesManagementDatabaseAsync();

No visual for this one, it's not as straightforward as the others unfortunately 😞
