Support multiple processors #39
Copied from #36
@josevalim said:
I'm in a CQRS/ES application that's consuming events from a RabbitMQ message bus as its producer. What I'd like to do is define different processors so that I can handle the messages differently. I have a web app funneling webhook events through a RabbitMQ instance; that is my producer. I essentially would like to do something like this:

```elixir
defmodule MyBroadway do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(MyBroadway,
      name: MyBroadwayExample,
      producers: [
        default: [
          module: {MyProducerModule, []},
          stages: 1
        ]
      ],
      processors: [
        webhook_source: [
          module: MyWebookSourceModule,
          stages: 50
        ],
        other_webhook_source: [
          module: MyOtherWebookSourceModule,
          stages: 50
        ]
      ]
    )
  end
end
```

This would allow me to route messages to the correct handlers and clean up my Broadway declaration file. I'm happy to help with this, whether it's docs or implementation.
@bdubaut in this case you don't need multiple processors. Processors shouldn't be used to route business logic; for business logic, you can have code in handle_message that matches on the message and dispatches to the appropriate place.
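A sketch of that suggestion (the handler modules and the "source" key below are made up for illustration): routing lives in handle_message, pattern matching on the message data:

```elixir
defmodule MyBroadway do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context) do
    # Dispatch on the message contents rather than on processor identity.
    # Each handler receives and returns the %Broadway.Message{}.
    case message.data do
      %{"source" => "webhook"} -> MyWebhookHandler.handle(message)
      %{"source" => "other_webhook"} -> MyOtherHandler.handle(message)
      _other -> message
    end
  end
end
```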
@josevalim I'm having trouble seeing how we would partition to the right set of processors. I think we can likely avoid the multiple processors, at least as far as the use cases I've used Broadway for go :)
To me multiple processors would run as steps (one after the other).
-- José Valim, plataformatec.com.br, Founder and Director of R&D
@josevalim That's a cool idea. Unless I'm the only one that understood it this way, I wonder how to make this clearer, either in the docs or in the Broadway options themselves 🤔
Well, we can only have one processor right now, so it is not a problem. But
if we ever implement this, we would then make it clear.
@josevalim I'd like to make an attempt at this, if it's still viable.
Hi @mcrumm, thanks for the ping, but we are still waiting for valid use cases. Do you have any? :D
Hi @josevalim. I would say a use case could be the following: we use this kind of GenStage pipeline to fetch data from the Snapchat Marketing API (https://developers.snapchat.com/api/docs/#introduction). Here (http://big-elephants.com/2019-01/facebook-genstage/) is a detailed description of this approach.
Thanks for sharing @van-mronov! As I mentioned in my replies above, I am still unconvinced the logic you described would really benefit from multiple steps. For example, in what you described, I would have a single producer and N consumers, and have each of the consumers do the work from beginning to end. So a consumer would do:
Without a need to break those into a bunch of processors. The reason is that our CPU concurrency is always bounded by the machine cores, so as long as you have one stage with N processors, where N is the number of cores, you will maximize CPU usage. But then you can say "but José, they have to do I/O too", and then you can easily start 100 consumers instead, and everything will still work, because the VM is really good at resource allocation and also performs work stealing. Even if somehow you are still not happy with that, note that you have been doing 1 producer - N consumers so far, but you can also flip it to M producers - N consumers, and that already adds a lot of flexibility to split the load. So in my mind, the goal of having multiple processors is to effectively maximize the machine resources, and I haven't yet heard of a problem where you can't do that with Broadway topologies as is. In fact, I am worried that adding this feature will only push people towards less optimal designs, because they will use multiple processors as logical steps, while they are everything BUT that.
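Concretely, the flip from 1 producer - N consumers to M producers - N consumers is just configuration. A sketch using the pre-1.0 option names that appear earlier in this thread (stages has since been renamed to concurrency, and producers: to producer:); MyProducerModule is a placeholder:

```elixir
Broadway.start_link(MyBroadway,
  name: MyBroadwayExample,
  producers: [
    # M producers splitting the load of pulling from the queue
    default: [module: {MyProducerModule, []}, stages: 4]
  ],
  processors: [
    # N consumers, each doing the work from beginning to end;
    # oversubscribe (e.g. 100) when the work is I/O-heavy
    default: [stages: 100]
  ]
)
```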
Thank you @josevalim. I see what you mean. I believe we started to use several processor layers since
Thanks for the detailed explanation; that cleared up a number of things for me. The use case, as it was described to me, wouldn't actually be served by simply adding more processor stages either. However, I thought it was interesting, so to close the loop on why I resurrected this issue, I'll share it here. Right now, we receive a message in the processor and we can fan in to a batcher. What if we could fan out of batches into a subsequent processor? So, for instance, I receive messages one at a time in:

Processor --> Batcher --> Processor --> Batcher
@mcrumm yes, I can see this case being a necessity. I wonder if we could handle it by simply sending it to another Broadway pipeline that has producer-based processes?
I think that could be a really elegant solution! Currently we landed on using two Broadway pipelines, which is perfectly fine, but if we could essentially compose them together without the need to publish back out to a remote queue, that would be pretty fantastic.
Yes please! :)
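One possible shape for that composition, sketched below. Broadway.push_messages/2 is a real function but is documented primarily for testing, so a dedicated hand-off producer for the second pipeline may be the more robust route; note also that the forwarded messages still carry their original acknowledgers, which would need care to avoid double-acking:

```elixir
defmodule FirstPipeline do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context), do: message

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    # Hand the batch to a second, in-VM Broadway pipeline instead of
    # publishing back out to a remote queue.
    Broadway.push_messages(SecondPipeline, messages)
    messages
  end
end
```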
We have all seen similar pipelines in streaming frameworks like Flink, where you have multiple stages partitioned by key for aggregating or joining messages on a stream. In that case it has value because processes are distributed and handle their own big state.
Hello José, thanks for your wonderful work :) Here is a use case I came across that might benefit from multiple processors:

The analysis part is mostly I/O and brief single-core bursts, so we want to have e.g.

The ffmpeg part re-encodes clips to 720p, retrieving the relevant parts directly from the source HTTP video; ffmpeg uses all available cores when doing this, so we must limit the number of ffmpeg processes to something reasonable (e.g.

We did not want to limit ffmpeg's max threads, because we want ffmpeg to use all cores when there's only one encoding process in flight. So combining analysis + ffmpeg in a single processor did not work well; we either had too much ffmpeg or not enough parallel analysis processes.

Note 1: the Erlang VM wonders don't apply here, because the CPU-intensive tasks happen in external OS processes.

Note 2: processors'

So I started rewriting the pipeline with
Hi @flupke!
This should not be an issue in practice because we rescue any failure during processing. The one_for_all strategy is really there to handle bugs in Broadway itself, which should not happen. Other than that, a pool design with a single layer of processors may be good enough for you. The pool will guarantee a certain number of ffmpeg processes are being used and, while it will block, I think it is OK, as ffmpeg will be the bottleneck (and it will also use CPU anyway).
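One way to realize the pool idea inside a single processor layer, sketched with :poolboy (any pooling library would do; FfmpegWorker, the pool name, and analyze/1 are hypothetical):

```elixir
defmodule MediaPipeline do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context) do
    # The I/O-heavy analysis runs in the processor itself, so many
    # of these can be in flight at once (high processor concurrency).
    analysis = analyze(message.data)

    # Encoding is funneled through a small pool, so at most pool_size
    # ffmpeg OS processes run concurrently. The checkout blocks,
    # which is what bounds ffmpeg without capping analysis.
    :poolboy.transaction(:ffmpeg_pool, fn worker ->
      FfmpegWorker.encode(worker, analysis)
    end)

    message
  end

  defp analyze(_data), do: :analysis_placeholder
end
```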
I see, thank you for the clarification!
I couldn't tell how that would work, so I did an experiment. The pool implementation seems to be as performant as the two-stage version (maybe even better on latency, but I have doubts about my code). So this is one more point against multiple processors, and very good news for me, since I can stick to Broadway. Thank you very much!!!
Hello!
@josevalim, my question is not directly related to mcrumm's case; I am just wondering: when we connect multiple pipelines, how do the acknowledgement semantics work? Would it be necessary to do something like this, for example?

RabbitMQ -> Producer -> Processor -> Batcher -> | pipeline boundary | -> RabbitMQ -> Producer -> Processor -> Batcher

Or do you see a sound way of ACKing from the next pipeline to the previous pipeline's Batcher? 🤔
You would disable acking on the first and then include the relevant metadata to ack on the second. If I remember correctly, you can update the ack configuration on the fly. But, as per my previous replies, I am still skeptical that multiple processors are useful except for very few corner cases.
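A rough sketch of that idea. Broadway.NoopAcknowledger and Broadway.Message.configure_ack/2 exist in Broadway, but which ack options a producer supports varies; SecondPipeline.enqueue/1 is a hypothetical hand-off:

```elixir
defmodule FirstPipeline do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context) do
    # Forward the message (still carrying its original acknowledger
    # and metadata) to the second pipeline, then neutralize acking in
    # this pipeline by swapping in a no-op acknowledger. The second
    # pipeline becomes responsible for acking the original delivery.
    message
    |> tap(&SecondPipeline.enqueue/1)
    |> Map.put(:acknowledger, Broadway.NoopAcknowledger.init())
  end
end
```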
Thank you for the answer! By no means am I entering polemics here; I'm just sharing my gut feeling that separation of concerns, i.e. not trying to squeeze too much stuff into a single Processor type, would be a sounder approach :)
Separation of concerns is modeled by modules, not processors. Processors are about runtime properties. Using processors for design purposes will lead to inefficient pipelines. |
Thank you for your time, appreciated! I will try to continue with the path/approach you outlined. |
I first want to say thank you to everyone involved in Broadway, it's an amazing library! I was considering it and GenStage for a use case where I want batching, rate limiting, and eventually back-pressure, so it seemed like a better tool than plain GenStage. But I also wanted to implement a topology where one event is processed in parallel (or sequentially) by several different processors that have nothing to do with each other. Something like:

[A - producer] Produce event
  -> [B - consumer] Send some requests
  -> [C - consumer] Log event
  -> [D - consumer] Push some notifications

Up until today I didn't find any mention in the documentation that Broadway can have only one processor, and the documentation begins with:
But then below in the docs it says:
I imagine I can send the events to several different pipelines, which would also allow for more fine-grained producer concurrency control. I also wonder if there's a way to subscribe custom GenStage consumers. Or perhaps use different batchers with duplicate data and, in a way, let the different batchers be the steps B, C, and D? But I also wanted to ask: if that's the case, what is meant by
@NicolayD per the discussion above, the general observation is that breaking work into steps will often make things slower rather than faster. Your code should rather be: Producer: A; Processor: B, C, and D. Or perhaps D could happen in a batch processor. This is a common misconception where folks want to make design steps into logical steps, but moving data around is expensive and you should only do it when necessary.
Each layer (producer, processor, batcher) has multiple internal stages. But even if you discount that, the skeleton of producer -> processor -> batcher -> batch processor is already multi-stage.
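The single-processor version of the B/C/D example, as a sketch (the handler modules are made up; the point is that one processor does all the steps without moving data between stages):

```elixir
defmodule EventPipeline do
  use Broadway

  @impl true
  def handle_message(_processor, message, _context) do
    Requests.send_for(message.data)      # B: send some requests
    EventLogger.log(message.data)        # C: log the event
    Notifier.push_for(message.data)      # D: push some notifications
    message
  end
end
```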
I see, thank you very much! It does make sense to be in one step. |
Issue opened to collect feedback from the community.