Skip to content

Commit

Permalink
Expose initial credits configuration (#265)
Browse files Browse the repository at this point in the history
* Expose initial credits configuration

ref: #264
Signed-off-by: Gabriele Santomaggio <[email protected]>

* Add constant for the default consumer credits

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored May 18, 2023
1 parent e42d8fe commit 0f5cd64
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 6 deletions.
1 change: 1 addition & 0 deletions RabbitMQ.Stream.Client/Consts.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal static class Consts
internal static readonly TimeSpan ShortWait = TimeSpan.FromSeconds(1);
internal static readonly TimeSpan MidWait = TimeSpan.FromSeconds(3);
internal static readonly TimeSpan LongWait = TimeSpan.FromSeconds(10);
internal const ushort ConsumerInitialCredits = 2;

internal static int RandomShort()
{
Expand Down
23 changes: 23 additions & 0 deletions RabbitMQ.Stream.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ public interface IConsumer

public record IConsumerConfig : INamedEntity
{
private ushort _initialCredits = Consts.ConsumerInitialCredits;

// StoredOffsetSpec configuration it is needed to keep the offset spec.
// since the offset can be decided from the ConsumerConfig.OffsetSpec.
// and from ConsumerConfig.ConsumerUpdateListener.
Expand All @@ -40,4 +42,25 @@ public record IConsumerConfig : INamedEntity
public string Reference { get; set; }

public Func<string, Task> ConnectionClosedHandler { get; set; }

// InitialCredits is the initial credits to be used for the consumer.
// if the InitialCredits is not set, the default value will be 2.
// It is the number of the chunks that the consumer will receive at beginning.
// A high value can increase the throughput but could increase the memory usage and server-side CPU usage.
// The RawConsumer uses this value to create the Channel buffer so all the chunks will be stored in the buffer memory.
// The default value it is usually a good value.
public ushort InitialCredits
{
get => _initialCredits;
set
{
if (value < 1)
{
throw new ArgumentException(
$"InitialCredits must be greater than 0. Default value is {Consts.ConsumerInitialCredits}.");
}

_initialCredits = value;
}
}
}
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ RabbitMQ.Stream.Client.AbstractEntity.Token.get -> System.Threading.Cancellation
RabbitMQ.Stream.Client.Chunk.Data.get -> System.Memory<byte>
RabbitMQ.Stream.Client.Chunk.MagicVersion.get -> byte
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.IConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.get -> ushort
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.InitialCredits.set -> void
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Close() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.GetLastPublishedId() -> System.Threading.Tasks.Task<ulong>
Expand Down
15 changes: 11 additions & 4 deletions RabbitMQ.Stream.Client/RawConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,20 @@ public class RawConsumer : AbstractEntity, IConsumer, IDisposable
private byte _subscriberId;
private readonly ILogger _logger;
private readonly Channel<Chunk> _chunksBuffer;
private const int InitialCredit = 2;
private readonly ushort _initialCredits;

private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = null)
{
_logger = logger ?? NullLogger.Instance;
_initialCredits = config.InitialCredits;
_logger.LogDebug("creating consumer {Consumer} with initial credits {InitialCredits}, " +
"offset {OffsetSpec}, is single active consumer {IsSingleActiveConsumer}, super stream {SuperStream}, client provided name {ClientProvidedName}, " +
config.Reference,
_initialCredits, config.OffsetSpec, config.SuperStream, config.IsSingleActiveConsumer,
config.ClientProvidedName);

// _chunksBuffer is a channel that is used to buffer the chunks
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(InitialCredit)
_chunksBuffer = Channel.CreateBounded<Chunk>(new BoundedChannelOptions(_initialCredits)
{
AllowSynchronousContinuations = false,
SingleReader = true,
Expand All @@ -113,7 +121,6 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu
IsPromotedAsActive = true;
_client = client;
_config = config;
_logger = logger ?? NullLogger.Instance;

ProcessChunks();
}
Expand Down Expand Up @@ -387,7 +394,7 @@ private async Task Init()

var (consumerId, response) = await _client.Subscribe(
_config,
InitialCredit,
_initialCredits,
consumerProperties,
async deliver =>
{
Expand Down
8 changes: 8 additions & 0 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ public record ConsumerConfig : ReliableConfig
/// </summary>
public Func<string, string, bool, Task<IOffsetType>> ConsumerUpdateListener { get; set; }

// InitialCredits is the initial credits to be used for the consumer.
// if the InitialCredits is not set, the default value will be 2.
// It is the number of the chunks that the consumer will receive at beginning.
// A high value can increase the throughput but could increase the memory usage and server-side CPU usage.
// The RawConsumer uses this value to create the Channel buffer so all the chunks will be stored in the buffer memory.
// The default value it is usually a good value.
public ushort InitialCredits { get; set; } = Consts.ConsumerInitialCredits;

public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream)
{
}
Expand Down
8 changes: 6 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ private async Task<IConsumer> StandardConsumer(bool boot)
Reference = _consumerConfig.Reference,
ConsumerUpdateListener = _consumerConfig.ConsumerUpdateListener,
IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer,
InitialCredits = _consumerConfig.InitialCredits,
OffsetSpec = offsetSpec,
ConnectionClosedHandler = async _ =>
{
Expand All @@ -69,7 +70,8 @@ private async Task<IConsumer> StandardConsumer(bool boot)
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
if (_consumerConfig.MessageHandler != null)
{
await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message).ConfigureAwait(false);
await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message)
.ConfigureAwait(false);
}
},
}, BaseLogger).ConfigureAwait(false);
Expand All @@ -89,7 +91,8 @@ private async Task<IConsumer> SuperConsumer(bool boot)
}
else
{
var partitions = await _consumerConfig.StreamSystem.QueryPartition(_consumerConfig.Stream).ConfigureAwait(false);
var partitions = await _consumerConfig.StreamSystem.QueryPartition(_consumerConfig.Stream)
.ConfigureAwait(false);
foreach (var partition in partitions)
{
offsetSpecs[partition] =
Expand All @@ -104,6 +107,7 @@ private async Task<IConsumer> SuperConsumer(bool boot)
Reference = _consumerConfig.Reference,
ConsumerUpdateListener = _consumerConfig.ConsumerUpdateListener,
IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer,
InitialCredits = _consumerConfig.InitialCredits,
OffsetSpec = offsetSpecs,
MessageHandler = async (stream, consumer, ctx, message) =>
{
Expand Down
12 changes: 12 additions & 0 deletions Tests/RawConsumerSystemTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using System.Threading.Tasks;
using RabbitMQ.Stream.Client;
using RabbitMQ.Stream.Client.AMQP;
using RabbitMQ.Stream.Client.Reliable;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -617,6 +618,17 @@ public async void ConsumerMetadataHandlerUpdate()
await system.Close();
}

[Fact]
public async void ValidateInitialCredits()
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);

await Assert.ThrowsAsync<ArgumentException>(async () =>
await Consumer.Create(new ConsumerConfig(system, stream) { InitialCredits = 0, }));

await SystemUtils.CleanUpStreamSystem(system, stream);
}

[Fact]
public async void ProducerConsumerMixingDifferentSendTypesCompressAndStandard()
{
Expand Down

0 comments on commit 0f5cd64

Please sign in to comment.