From ece98ef442c772369277016cf7ec28f7fad53384 Mon Sep 17 00:00:00 2001 From: jonnepmyra <44847685+jonnepmyra@users.noreply.github.com> Date: Mon, 29 Aug 2022 10:53:58 +0200 Subject: [PATCH] Handle CreditResponse when subscription is unknown on server (#159) --- RabbitMQ.Stream.Client/Client.cs | 4 ++ RabbitMQ.Stream.Client/CreditResponse.cs | 55 +++++++++++++++++++ .../PublicAPI.Unshipped.txt | 5 ++ 3 files changed, 64 insertions(+) create mode 100644 RabbitMQ.Stream.Client/CreditResponse.cs diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 8df7d6a8..5d3c14b0 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -386,6 +386,10 @@ private async Task HandleIncoming(Memory frameMemory) TuneResponse.Read(frame, out var tuneResponse); tuneReceived.SetResult(tuneResponse); break; + case CreditResponse.Key: + CreditResponse.Read(frame, out var creditResponse); + creditResponse.HandleUnRoutableCredit(); + break; default: HandleCorrelatedCommand(tag, ref frame); break; diff --git a/RabbitMQ.Stream.Client/CreditResponse.cs b/RabbitMQ.Stream.Client/CreditResponse.cs new file mode 100644 index 00000000..5dacc5ee --- /dev/null +++ b/RabbitMQ.Stream.Client/CreditResponse.cs @@ -0,0 +1,55 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System; +using System.Buffers; + +namespace RabbitMQ.Stream.Client +{ + public readonly struct CreditResponse : ICommand + { + public const ushort Key = 9; + + private CreditResponse(ResponseCode responseCode, byte subscriptionId) + { + SubscriptionId = subscriptionId; + ResponseCode = responseCode; + } + + public int SizeNeeded => throw new NotImplementedException(); + + private byte SubscriptionId { get; } + + private ResponseCode ResponseCode { get; } + + public int Write(Span span) + { + throw new NotImplementedException(); + } + + internal static int Read(ReadOnlySequence frame, out CreditResponse command) + { + var offset = WireFormatting.ReadUInt16(frame, out _); + offset += WireFormatting.ReadUInt16(frame.Slice(offset), out _); + offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var responseCode); + offset += WireFormatting.ReadByte(frame.Slice(offset), out var subscriptionId); + + command = new CreditResponse((ResponseCode)responseCode, subscriptionId); + return offset; + } + + internal void HandleUnRoutableCredit() + { + /* the server sends a credit-response only in case of + * problem, e.g. crediting an unknown subscription + * (which can happen when a consumer is closed at + * the same time as the deliverhandler is working + */ + + LogEventSource.Log.LogWarning( + $"Received notification for subscription id: {SubscriptionId} " + + $"code: {ResponseCode}"); + } + } +} diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index e58c12e5..95ab4590 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -53,6 +53,7 @@ const RabbitMQ.Stream.Client.CloseResponse.Key = 22 -> ushort const RabbitMQ.Stream.Client.CreateRequest.Key = 13 -> ushort const RabbitMQ.Stream.Client.CreateResponse.Key = 13 -> ushort const RabbitMQ.Stream.Client.CreditRequest.Key = 9 -> ushort +const RabbitMQ.Stream.Client.CreditResponse.Key = 9 -> ushort const RabbitMQ.Stream.Client.DeclarePublisherRequest.Key = 1 -> ushort const RabbitMQ.Stream.Client.DeclarePublisherResponse.Key = 1 -> ushort const RabbitMQ.Stream.Client.DeletePublisherRequest.Key = 6 -> ushort @@ -303,6 +304,10 @@ RabbitMQ.Stream.Client.CreditRequest.CreditRequest() -> void RabbitMQ.Stream.Client.CreditRequest.CreditRequest(byte subscriptionId, ushort credit) -> void RabbitMQ.Stream.Client.CreditRequest.SizeNeeded.get -> int RabbitMQ.Stream.Client.CreditRequest.Write(System.Span span) -> int +RabbitMQ.Stream.Client.CreditResponse +RabbitMQ.Stream.Client.CreditResponse.CreditResponse() -> void +RabbitMQ.Stream.Client.CreditResponse.SizeNeeded.get -> int +RabbitMQ.Stream.Client.CreditResponse.Write(System.Span span) -> int RabbitMQ.Stream.Client.DeclarePublisherRequest RabbitMQ.Stream.Client.DeclarePublisherRequest.DeclarePublisherRequest() -> void RabbitMQ.Stream.Client.DeclarePublisherRequest.DeclarePublisherRequest(uint correlationId, byte publisherId, string publisherRef, string stream) -> void