Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compression and JustSaying 8.0 Changes #1525

Open
wants to merge 80 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 78 commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
a58a62b
Initial subscription compression functionality
slang25 May 9, 2024
cc7d4d3
WIP - more compression changes
slang25 Jun 12, 2024
2d3c495
Add publisher compression support
slang25 Jun 26, 2024
426030d
Address more feedback
slang25 Jun 27, 2024
e9a8a77
Further compression development
slang25 Jul 8, 2024
ddead74
Add some extra null handling
slang25 Jul 8, 2024
37eaa73
Tweaked behaviour of CompressMessageIfNeeded
slang25 Jul 8, 2024
09413c3
Add more xml doc comments
slang25 Jul 8, 2024
ab075b0
Add null check and remove binary breaking change
slang25 Jul 8, 2024
d1dd060
Work on public api
slang25 Jul 8, 2024
fff6506
Add back API which was made private
slang25 Jul 8, 2024
aa326ce
Fix up API files
slang25 Jul 8, 2024
c9ec0f9
Add tests and fixes
slang25 Jul 8, 2024
a4ef78c
Move missing encoding check
slang25 Jul 8, 2024
619b199
Add extra validation
slang25 Jul 8, 2024
ce4b6d4
Add unit tests
slang25 Jul 8, 2024
b3e5222
Rename DefaultCompressionOptions
slang25 Jul 8, 2024
82a0bca
More work
slang25 Jul 9, 2024
cd4f61b
WIP
slang25 Jul 9, 2024
7c18a0d
More compression refactoring
slang25 Jul 11, 2024
df43e02
Tidy up
slang25 Jul 12, 2024
8c23f2f
Fix CI
slang25 Jul 12, 2024
b0ea0ef
Compression WIP
slang25 Aug 22, 2024
4d732a0
More compression work
slang25 Aug 26, 2024
d888b5a
Add RawMessageDelivery support
slang25 Aug 27, 2024
c593ef6
Fix more tests
slang25 Sep 4, 2024
3657d8d
Add more tests and cleanup code
slang25 Oct 3, 2024
d644e48
More tidy up
slang25 Oct 3, 2024
6c04948
More tidy up
slang25 Oct 3, 2024
cb8e0fd
More tidy up
slang25 Oct 3, 2024
fe1d325
More tidy up
slang25 Oct 3, 2024
140ba8e
Merge branch 'main' into refactored-compression
slang25 Oct 3, 2024
8ca09ad
More tidy up
slang25 Oct 3, 2024
d2b2a8f
Apply some feedback
slang25 Oct 4, 2024
fbe31d9
Apply some feedback
slang25 Oct 4, 2024
64e0d21
Apply some feedback
slang25 Oct 4, 2024
ee7febf
Apply some feedback
slang25 Oct 4, 2024
ef110b8
Apply some feedback
slang25 Oct 4, 2024
8de4441
More changes
slang25 Oct 7, 2024
3872c83
Fix json encoding issues
slang25 Oct 7, 2024
7e016e6
Tidy up a few bits
slang25 Oct 8, 2024
d549a5c
Apply some feedback
slang25 Oct 8, 2024
af27eae
More refactoring
slang25 Oct 9, 2024
c2e8f0a
Merge remote-tracking branch 'upstream/main' into refactored-compression
slang25 Oct 11, 2024
e1daeee
Fix test
slang25 Oct 11, 2024
0ade76f
Update LocalSqsSnsMessaging and fix flakey test
slang25 Oct 15, 2024
e1a257c
Apply feedback
slang25 Oct 15, 2024
6e76da9
Support overriding subject
slang25 Oct 18, 2024
07d53d3
Make tests more resiliant
slang25 Oct 21, 2024
6830ed9
Improve relaibility of tests
slang25 Oct 23, 2024
1f69d00
Improve relaibility of tests
slang25 Oct 23, 2024
d330c1f
More test tweaks
slang25 Oct 23, 2024
caad08d
Improve relaibility of tests
slang25 Oct 23, 2024
57694a6
Improve relaibility of tests
slang25 Oct 23, 2024
fa76dae
Improve relaibility of tests
slang25 Oct 25, 2024
c7c782d
Improve relaibility of tests
slang25 Oct 26, 2024
aab0086
Improve relaibility of tests
slang25 Oct 27, 2024
4f0b1ae
Reduce unnecessary exceptions
slang25 Oct 27, 2024
d58dc2c
Run tests in parallel
slang25 Oct 27, 2024
850202e
Revert "Run tests in parallel"
slang25 Oct 27, 2024
173091a
Make PublishDestinationType internal
slang25 Oct 28, 2024
e77b78a
Revert samples change
slang25 Nov 6, 2024
ed96100
Extend message attribute handling
slang25 Nov 6, 2024
01bbbd3
Add improved message size logic and fixed duplicate lines
slang25 Nov 6, 2024
38157f0
Revert localstack ports for samples
slang25 Nov 6, 2024
4a05bd3
Merge main
slang25 Nov 6, 2024
dd12c60
Merge remote-tracking branch 'upstream/main' into refactored-compression
slang25 Nov 6, 2024
bf35392
Revert SDK version
slang25 Nov 6, 2024
4054945
Add integration test
slang25 Nov 7, 2024
5a39f7c
Update src/JustSaying/AwsTools/MessageHandling/PublishMessageConverte…
slang25 Nov 7, 2024
b850c1c
Move ParseMessageAttribute to usage
slang25 Nov 7, 2024
6626806
Update tests/JustSaying.Extensions.DependencyInjection.StructureMap.T…
slang25 Nov 7, 2024
99e6efd
Update src/JustSaying/AwsTools/MessageHandling/ReceivedMessageConvert…
slang25 Nov 7, 2024
474654a
Update tests/JustSaying.UnitTests/Messaging/Compression/GzipMessageBo…
slang25 Nov 7, 2024
009886b
Reduce boilerplate
slang25 Nov 7, 2024
01c124f
Add cancellation token
slang25 Nov 7, 2024
96a1a79
Reduce default compression size
slang25 Nov 7, 2024
0e7372a
Rename types
slang25 Nov 8, 2024
43c59c4
Use STJ in tests
slang25 Nov 8, 2024
8212cbb
Add TODO note for adding back LocalStack tests
slang25 Nov 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@
<CoverletOutputFormat>cobertura,json</CoverletOutputFormat>
<Exclude>[*.Benchmarks]*,[*Sample*]*,[*Test*]*,[xunit.*]*</Exclude>
</PropertyGroup>
<!-- Keys used by InternalsVisibleTo attributes. -->
<PropertyGroup>
<DynamicProxyGenAssembly2PublicKey>0024000004800000940000000602000000240000525341310004000001000100c547cac37abd99c8db225ef2f6c8a3602f3b3606cc9891605d02baa56104f4cfc0734aa39b93bf7852f7d9266654753cc297e7d2edfe0bac1cdcf9f717241550e0a7b191195b7667bb4f64bcb8e2121380fd1d9d46ad2d92d2d15605093924cceaf74c4861eff62abf69b9291ed0a340e113be11e6a7d3113e92484cf7045cc7</DynamicProxyGenAssembly2PublicKey>
</PropertyGroup>
<ItemGroup>
<None Include="$(MSBuildThisFileDirectory)$(PackageIcon)" Pack="True" PackagePath="" />
<None Include="$(MSBuildThisFileDirectory)$(PackageReadmeFile)" Pack="True" PackagePath="" />
Expand Down
10 changes: 5 additions & 5 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
<Project>
<ItemGroup Label="Libraries">
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.300" />
<PackageVersion Include="AWSSDK.Extensions.NETCore.Setup" Version="3.7.301" />
<PackageVersion Include="AWSSDK.SimpleNotificationService" Version="3.7.400" />
<PackageVersion Include="AWSSDK.SQS" Version="3.7.400" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="1.1.1" />
<PackageVersion Include="Microsoft.Bcl.AsyncInterfaces" Version="6.0.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="1.1.0" Condition=" '$(TargetFramework)' == 'net461' " />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.0.0" Condition=" '$(TargetFramework)' == 'netstandard2.0' " />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="8.0.0" Condition=" '$(TargetFramework)' == 'net8.0' " />
<PackageVersion Include="Microsoft.Extensions.Logging.Abstractions" Version="2.0.0" />
<PackageVersion Include="Newtonsoft.Json" Version="13.0.1" />
<PackageVersion Include="StructureMap" Version="4.6.0" />
<PackageVersion Include="System.Text.Json" Version="4.6.0" />
<PackageVersion Include="System.Text.Json" Version="6.0.10" />
<PackageVersion Include="System.Threading.Channels" Version="4.5.0" />
</ItemGroup>
<ItemGroup Label="Tests and Samples">
<PackageVersion Include="AutoFixture" Version="4.18.1" />
<PackageVersion Include="CommandLineParser" Version="2.9.1" />
<PackageVersion Include="coverlet.msbuild" Version="6.0.2" />
<PackageVersion Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageVersion Include="LocalSqsSnsMessaging" Version="0.5.3" />
<PackageVersion Include="Magnum" Version="2.1.3" />
<PackageVersion Include="MartinCostello.Logging.XUnit" Version="0.4.0" />
<PackageVersion Include="MELT" Version="0.9.0" />
<PackageVersion Include="MELT.Xunit" Version="0.9.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.PublicApiAnalyzers" Version="3.3.4" />
<PackageVersion Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="8.0.0" />
<PackageVersion Include="Microsoft.Extensions.Configuration.Json" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Diagnostics.Testing" Version="8.10.0" />
<PackageVersion Include="Microsoft.Extensions.Hosting" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging" Version="8.0.1" />
<PackageVersion Include="Microsoft.Extensions.Logging.Console" Version="8.0.1" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ static async Task Run()
config.Publications(x =>
{
// Creates the following if they do not already exist
// - a SNS topic of name `orderreadyevent` with two tags:
// - an SNS topic of name `orderreadyevent` with two tags:
// - "IsOrderEvent" with no value
// - "Publisher" with the value "KitchenConsole"
x.WithTopic<OrderReadyEvent>(cfg =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using JustSaying.Fluent;
using JustSaying.Messaging;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.Compression;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware.Logging;
Expand Down Expand Up @@ -139,16 +140,11 @@ public static IServiceCollection AddJustSaying(this IServiceCollection services,
services.TryAddSingleton<IMessageContextAccessor>(serviceProvider => serviceProvider.GetRequiredService<MessageContextAccessor>());
services.TryAddSingleton<IMessageContextReader>(serviceProvider => serviceProvider.GetRequiredService<MessageContextAccessor>());

services.TryAddSingleton<IMessageSerializationFactory, NewtonsoftSerializationFactory>();
services.TryAddSingleton<IMessageBodySerializationFactory, NewtonsoftSerializationFactory>();
services.TryAddSingleton<IMessageSubjectProvider, GenericMessageSubjectProvider>();
services.TryAddSingleton<IVerifyAmazonQueues, AmazonQueueCreator>();
services.TryAddSingleton<IMessageSerializationRegister>(
(p) =>
{
var config = p.GetRequiredService<IMessagingConfig>();
var serializerFactory = p.GetRequiredService<IMessageSerializationFactory>();
return new MessageSerializationRegister(config.MessageSubjectProvider, serializerFactory);
});
services.TryAddEnumerable(ServiceDescriptor.Singleton<IMessageBodyCompression, GzipMessageBodyCompression>());
services.TryAddSingleton<MessageCompressionRegistry>();

services.TryAddSingleton<IMessageReceivePauseSignal, MessageReceivePauseSignal>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
using JustSaying.AwsTools.QueueCreation;
using JustSaying.Fluent;
using JustSaying.Messaging.Channels.Receive;
using JustSaying.Messaging.Compression;
using JustSaying.Messaging.MessageHandling;
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Messaging.Middleware.Logging;
using JustSaying.Messaging.Middleware.PostProcessing;
using JustSaying.Messaging.Monitoring;
using JustSaying.Naming;
using StructureMap;
using StructureMap.Pipeline;

namespace JustSaying;

Expand All @@ -33,7 +35,7 @@ public JustSayingRegistry()
For<IMessagingConfig>().Use(context => context.GetInstance<MessagingConfig>()).Singleton();
For<IPublishBatchConfiguration>().Use<MessagingConfig>(context => context.GetInstance<MessagingConfig>()).Singleton();
For<IMessageMonitor>().Use<NullOpMessageMonitor>().Singleton();
For<IMessageSerializationFactory>().Use<NewtonsoftSerializationFactory>().Singleton();
For<IMessageBodySerializationFactory>().Use<NewtonsoftSerializationFactory>().Singleton();
For<IMessageSubjectProvider>().Use<GenericMessageSubjectProvider>().Singleton();
For<IVerifyAmazonQueues>().Use<AmazonQueueCreator>().Singleton();

Expand All @@ -43,18 +45,8 @@ public JustSayingRegistry()

For<LoggingMiddleware>().Transient();
For<SqsPostProcessorMiddleware>().Transient();

For<IMessageSerializationRegister>()
.Use(
nameof(IMessageSerializationRegister),
(p) =>
{
var config = p.GetInstance<IMessagingConfig>();
var serializerFactory = p.GetInstance<IMessageSerializationFactory>();
return new MessageSerializationRegister(config.MessageSubjectProvider, serializerFactory);
})
.Singleton();

For<IMessageBodyCompression>().Add<GzipMessageBodyCompression>().Singleton();
For<MessageCompressionRegistry>().Singleton();
For<IMessageReceivePauseSignal>().Use<MessageReceivePauseSignal>().Singleton();

For<DefaultNamingConventions>().Singleton();
Expand Down
12 changes: 12 additions & 0 deletions src/JustSaying/AwsTools/MessageAttributeKeys.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace JustSaying.AwsTools;

/// <summary>
/// Contains constant key values for message attributes.
/// </summary>
internal static class MessageAttributeKeys
{
/// <summary>
/// Represents the key for the Content-Encoding attribute.
/// </summary>
public const string ContentEncoding = "Content-Encoding";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
using JustSaying.Messaging.Compression;

namespace JustSaying.AwsTools.MessageHandling;

/// <summary>
/// Represents options for message compression during publishing.
/// </summary>
public sealed class PublishCompressionOptions
{
/// <summary>
/// Gets or sets the message length threshold in bytes.
/// Messages larger than this threshold will be compressed.
/// </summary>
/// <remarks>
/// The default value is 260,096 bytes (254 KB), 2KB less than the SNS and SQS limit.
/// </remarks>
public int MessageLengthThreshold { get; set; } = 254 * 1024;

/// <summary>
/// Gets or sets the compression encoding to be used.
/// </summary>
/// <remarks>
/// This should correspond to a registered compression algorithm in the <see cref="MessageCompressionRegistry"/>.
/// </remarks>
public string CompressionEncoding { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace JustSaying.AwsTools.MessageHandling.Dispatch;
/// <summary>
/// Dispatches messages to the queue.
/// </summary>
public interface IMessageDispatcher
internal interface IMessageDispatcher
{
/// <summary>
/// Dispatches the message in <see cref="IQueueMessageContext"/> to the queue in the context.
Expand All @@ -14,4 +14,4 @@ public interface IMessageDispatcher
/// <param name="cancellationToken">A <see cref="CancellationToken"/> to stop processing the message dispatch.</param>
/// <returns>A <see cref="Task"/> that completes once the message has been dispatched.</returns>
Task DispatchMessageAsync(IQueueMessageContext messageContext, CancellationToken cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,18 @@

namespace JustSaying.AwsTools.MessageHandling.Dispatch;

public class MessageDispatcher : IMessageDispatcher
internal sealed class MessageDispatcher : IMessageDispatcher
{
private readonly IMessageSerializationRegister _serializationRegister;
private readonly IMessageMonitor _messagingMonitor;
private readonly MiddlewareMap _middlewareMap;

private static ILogger _logger;
private readonly ILogger _logger;

public MessageDispatcher(
IMessageSerializationRegister serializationRegister,
IMessageMonitor messagingMonitor,
MiddlewareMap middlewareMap,
ILoggerFactory loggerFactory)
{
_serializationRegister = serializationRegister;
_messagingMonitor = messagingMonitor;
_middlewareMap = middlewareMap;
_logger = loggerFactory.CreateLogger("JustSaying");
Expand All @@ -42,6 +39,7 @@

if (!success)
{
_logger.LogTrace("DeserializeMessage failed. Message will not be dispatched.");
return;
}

Expand All @@ -68,18 +66,18 @@

await middleware.RunAsync(handleContext, null, cancellationToken)
.ConfigureAwait(false);

}

private async Task<(bool success, Message typedMessage, MessageAttributes attributes)>
DeserializeMessage(IQueueMessageContext messageContext, CancellationToken cancellationToken)
{
try
{
_logger.LogDebug("Attempting to deserialize message with serialization register {Type}",
_serializationRegister.GetType().FullName);
var messageWithAttributes = _serializationRegister.DeserializeMessage(messageContext.Message.Body);
return (true, messageWithAttributes.Message, messageWithAttributes.MessageAttributes);
_logger.LogDebug("Attempting to deserialize message.");

var (message, attributes) = await messageContext.MessageConverter.ConvertToInboundMessageAsync(messageContext.Message, cancellationToken);

return (true, message, attributes);
}
catch (MessageFormatNotSupportedException ex)
{
Expand All @@ -93,6 +91,11 @@

return (false, null, null);
}
catch (OperationCanceledException)

Check warning on line 94 in src/JustSaying/AwsTools/MessageHandling/Dispatch/MessageDispatcher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/Dispatch/MessageDispatcher.cs#L94

Added line #L94 was not covered by tests
{
// Ignore cancellation
return (false, null, null);

Check warning on line 97 in src/JustSaying/AwsTools/MessageHandling/Dispatch/MessageDispatcher.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/Dispatch/MessageDispatcher.cs#L97

Added line #L97 was not covered by tests
}
catch (Exception ex)
{
_logger.LogError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,10 @@

namespace JustSaying.AwsTools.MessageHandling;

internal class ForeignTopicArnProvider(RegionEndpoint regionEndpoint, string accountId, string topicName) : ITopicArnProvider
internal class ForeignTopicArnProvider(RegionEndpoint regionEndpoint, string accountId, string topicName)

Check warning on line 5 in src/JustSaying/AwsTools/MessageHandling/ForeignTopicArnProvider.cs

View check run for this annotation

Codecov / codecov/patch

src/JustSaying/AwsTools/MessageHandling/ForeignTopicArnProvider.cs#L5

Added line #L5 was not covered by tests
{

private readonly string _arn = $"arn:aws:sns:{regionEndpoint.SystemName}:{accountId}:{topicName}";

public Task<bool> ArnExistsAsync()
{
// Assume foreign topics exist, we actually find out when we attempt to subscribe
return Task.FromResult(true);
}

public Task<string> GetArnAsync()
{
return Task.FromResult(_arn);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
using JustSaying.Messaging.MessageSerialization;

namespace JustSaying.Messaging;

public interface IInboundMessageConverter
{
/// <summary>
/// Converts an Amazon SQS message to a <see cref="InboundMessage" /> object.
/// </summary>
/// <param name="message">The Amazon SQS message to convert.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A <see cref="InboundMessage" /> object containing the deserialized message body and attributes.</returns>
/// <remarks>
/// This method handles the conversion of both raw SQS messages and SNS-wrapped messages.
/// It also applies any necessary decompression to the message body.
/// </remarks>
ValueTask<InboundMessage> ConvertToInboundMessageAsync(Amazon.SQS.Model.Message message, CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using JustSaying.Messaging.MessageSerialization;
using JustSaying.Models;

namespace JustSaying.Messaging;

public interface IOutboundMessageConverter
{
/// <summary>
/// Converts a message to a format suitable for publishing, applying necessary transformations and compression.
/// </summary>
/// <param name="message">The original message to be converted.</param>
/// <param name="publishMetadata">Metadata associated with the publish operation, including any custom message attributes.</param>
/// <param name="cancellationToken">A token to monitor for cancellation requests.</param>
/// <returns>A <see cref="OutboundMessage"/> object containing the converted message body, attributes, and any additional publishing information.</returns>
/// <remarks>
/// This method handles the following operations:
/// <ul>
/// <li>Serializes the message body</li>
/// <li>Adds custom message attributes</li>
/// <li>Applies compression to the message body if it meets specified criteria</li>
/// <li>Adds compression-related attributes if compression is applied</li>
/// <li>Prepares the message for SNS (if applicable) by setting the subject</li>
/// </ul>
/// The exact behavior may vary based on the destination type and compression options.
/// </remarks>
ValueTask<OutboundMessage> ConvertToOutboundMessageAsync(Message message, PublishMetadata publishMetadata, CancellationToken cancellationToken = default);
}
2 changes: 1 addition & 1 deletion src/JustSaying/AwsTools/MessageHandling/ISqsQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,4 @@ Task ChangeMessageVisibilityAsync(
int visibilityTimeoutInSeconds,
CancellationToken cancellationToken);

}
}

This file was deleted.

Loading