Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observe and ToObservable for IEncodedConnection #554

Open
hypdeb opened this issue Feb 8, 2022 · 6 comments
Open

Observe and ToObservable for IEncodedConnection #554

hypdeb opened this issue Feb 8, 2022 · 6 comments

Comments

@hypdeb
Copy link

hypdeb commented Feb 8, 2022

Feature Request

Use Case:

Same use cases as for IConnection.

Proposed Change:

Implement extension methods Observe and ToObservable for IEncodedConnection.

Who Benefits From The Change(s)?

Users of Rx and IEncodedConnection.

Alternative Approaches

There's none I can see. I'm new to NATS, maybe there's something I'm not understanding.

@hypdeb
Copy link
Author

hypdeb commented Feb 9, 2022

Maybe I can expand a bit on what I was expecting when using an IEncodedConnection with Rx: I would have expected to have a straight forward way of getting an observable of the decoded object directly from the subscription like this

IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url)
connection.OnDeserialize = data =>
{
  // Deserialization logic, e.g. using protocol buffers.
};
IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
INATSObservable<object> observable = subscription.ToObservable()
  .Select(decodedObject => {
    // Mapping logic.
  });

Note that in the above, the IEncodedAsyncSubscription type doesn't exist and is what I think would be needed for that to work, as an alternative to IAsyncSubscription.

@scottf
Copy link
Collaborator

scottf commented Feb 9, 2022

@hypdeb Can you please review this unit test https://github.com/nats-io/nats.net/blob/master/src/Tests/IntegrationTests/TestEncoding.cs
If looks like you can already received encoded messages using the normal async subscription. If you are already aware of this, maybe can you explain why you need to observe a message that you can already receive async?

@hypdeb
Copy link
Author

hypdeb commented Feb 9, 2022

@scottf Thanks for coming back to me on this.

I don't think the test addresses my issue. I know I can receive the decoded messages, what I am missing is a handy way of extracting an observable sequence out of those messages, like it exists for the IConnection using the Observe and ToObservable extension methods defined here
https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/Rx/RxExtensions.cs#L18

It is possible what I'm trying to do is somehow not smart, therefore let me add a bit more context: I have a messaging service with an interface looking like this:

interface IMessagingService
{
  IObservable<MessageType> ReadMessages();
}

that I would like to implement like this:

class NatsMessagingService
{
  IObservable<MessageType> ReadMessages()
  {
    IEncodedConnection connection = this.connectionFactory.CreateEncodedConnection(configuration.Url)
    connection.OnDeserialize = data =>
    {
      // Deserialization logic, e.g. using protocol buffers.
    };
    IEncodedAsyncSubscription subscription = connection.SubscribeAsync("Subject");
    INATSObservable<MessageType> observable = subscription.ToObservable()
      .Select(decodedObject => {
        // Mapping logic.
      });
    return observable;
  }
}

I cannot do that at this time because there is no ToObservable or Observe extension methods for IEncodedConnection and the ToObservable method defined on IAsyncSubscription produces an observable sequence of Msg, which doesn't contain the decoded message (as far as I can see), but only the byte[].

@scottf
Copy link
Collaborator

scottf commented Feb 9, 2022

I think you might need some or an entire duplication of the RX package just for encoded connection. Maybe / maybe not, this is an area in the .NET client that I really don't use. You are welcome to fork the repo and propose a solution.

@hypdeb
Copy link
Author

hypdeb commented Feb 10, 2022

The Rx stuff is actually okay, the issue is this:

https://github.com/nats-io/nats.net/blob/e51e0c940f785ac9105dcb15cf27196f73af3b9d/src/NATS.Client/AsyncSub.cs#L37

What is missing is a subscription with and EncodedMessageEventArgs instead of MsgHandlerEventArgs.

Which part of the code are you referring to when you say that you are not using it ? Encoded connections or Rx ? Would you know of someone who is using that who could have a look ?

@hypdeb
Copy link
Author

hypdeb commented Mar 30, 2022

I am noticing a further discrepancy between the functionality supported by IConnection and IEncodedConnection, in that IEncodedConnection doesn't support CreateJetStreamContext. I think I will have to somehow make due with IConnection as it is too painful to use IEncodedConnection at this stage.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants