Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Have ShardingAdapter recursively call the underlying IMessageExtractor #7474

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,28 @@ namespace Akka.Cluster.Sharding.Tests;

public class AutomaticallyHandledExtractorMessagesSpec
{
public sealed record MyWrappedMessage(string EntityId, string Message);

// custom IMessageExtractor
public class MyMessageExtractor : IMessageExtractor
{
public string? EntityId(object message) => message switch
{
string s => s,
MyWrappedMessage wrapped => wrapped.EntityId,
_ => null
};

public object? EntityMessage(object message) => message;
public object? EntityMessage(object message)
{
switch (message)
{
case MyWrappedMessage wrapped:
return wrapped.Message;
default:
return message;
}
}

public string? ShardId(object message) => message switch
{
Expand All @@ -35,65 +47,47 @@ public string ShardId(string entityId, object? messageHint = null)
return entityId;
}
}

#pragma warning disable CS0618 // Type or member is obsolete
private ExtractEntityId ExtractEntityId = message =>
{
if (message is string s)
return (s, s);
return Option<(string, object)>.None;
};

private ExtractShardId ExtractShardId = message =>
{
if (message is string s)
return s;
return null!;
};
#pragma warning restore CS0618 // Type or member is obsolete
public static readonly TheoryData<(object shardingInput, object realMsg, string entityId, string shardId)>
Messages = new()
{
// (new ShardRegion.StartEntity("foo"), new ShardRegion.StartEntity("foo"), "foo", "foo"),
(new ShardingEnvelope("bar", "baz"), "baz", "bar", "bar"), ("bar", "bar", "bar", "bar"),
};

public static readonly TheoryData<(object shardingInput, object realMsg, string entityId, string shardId)> Messages = new()
{
// (new ShardRegion.StartEntity("foo"), new ShardRegion.StartEntity("foo"), "foo", "foo"),
(new ShardingEnvelope("bar", "baz"), "baz", "bar", "bar"),
("bar", "bar", "bar", "bar"),
};

[Theory]
[MemberData(nameof(Messages))]
public void ShouldAutomaticallyHandleMessagesInCustomIMessageExtractor((object shardingInput, object realMsg, string entityId, string shardId) data)
public void ShouldAutomaticallyHandleMessagesInCustomIMessageExtractor(
(object shardingInput, object realMsg, string entityId, string shardId) data)
{
// arrange
var extractor = new ExtractorAdapter(new MyMessageExtractor());

// act
var entityId = extractor.EntityId(data.shardingInput);
var entityMessage = extractor.EntityMessage(data.shardingInput);
var shardId = extractor.ShardId(entityId!, data.shardingInput);

// assert
entityId.Should().Be(data.entityId);
entityMessage.Should().Be(data.realMsg);
shardId.Should().Be(data.shardId);
}

// NOTE: so the old delegates are hopeless and will simply not work - you HAVE to handle the messages yourself there
// need to repeat of the previous test but using the deprecated delegate methods and the adapter
// [Theory]
// [MemberData(nameof(Messages))]
// public void ShouldAutomaticallyHandleMessagesInCustomIMessageExtractorUsingDelegates((object shardingInput, object realMsg, string entityId, string shardId) data)
// {
// // arrange
// var extractor = new ExtractorAdapter(new DeprecatedHandlerExtractorAdapter(ExtractEntityId, ExtractShardId));
//
// // act
// var entityId = extractor.EntityId(data.shardingInput);
// var entityMessage = extractor.EntityMessage(data.shardingInput);
// var shardId = extractor.ShardId(entityId!, data.shardingInput);
//
// // assert
// entityId.Should().Be(data.entityId);
// entityMessage.Should().Be(data.realMsg);
// shardId.Should().Be(data.shardId);
// }
}

[Fact]
public void ShouldUnwrapMessageInsideShardingEnvelope()
{
// arrange
var extractor = new ExtractorAdapter(new MyMessageExtractor());
var myMessage = new MyWrappedMessage("entity1", "hello");
var envelope = new ShardingEnvelope("entity1", myMessage);

// act
var entityId = extractor.EntityId(envelope);
var entityMessage = extractor.EntityMessage(envelope);

// assert
entityId.Should().Be("entity1");
entityMessage.Should().Be("hello");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ExtractorAdapter(IMessageExtractor underlying)
{
return message switch
{
ShardingEnvelope se => se.Message,
ShardingEnvelope se => _underlying.EntityMessage(se.Message),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also call the underlying message extractor's EntityMessage method on the payload of the ShardingEnvelope, just in case that also needs unwrapping. This is needed in order to make sure that messages still work properly when using tools like Akka.Cluster.Sharding.Delivery, which uses the ShardingEnvelope heavily.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a recursive search here instead, just to make doubly sure?

something in the line of

private object UnwrapShardingEnvelope(object message)
{
  object unwrapped = message;
  while(unwrapped is IWrappedMessage)
  {
    if(unwrapped is ShardingEnvelope)
      return unwrapped;
    unwrapped = unwrapped.Message;
  }
  return message;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because that's introducing side-effects the user is ultimately responsible for - we only care about the ShardingEnvelope specifically

_ => _underlying.EntityMessage(message)
};
}
Expand Down
Loading