Skip to content

Commit

Permalink
Add super stream exists API (#358)
Browse files Browse the repository at this point in the history
- Add documentation
- Add configuration to BestPracticesClient client

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Feb 14, 2024
1 parent ac23f56 commit 86b9d4c
Show file tree
Hide file tree
Showing 17 changed files with 238 additions and 48 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@ Please refer to the [documentation](https://rabbitmq.github.io/rabbitmq-stream-d
The library requires .NET 6 or .NET 7.

### Documentation


- [HTML documentation](https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/htmlsingle/index.html)
- [PDF documentation](https://rabbitmq.github.io/rabbitmq-stream-dotnet-client/stable/dotnet-stream-client.pdf)

- [A Simple Getting started](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/blob/main/docs/Documentation/)
- [Best practices to write a reliable client](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient/)
- [Super Stream example](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/blob/main/docs/SuperStream)
- [Stream Performance Test](https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/RabbitMQ.Stream.Client.PerfTest)

Expand Down
13 changes: 13 additions & 0 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,19 @@ public async ValueTask<MetaDataResponse> QueryMetadata(string[] streams)
.ConfigureAwait(false);
}

public async Task<bool> SuperStreamExists(string stream)
{
var response = await QueryPartition(stream).ConfigureAwait(false);
if (response is { Streams.Length: >= 0 } &&
response.ResponseCode == ResponseCode.StreamNotAvailable)
{
ClientExceptions.MaybeThrowException(ResponseCode.StreamNotAvailable, stream);
}

return response is { Streams.Length: >= 0 } &&
response.ResponseCode == ResponseCode.Ok;
}

public async Task<bool> StreamExists(string stream)
{
var streams = new[] { stream };
Expand Down
3 changes: 3 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ RabbitMQ.Stream.Client.Client.Publishers.get -> System.Collections.Generic.IDict
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
RabbitMQ.Stream.Client.Client.SuperStreamExists(string stream) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
RabbitMQ.Stream.Client.Client.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
Expand Down Expand Up @@ -182,6 +183,7 @@ RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTime d
RabbitMQ.Stream.Client.OffsetTypeTimestamp.OffsetTypeTimestamp(System.DateTimeOffset dateTimeOffset) -> void
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.Partitions.get -> int
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name) -> void
RabbitMQ.Stream.Client.PartitionsSuperStreamSpec.PartitionsSuperStreamSpec(string Name, int partitions) -> void
RabbitMQ.Stream.Client.ProducerFilter
RabbitMQ.Stream.Client.ProducerFilter.FilterValue.get -> System.Func<RabbitMQ.Stream.Client.Message, string>
Expand Down Expand Up @@ -309,6 +311,7 @@ RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Cl
RabbitMQ.Stream.Client.StreamSystem.DeleteSuperStream(string superStream) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamInfo>
RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.StreamStats>
RabbitMQ.Stream.Client.StreamSystem.SuperStreamExists(string superStream) -> System.Threading.Tasks.Task<bool>
RabbitMQ.Stream.Client.StreamSystem.UpdateSecret(string newSecret) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void
Expand Down
24 changes: 24 additions & 0 deletions RabbitMQ.Stream.Client/StreamSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public int MaxSegmentSizeBytes
public IDictionary<string, string> Args => args;
}

/// <summary>
/// Abstract class for SuperStreamSpec
/// </summary>
/// <param name="Name"> Super Stream Name</param>
public abstract record SuperStreamSpec(string Name)
{
internal virtual void Validate()
Expand Down Expand Up @@ -79,9 +83,22 @@ public int MaxSegmentSizeBytes
public IDictionary<string, string> Args => args;
}

/// <summary>
/// Create a super stream based on the number of partitions.
/// So there will be N partitions and N binding keys.
/// The stream names is the super stream name with a partition number appended.
/// The routing key is the partition number.
/// Producer should use HASH strategy to route the message to the correct partition.
/// Partitions should be at least 1.
/// </summary>
public record PartitionsSuperStreamSpec : SuperStreamSpec
{

public PartitionsSuperStreamSpec(string Name) : base(Name)
{
Partitions = 3;
}

public PartitionsSuperStreamSpec(string Name, int partitions) : base(Name)
{
Partitions = partitions;
Expand Down Expand Up @@ -121,6 +138,13 @@ internal override List<string> GetBindingKeys()

}

/// <summary>
/// Create a super stream based on the number of binding keys.
/// So there will be N partitions and N binding keys.
/// The stream names is the super stream name with a binding key appended.
/// Producer should use KEY strategy to route the message to the correct partition.
/// The binding keys should be unique duplicates are not allowed.
/// </summary>
public record BindingsSuperStreamSpec : SuperStreamSpec
{
public BindingsSuperStreamSpec(string Name, string[] bindingKeys) : base(Name)
Expand Down
14 changes: 14 additions & 0 deletions RabbitMQ.Stream.Client/StreamSystem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,20 @@ public async Task<bool> StreamExists(string stream)
}
}

public async Task<bool> SuperStreamExists(string superStream)
{
await MayBeReconnectLocator().ConfigureAwait(false);
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
try
{
return await _client.SuperStreamExists(superStream).ConfigureAwait(false);
}
finally
{
_semClientProvidedName.Release();
}
}

private static void MaybeThrowQueryException(string reference, string stream)
{
if (string.IsNullOrWhiteSpace(reference) || string.IsNullOrWhiteSpace(stream))
Expand Down
2 changes: 2 additions & 0 deletions Tests/SystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,9 @@ public async void CreateDeleteSuperStream()
Assert.Equal("1000", spec.Args["stream-max-segment-size-bytes"]);
Assert.Equal("20000", spec.Args["max-length-bytes"]);
await system.CreateSuperStream(spec);
Assert.True(await system.SuperStreamExists(SuperStream));
await system.DeleteSuperStream(SuperStream);
Assert.False(await system.SuperStreamExists(SuperStream));
await system.Close();
}

Expand Down
2 changes: 1 addition & 1 deletion Tests/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public static async Task ResetSuperStreams()
}

Wait();
var spec = new PartitionsSuperStreamSpec(InvoicesExchange, 3);
var spec = new PartitionsSuperStreamSpec(InvoicesExchange);
await system.CreateSuperStream(spec);
await system.Close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,16 @@

namespace ReliableClient;

public class RClient
public class BestPracticesClient
{
public record Config
{
public string Host { get; set; } = "localhost";
public string? Host { get; set; } = "localhost";
public int Port { get; set; } = 5552;
public string Username { get; set; } = "guest";
public string Password { get; set; } = "guest";
public string? Username { get; set; } = "guest";
public string? Password { get; set; } = "guest";

public string? StreamName { get; set; } = "DotNetClientTest";
public bool LoadBalancer { get; set; } = false;
public bool SuperStream { get; set; } = false;
public int Streams { get; set; } = 1;
Expand All @@ -30,6 +32,10 @@ public record Config
public int MessagesPerProducer { get; set; } = 5_000_000;
public int Consumers { get; set; } = 9;
public byte ConsumersPerConnection { get; set; } = 8;

public int DelayDuringSendMs { get; set; } = 0;


}

public static async Task Start(Config config)
Expand Down Expand Up @@ -61,12 +67,18 @@ public static async Task Start(Config config)
switch (Uri.CheckHostName(config.Host))
{
case UriHostNameType.IPv4:
ep = new IPEndPoint(IPAddress.Parse(config.Host), config.Port);
if (config.Host != null) ep = new IPEndPoint(IPAddress.Parse(config.Host), config.Port);
break;
case UriHostNameType.Dns:
var addresses = await Dns.GetHostAddressesAsync(config.Host).ConfigureAwait(false);
ep = new IPEndPoint(addresses[0], config.Port);
if (config.Host != null)
{
var addresses = await Dns.GetHostAddressesAsync(config.Host).ConfigureAwait(false);
ep = new IPEndPoint(addresses[0], config.Port);
}

break;
default:
throw new ArgumentOutOfRangeException();
}
}

Expand Down Expand Up @@ -105,13 +117,13 @@ public static async Task Start(Config config)
var streamsList = new List<string>();
if (config.SuperStream)
{
streamsList.Add("invoices");
if (config.StreamName != null) streamsList.Add(config.StreamName);
}
else
{
for (var i = 0; i < config.Streams; i++)
{
streamsList.Add($"invoices-{i}");
streamsList.Add($"{config.StreamName}-{i}");
}
}

Expand Down Expand Up @@ -141,6 +153,18 @@ public static async Task Start(Config config)
List<Consumer> consumersList = new();
List<Producer> producersList = new();
var obj = new object();
if (config.SuperStream)
{
if (await system.SuperStreamExists(streamsList[0]).ConfigureAwait(false))
{
await system.DeleteSuperStream(streamsList[0]).ConfigureAwait(false);
}


await system.CreateSuperStream(new PartitionsSuperStreamSpec(streamsList[0], config.Streams)).ConfigureAwait(false);
}


