Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for redirecting messages #138

Closed
wants to merge 1 commit into from

Conversation

mcrumm
Copy link
Collaborator

@mcrumm mcrumm commented Nov 25, 2019

Initial work on a redirect/2 function to dispatch messages from handle_batch/4 in PipelineA to handle_message/3 in PipelineB. Related to the discussion in #39

I feel like this is close to where it needs to be, but the Consumer still fails the entire batch when a single dispatch times out, which isn't exactly what I was going for.

Opening a draft PR to get feedback on the current progress. Thanks!

@josevalim
Copy link
Member

Hi @mcrumm!

I apologize because I may not have been clear enough in our previous convos. I am currently not sure if this is a pattern I would like in Broadway per se, so I think this would be best built as a library first. Maybe a off_broadway_in_memory_producer or similar?

Other than that, the code looks great! You may consider though giving the producer a name that reflects its behaviour a bit more. In particular, we have to three types of in-memory producers:

  1. One that sends messages via cast (fully async)
  2. One that sends messages via call and confirms the call on handle_call
  3. One that sends messages via call and confirms the call on handle_demand (current implementation)

In 1 and 2 you can always send the message directly in handle_call and handle_cast and use the :buffer_size config of GenStage to control the buffer.

3 sounds the safest, as it is fully backed by back-pressure, but it may lead to a long time waiting and the call instructions may timeout. So you may want to watch that carefully.

Overall, you may want to consider the following improvements:

  1. You may want to measure how long it takes to push things into the producer
  2. It may be necessary to have more than one type of producer: at least 1 and 3 or 2 and 3 - hence a reason to name them based on their behaviour
  3. Allow multiple messages to be pushed at once. The idea is to put lists of messages into the queue. Here is an implementation I recently had to use at BroadwayKafka for queue and dequeing many: https://gist.github.com/josevalim/0ac793814bb17eeb59f9d73fb8a7346e - note I am acking as soon as the first entry in the pipeline is sent but the last one would also be fine

Feel free to ask if you have any questions!

@mcrumm
Copy link
Collaborator Author

mcrumm commented Nov 26, 2019

@josevalim Thank you so much for the insight and for the enqueue_many example! I'll take a closer look and give this a shot as a standalone lib.

@mcrumm mcrumm closed this Nov 26, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants