Skip to content

v1.8.0

Compare
Choose a tag to compare
@Gsantomaggio Gsantomaggio released this 14 Feb 09:37
· 23 commits to main since this release
v1.8.0
86b9d4c

(Below, you can find all the changes/PRs)

What's new in 1.8

The 1.8 focus are:

  • Multiple Consumers and Producers per connection
  • Improve the reconnection for stream and super stream.

The high-level classes Consumer and Producer don't introduce breaking changes.
The RawSuperStream* classes change the default behaviour. Please take a look at the section 3.

Main changes:

1. Multiple Consumers and Producers per connection

The RabbitMQ stream protocol supports multi-producers and multi-consumers per TCP Connection.
This version introduces the connection pool for Consumers and Producers.

There is a new ConnectionPoolConfig setting:

new StreamSystemConfig {
  ConnectionPoolConfig = new ConnectionPoolConfig()
 {
  ConsumersPerConnection = 10, 
   ProducersPerConnection = 10, 
 }
};

ConsumersPerConnection == The number of consumers per connection min 1 max 200 default is 1
ProducersPerConnection == The number of producers per connection min 1 max 200 default is 1

Each connection can handle different streams; see the image:

Screenshot 2023-12-18 at 10 18 05

Performances

Sharing the same connection for multiple streams reduces the number of connections, but it could impact the performances:

  • Consumer side. If one consumer is slow, it can also affect the other consumers
  • Producer side: If all the producers are at full rate, it can reduce the performances

The proper parameter depends on your environment.

Tip

You can use different StreamSystemConfig like:

configToReduceTheConnections = new StreamSystemConfig{
 ConnectionPoolConfig = new ConnectionPoolConfig() {
    ConsumersPerConnection = 50, // high value 
    ProducersPerConnection = 50,  // high value
  }
}
configToIncreaseThePerformances = new StreamSystemConfig{
 ConnectionPoolConfig = new ConnectionPoolConfig() {
		ConsumersPerConnection = 1, // low value 
		ProducersPerConnection = 1,  // low value
  }
}

There are many combinations from 1 to 200.

2. Improve the reconnections

Handle streamNotAvailable, Add disconnection Info: #343

Improve the super stream reconnection: #344

Increase the backoff strategy time: #345

Please follow this document If you want to know more about what happens during a broker restart.

The focus is to improve the reconnection during the cluster restart.

3. Raw Super stream events

Removed the auto-reconnect. The RawSuperStreamProducer and RawSuperStreamConsumer classes now expose two events:

#344

**NOTE: If you are using these classes, the auto-reconnect is removed to be compliant with all the Raw* classes. **

You should use Consumer and Producer unless for a specific use case.

For Raw* users:

  • Super Stream: during the disconnection, it is possible to understand the disconnection cause and reconnect the stream like:
var consumer = await system.CreateSuperStreamConsumer(configuration);
        var completed = new TaskCompletionSource<bool>();
        configuration.ConnectionClosedHandler = async (reason, stream) =>
        {
            if (reason == ConnectionClosedReason.Unexpected)
            {
                await consumer.ReconnectPartition(
                    await system.StreamInfo(stream).ConfigureAwait(false)
                );
                completed.SetResult(true);
            }
        };

The same is true for the standard consumer.

  • Metadata update
 MetadataHandler = async update =>
 {
    await consumer.ReconnectPartition(
                    await system.StreamInfo(stream).ConfigureAwait(false));
                        
 }   

4. Add events to Producer and Consumer classes

See: #349

See also: https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/tree/main/docs/ReliableClient
where you can find an example of how to use StatusChanged

 producerConfig.StatusChanged += (status) =>
  {
   var streamInfo = status.Partition is not null
      ? $" Partition {status.Partition} of super stream: {status.Stream}"
      : $"Stream: {status.Stream}";

       lp.LogInformation("Producer: {Id} - status changed from {From} to {To}. {Info}",
                              status.Identifier,
                              status.From,
                              status.To, streamInfo);

       if (status.To == ReliableEntityStatus.Open)
       {
          publishEvent.Set();
       }
       else
       {
          publishEvent.Reset();
         }};

5. Update Secret

See #342

6. SuperStream Creation/Deletion

See: #357

There are two ways to create the super-stream:

  1. Based on partitions with:
const string SuperStream = "my-first-system-super-stream";
var spec = new PartitionsSuperStreamSpec(SuperStream, 2);
await system.CreateSuperStream(spec);
  1. Based on keys with:
const string SuperStream = "countries";
var system = await StreamSystem.Create(new StreamSystemConfig());
var conf = new BindingsSuperStreamSpec(SuperStream, new[] { "italy", "france", "spain", "germany", "uk" });
await system.CreateSuperStream(conf);

Enhancements

Bug fix

  • Fix connections to server on different locales by @Noonlord in #329

New Contributors

Full Changelog: v1.7.4...v1.8.0