Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Publisher TopicAddress customizer #1625

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<Copyright>Copyright (c) Just Eat 2015-$([System.DateTime]::Now.ToString(yyyy))</Copyright>
<Deterministic>true</Deterministic>
<Description>A light-weight message bus on top of AWS SNS and SQS</Description>
<MinVerMinimumMajorMinor>7.2</MinVerMinimumMajorMinor>
<MinVerMinimumMajorMinor>7.3</MinVerMinimumMajorMinor>
<MinVerTagPrefix>v</MinVerTagPrefix>
<MinVerSkip Condition=" '$(Configuration)' == 'Debug' ">true</MinVerSkip>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
Expand Down
15 changes: 13 additions & 2 deletions build.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,16 @@ function DotNetTest {
}
}

function Get-ContainerRunnerCommand() {
$commands = @('docker', 'podman')

foreach ($command in $commands) {
if (Get-Command $command -ErrorAction SilentlyContinue) {
return $command
}
}
}

Write-Host "Creating packages..." -ForegroundColor Green

ForEach ($libraryProject in $libraryProjects) {
Expand All @@ -126,8 +136,9 @@ ForEach ($libraryProject in $libraryProjects) {
if (($null -ne $env:CI) -And ($EnableIntegrationTests -eq $true)) {
$LocalStackImage = "localstack/localstack:3.5.0"
$LocalStackPort = "4566"
& docker pull --quiet $LocalStackImage
& docker run --detach --name localstack --publish "${LocalStackPort}:${LocalStackPort}" $LocalStackImage
$containerRunner = Get-ContainerRunnerCommand
& $containerRunner pull --quiet $LocalStackImage
& $containerRunner run --detach --name localstack --publish "${LocalStackPort}:${LocalStackPort}" $LocalStackImage
$env:AWS_SERVICE_URL = "http://localhost:$LocalStackPort"
}

Expand Down
17 changes: 17 additions & 0 deletions src/JustSaying/Extensions/SemaphoreSlimExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@

internal static class SemaphoreSlimExtensions
{
public static async Task<IDisposable> WaitScopedAsync(this SemaphoreSlim semaphoreSlim, CancellationToken cancellationToken)
{
await semaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
return new LockLifetime(semaphoreSlim);
}

private sealed class LockLifetime(SemaphoreSlim lockSemaphore) : IDisposable
{
public void Dispose()
{
lockSemaphore.Release();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.Release(1) was redundant, we can just .Release()

}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
using System.Collections.Concurrent;
using JustSaying.Messaging;
using JustSaying.Messaging.Interrogation;
using JustSaying.Models;
using Microsoft.Extensions.Logging;

namespace JustSaying.Fluent;

internal sealed class DynamicAddressMessagePublisher(
string topicArnTemplate,
Func<string, Message, string> topicAddressCustomizer,
Func<string, StaticAddressPublicationConfiguration> staticConfigBuilder,
ILoggerFactory loggerFactory) : IMessagePublisher, IMessageBatchPublisher
{
private readonly string _topicArnTemplate = topicArnTemplate;
private readonly ConcurrentDictionary<string, IMessagePublisher> _publisherCache = new();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this ConcurrentDictionary<string, Lazy<IMessagePublisher>>, and do away with the lock collection (_topicCreationLocks)? It feels simpler and safer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that would express what we're trying to do in a nicer way, and make it less error prone to make changes to.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, we could go one further and have a private readonly ConcurrentDictionary<string, Lazy<StaticAddressPublicationConfiguration>> _publisherCache = new();, then we won't have to have a duplicate Dictionary for IMessagePublisher and IMessageBatchPublisher.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think that would be an improvement

private readonly ConcurrentDictionary<string, IMessageBatchPublisher> _batchPublisherCache = new();
private readonly ConcurrentDictionary<string, SemaphoreSlim> _topicCreationLocks = new();
martincostello marked this conversation as resolved.
Show resolved Hide resolved
private readonly ILogger<DynamicMessagePublisher> _logger = loggerFactory.CreateLogger<DynamicMessagePublisher>();
private readonly Func<string, Message, string> _topicAddressCustomizer = topicAddressCustomizer;
private readonly Func<string, StaticAddressPublicationConfiguration> _staticConfigBuilder = staticConfigBuilder;

/// <inheritdoc/>
public InterrogationResult Interrogate()
{
var publishers = _publisherCache.Keys.OrderBy(x => x).ToDictionary(x => x, x => _publisherCache[x].Interrogate());
var batchPublishers = _batchPublisherCache.Keys.OrderBy(x => x).ToDictionary(x => x, x => _batchPublisherCache[x].Interrogate());

return new InterrogationResult(new
{
Publishers = publishers,
BatchPublishers = batchPublishers,
});
}

/// <inheritdoc/>
public Task StartAsync(CancellationToken stoppingToken) => Task.CompletedTask;

Check warning on line 37 in src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs#L37

Added line #L37 was not covered by tests

/// <inheritdoc/>
public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken)
{
string topicArn = _topicAddressCustomizer(_topicArnTemplate, message);
IMessagePublisher publisher;
if (_publisherCache.TryGetValue(topicArn, out publisher))
{
await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
return;

Check warning on line 47 in src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs#L46-L47

Added lines #L46 - L47 were not covered by tests
}

var lockObj = _topicCreationLocks.GetOrAdd(topicArn, _ => new SemaphoreSlim(1, 1));

_logger.LogDebug("Publisher for topic {TopicArn} not found, waiting on setup lock", topicArn);
using (await lockObj.WaitScopedAsync(cancellationToken).ConfigureAwait(false))
{
if (_publisherCache.TryGetValue(topicArn, out publisher))
{
_logger.LogDebug("Lock re-entrancy detected, returning existing publisher");
await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
return;

Check warning on line 59 in src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs#L57-L59

Added lines #L57 - L59 were not covered by tests
}

_logger.LogDebug("Lock acquired to configure topic {TopicArn}", topicArn);
var config = _staticConfigBuilder(topicArn);

_ = _publisherCache.TryAdd(topicArn, config.Publisher);
publisher = config.Publisher;
}

_logger.LogDebug("Publishing message on newly configured topic {TopicArn}", topicArn);
await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public Task PublishAsync(Message message, CancellationToken cancellationToken)
=> PublishAsync(message, null, cancellationToken);

Check warning on line 75 in src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs#L75

Added line #L75 was not covered by tests

/// <inheritdoc/>
public async Task PublishAsync(IEnumerable<Message> messages, PublishBatchMetadata metadata, CancellationToken cancellationToken)
{
var publisherTask = new List<Task>();
foreach (var groupByType in messages.GroupBy(x => x.GetType()))
{
foreach (var groupByTopic in groupByType.GroupBy(x => _topicAddressCustomizer(_topicArnTemplate, x)))
{
string topicArn = groupByTopic.Key;
var batch = groupByTopic.ToList();
IMessageBatchPublisher publisher;
if (_batchPublisherCache.TryGetValue(topicArn, out publisher))
{
publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken));
continue;

Check warning on line 91 in src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs#L90-L91

Added lines #L90 - L91 were not covered by tests
}

var lockObj = _topicCreationLocks.GetOrAdd(topicArn, _ => new SemaphoreSlim(1, 1));
_logger.LogDebug("Publisher for topic {TopicArn} not found, waiting on creation lock", topicArn);
using (await lockObj.WaitScopedAsync(cancellationToken).ConfigureAwait(false))
{
if (_batchPublisherCache.TryGetValue(topicArn, out publisher))
{
_logger.LogDebug("Lock re-entrancy detected, returning existing publisher");
publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken));
continue;

Check warning on line 102 in src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicAddressMessagePublisher.cs#L100-L102

Added lines #L100 - L102 were not covered by tests
}

_logger.LogDebug("Lock acquired to configure topic {TopicArn}", topicArn);
var config = _staticConfigBuilder(topicArn);

publisher = _batchPublisherCache.GetOrAdd(topicArn, config.BatchPublisher);
}

_logger.LogDebug("Publishing message on newly created topic {TopicName}", topicArn);
publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken));
}
}

await Task.WhenAll(publisherTask).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using JustSaying.Messaging;
using JustSaying.Models;
using Microsoft.Extensions.Logging;

namespace JustSaying.Fluent;

internal sealed class DynamicAddressPublicationConfiguration(
IMessagePublisher publisher,
IMessageBatchPublisher batchPublisher) : ITopicAddressPublisher
{
public IMessagePublisher Publisher { get; } = publisher;
public IMessageBatchPublisher BatchPublisher { get; } = batchPublisher;

public static DynamicAddressPublicationConfiguration Build<T>(
string topicArnTemplate,
Func<string, Message, string> topicNameCustomizer,
Func<string, StaticAddressPublicationConfiguration> staticConfigBuilder,
ILoggerFactory loggerFactory)
{
var publisher = new DynamicAddressMessagePublisher(topicArnTemplate, topicNameCustomizer, staticConfigBuilder, loggerFactory);

return new DynamicAddressPublicationConfiguration(publisher, publisher);
}
}
63 changes: 34 additions & 29 deletions src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
public async Task PublishAsync(Message message, PublishMetadata metadata, CancellationToken cancellationToken)
{
string topicName = _topicNameCustomizer(message);
if (_publisherCache.TryGetValue(topicName, out var publisher))
IMessagePublisher publisher;
if (_publisherCache.TryGetValue(topicName, out publisher))
{
await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
return;
Expand All @@ -47,23 +48,25 @@
var lockObj = _topicCreationLocks.GetOrAdd(topicName, _ => new SemaphoreSlim(1, 1));

_logger.LogDebug("Publisher for topic {TopicName} not found, waiting on creation lock", topicName);
await lockObj.WaitAsync(cancellationToken).ConfigureAwait(false);
if (_publisherCache.TryGetValue(topicName, out var thePublisher))
using (await lockObj.WaitScopedAsync(cancellationToken).ConfigureAwait(false))
{
_logger.LogDebug("Lock re-entrancy detected, returning existing publisher");
await thePublisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
return;
}

_logger.LogDebug("Lock acquired to initialize topic {TopicName}", topicName);
var config = _staticConfigBuilder(topicName);
_logger.LogDebug("Executing startup task for topic {TopicName}", topicName);
await config.StartupTask(cancellationToken).ConfigureAwait(false);
if (_publisherCache.TryGetValue(topicName, out publisher))
{
_logger.LogDebug("Lock re-entrancy detected, returning existing publisher");
await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
return;

Check warning on line 57 in src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs#L55-L57

Added lines #L55 - L57 were not covered by tests
}

_ = _publisherCache.TryAdd(topicName, config.Publisher);
_logger.LogDebug("Lock acquired to initialize topic {TopicName}", topicName);
var config = _staticConfigBuilder(topicName);
_logger.LogDebug("Executing startup task for topic {TopicName}", topicName);
await config.StartupTask(cancellationToken).ConfigureAwait(false);

_ = _publisherCache.TryAdd(topicName, config.Publisher);
publisher = config.Publisher;
}
_logger.LogDebug("Publishing message on newly created topic {TopicName}", topicName);
await config.Publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
await publisher.PublishAsync(message, metadata, cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
Expand All @@ -80,32 +83,34 @@
{
string topicName = groupByTopic.Key;
var batch = groupByTopic.ToList();

if (_batchPublisherCache.TryGetValue(topicName, out var publisher))
IMessageBatchPublisher publisher;
if (_batchPublisherCache.TryGetValue(topicName, out publisher))
{
publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken));
continue;
}

var lockObj = _topicCreationLocks.GetOrAdd(topicName, _ => new SemaphoreSlim(1, 1));
_logger.LogDebug("Publisher for topic {TopicName} not found, waiting on creation lock", topicName);
await lockObj.WaitAsync(cancellationToken).ConfigureAwait(false);
if (_batchPublisherCache.TryGetValue(topicName, out publisher))
using (await lockObj.WaitScopedAsync(cancellationToken).ConfigureAwait(false))
{
_logger.LogDebug("Lock re-entrancy detected, returning existing publisher");
publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken));
continue;
if (_batchPublisherCache.TryGetValue(topicName, out publisher))
{
_logger.LogDebug("Lock re-entrancy detected, returning existing publisher");
publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken));
continue;

Check warning on line 101 in src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/Fluent/PublishConfig/DynamicMessagePublisher.cs#L99-L101

Added lines #L99 - L101 were not covered by tests
}

_logger.LogDebug("Lock acquired to initialize topic {TopicName}", topicName);
var config = _staticConfigBuilder(topicName);
_logger.LogDebug("Executing startup task for topic {TopicName}", topicName);
await config.StartupTask(cancellationToken).ConfigureAwait(false);

publisher = _batchPublisherCache.GetOrAdd(topicName, config.BatchPublisher);
}

_logger.LogDebug("Lock acquired to initialize topic {TopicName}", topicName);
var config = _staticConfigBuilder(topicName);
_logger.LogDebug("Executing startup task for topic {TopicName}", topicName);
await config.StartupTask(cancellationToken).ConfigureAwait(false);

var cachedPublisher = _batchPublisherCache.GetOrAdd(topicName, config.BatchPublisher);

_logger.LogDebug("Publishing message on newly created topic {TopicName}", topicName);
publisherTask.Add(cachedPublisher.PublishAsync(batch, metadata, cancellationToken));
publisherTask.Add(publisher.PublishAsync(batch, metadata, cancellationToken));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using JustSaying.Messaging;

namespace JustSaying.Fluent;

internal interface ITopicAddressPublisher
{
IMessagePublisher Publisher { get; }
IMessageBatchPublisher BatchPublisher { get; }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using Amazon;
using JustSaying.AwsTools;
using JustSaying.AwsTools.MessageHandling;
using JustSaying.Messaging;
using Microsoft.Extensions.Logging;

namespace JustSaying.Fluent;

internal sealed class StaticAddressPublicationConfiguration(
IMessagePublisher publisher,
IMessageBatchPublisher batchPublisher) : ITopicAddressPublisher
{
public IMessagePublisher Publisher { get; } = publisher;
public IMessageBatchPublisher BatchPublisher { get; } = batchPublisher;

public static StaticAddressPublicationConfiguration Build<T>(
string topicAddress,
IAwsClientFactory clientFactory,
ILoggerFactory loggerFactory,
JustSayingBus bus)
{
var topicArn = Arn.Parse(topicAddress);

var eventPublisher = new SnsMessagePublisher(
topicAddress,
clientFactory.GetSnsClient(RegionEndpoint.GetBySystemName(topicArn.Region)),
bus.SerializationRegister,
loggerFactory,
bus.Config.MessageSubjectProvider)
{
MessageResponseLogger = bus.Config.MessageResponseLogger,
MessageBatchResponseLogger = bus.PublishBatchConfiguration?.MessageBatchResponseLogger
};

return new StaticAddressPublicationConfiguration(eventPublisher, eventPublisher);
}
}
Loading