Skip to content

Commit

Permalink
Handle CreditResponse when subscription is unknown on server (#159)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonnepmyra authored Aug 29, 2022
1 parent dc1fe21 commit ece98ef
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 0 deletions.
4 changes: 4 additions & 0 deletions RabbitMQ.Stream.Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,10 @@ private async Task HandleIncoming(Memory<byte> frameMemory)
TuneResponse.Read(frame, out var tuneResponse);
tuneReceived.SetResult(tuneResponse);
break;
case CreditResponse.Key:
CreditResponse.Read(frame, out var creditResponse);
creditResponse.HandleUnRoutableCredit();
break;
default:
HandleCorrelatedCommand(tag, ref frame);
break;
Expand Down
55 changes: 55 additions & 0 deletions RabbitMQ.Stream.Client/CreditResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// This source code is dual-licensed under the Apache License, version
// 2.0, and the Mozilla Public License, version 2.0.
// Copyright (c) 2007-2020 VMware, Inc.

using System;
using System.Buffers;

namespace RabbitMQ.Stream.Client
{
public readonly struct CreditResponse : ICommand
{
public const ushort Key = 9;

private CreditResponse(ResponseCode responseCode, byte subscriptionId)
{
SubscriptionId = subscriptionId;
ResponseCode = responseCode;
}

public int SizeNeeded => throw new NotImplementedException();

private byte SubscriptionId { get; }

private ResponseCode ResponseCode { get; }

public int Write(Span<byte> span)
{
throw new NotImplementedException();
}

internal static int Read(ReadOnlySequence<byte> frame, out CreditResponse command)
{
var offset = WireFormatting.ReadUInt16(frame, out _);
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out _);
offset += WireFormatting.ReadUInt16(frame.Slice(offset), out var responseCode);
offset += WireFormatting.ReadByte(frame.Slice(offset), out var subscriptionId);

command = new CreditResponse((ResponseCode)responseCode, subscriptionId);
return offset;
}

internal void HandleUnRoutableCredit()
{
/* the server sends a credit-response only in case of
* problem, e.g. crediting an unknown subscription
* (which can happen when a consumer is closed at
* the same time as the deliverhandler is working
*/

LogEventSource.Log.LogWarning(
$"Received notification for subscription id: {SubscriptionId} " +
$"code: {ResponseCode}");
}
}
}
5 changes: 5 additions & 0 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const RabbitMQ.Stream.Client.CloseResponse.Key = 22 -> ushort
const RabbitMQ.Stream.Client.CreateRequest.Key = 13 -> ushort
const RabbitMQ.Stream.Client.CreateResponse.Key = 13 -> ushort
const RabbitMQ.Stream.Client.CreditRequest.Key = 9 -> ushort
const RabbitMQ.Stream.Client.CreditResponse.Key = 9 -> ushort
const RabbitMQ.Stream.Client.DeclarePublisherRequest.Key = 1 -> ushort
const RabbitMQ.Stream.Client.DeclarePublisherResponse.Key = 1 -> ushort
const RabbitMQ.Stream.Client.DeletePublisherRequest.Key = 6 -> ushort
Expand Down Expand Up @@ -303,6 +304,10 @@ RabbitMQ.Stream.Client.CreditRequest.CreditRequest() -> void
RabbitMQ.Stream.Client.CreditRequest.CreditRequest(byte subscriptionId, ushort credit) -> void
RabbitMQ.Stream.Client.CreditRequest.SizeNeeded.get -> int
RabbitMQ.Stream.Client.CreditRequest.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.CreditResponse
RabbitMQ.Stream.Client.CreditResponse.CreditResponse() -> void
RabbitMQ.Stream.Client.CreditResponse.SizeNeeded.get -> int
RabbitMQ.Stream.Client.CreditResponse.Write(System.Span<byte> span) -> int
RabbitMQ.Stream.Client.DeclarePublisherRequest
RabbitMQ.Stream.Client.DeclarePublisherRequest.DeclarePublisherRequest() -> void
RabbitMQ.Stream.Client.DeclarePublisherRequest.DeclarePublisherRequest(uint correlationId, byte publisherId, string publisherRef, string stream) -> void
Expand Down

0 comments on commit ece98ef

Please sign in to comment.