foreach (var stream in streamsList)
{
if (!config.SuperStream)
Expand All @@ -153,7 +177,7 @@ public static async Task Start(Config config)
await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_000,})
.ConfigureAwait(false);
await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false);
}
}

for (var z = 0; z < config.Consumers; z++)
{
Expand All @@ -162,15 +186,24 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00
OffsetSpec = new OffsetTypeLast(),
IsSuperStream = config.SuperStream,
IsSingleActiveConsumer = config.SuperStream,
Reference = "myApp",
Identifier = $"my_c_{z}",
Reference = "myApp",// needed for the Single Active Consumer or fot the store offset
// can help to identify the consumer on the logs and RabbitMQ Management
Identifier = $"my_consumer_{z}",
InitialCredits = 10,
MessageHandler = (source, ctx, _, _) =>
MessageHandler = async (source, consumer, ctx, _) =>
{
if (totalConsumed % 10_000 == 0)
{
// don't store the offset every time, it could be a performance issue
// store the offset every 1_000/5_000/10_000 messages
await consumer.StoreOffset(ctx.Offset).ConfigureAwait(false);
}
Interlocked.Increment(ref totalConsumed);
return Task.CompletedTask;
},
};

// This is the callback that will be called when the consumer status changes
// DON'T PUT ANY BLOCKING CODE HERE
conf.StatusChanged += (status) =>
{
var streamInfo = status.Partition is not null
Expand All @@ -190,27 +223,33 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis
await producer.Send(message).ConfigureAwait(false);
}

// this example is meant to show how to use the producer and consumer
// Create too many tasks for the producers and consumers is not a good idea
for (var z = 0; z < config.Producers; z++)
{
var z1 = z;
_ = Task.Run(async () =>
{
// the list of unconfirmed messages in case of error or disconnection
// This example is only for the example, in a real scenario you should handle the unconfirmed messages
// since the list could grow event the publishEvent should avoid it.
var unconfirmedMessages = new ConcurrentBag<Message>();
// the event to wait for the producer to be ready to send
// in case of disconnection the event will be reset
var publishEvent = new ManualResetEvent(false);

var producerConfig = new ProducerConfig(system, stream)
{
Identifier = $"my_super_{z1}",
Identifier = $"my_producer_{z1}",
SuperStreamConfig = new SuperStreamConfig()
{
Enabled = config.SuperStream, Routing = msg => msg.Properties.MessageId.ToString(),
},
ConfirmationHandler = confirmation =>
{
// Add the unconfirmed messages to the list in case of error
if (confirmation.Status != ConfirmationStatus.Confirmed)
{
confirmation.Messages.ForEach(m => { unconfirmedMessages.Add(m); });

Interlocked.Add(ref totalError, confirmation.Messages.Count);
return Task.CompletedTask;
}
Expand All @@ -219,15 +258,21 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis
return Task.CompletedTask;
},
};

// Like the consumer don't put any blocking code here
producerConfig.StatusChanged += (status) =>
{
var streamInfo = status.Partition is not null
? $" Partition {status.Partition} of super stream: {status.Stream}"
: $"Stream: {status.Stream}";

// just log the status change
lp.LogInformation("Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}",
status.Identifier, status.From, status.To,status.Reason, streamInfo);

// in case of disconnection the event will be reset
// in case of reconnection the event will be set so the producer can send messages
// It is important to use the ManualReset to avoid to send messages before the producer is ready
if (status.To == ReliableEntityStatus.Open)
{
publishEvent.Set();
Expand All @@ -247,6 +292,7 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis
{
if (!unconfirmedMessages.IsEmpty)
{
// checks if there are unconfirmed messages and send them
var msgs = unconfirmedMessages.ToArray();
unconfirmedMessages.Clear();
foreach (var msg in msgs)
Expand All @@ -261,7 +307,8 @@ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publis
Properties = new Properties() {MessageId = $"hello{i}"}
};
await MaybeSend(producer, message, publishEvent).ConfigureAwait(false);
// await Task.Delay(1).ConfigureAwait(false);
// You don't need this it is only for the example
await Task.Delay(config.DelayDuringSendMs).ConfigureAwait(false);
Interlocked.Increment(ref totalSent);
}
});
Expand Down
Loading

0 comments on commit 86b9d4c

Please sign in to comment.