
Proposal: Publishers with handled back pressure from business logic #50

Closed
am-kantox opened this issue Feb 27, 2019 · 17 comments

@am-kantox

As discussed at λ-days, I am going to describe our use case for Broadway.

We have a Rabbit queue of incoming messages at a rate of ~10K/sec at peak.
We apply validation rules to them, since some of the messages are malformed.
These validation rules might be time-consuming; the result of applying them
is another Rabbit queue (of valid messages).

Then we apply some more processing and deliver the messages to one or more
further queues. That is, we currently use Rabbit as a communication channel
between different processes, microservices, and the main application. This
simplifies the architecture because there are very few ties, if any, between
code in different microservices.

It could be drawn this way:

——————————   ——————————   ——————————   ——————————   ——————————   —————————— 
| Rabbit | → |  Flow  | → | Rabbit | → |  ....  | → | Rabbit | → |  Flow  | 
——————————   ——————————   ——————————   ——————————   ——————————   —————————— 

Flows over GenStages are used for heavy computation. To avoid code repetition,
we built a library that takes one or more processors in the form of {Mod, :fun}
tuples, alongside sources and destinations. It was created two years ago and
basically works over connection pools and compiled configs, which is why we were
so excited to hear about Broadway. Unfortunately, in its current documentation
Broadway is presented as a “dead end”: it has a great infrastructure to handle
back pressure and to process incoming data, but one cannot just plug the
existing Flow implementation into the pipeline as shown above.

It would be great to have [optional] publishers, or whatever you would call
them (the top half of the picture below was taken from the Plataformatec
announcement):

                         [producer_1]
                             / \
                            /   \
                           /     \
                          /       \
                 [processor_1] [processor_2] ... [processor_50]  <- process each message
                          /\     /\
                         /  \   /  \
                        /    \ /    \
                       /      x      \ 
                      /      / \      \
                     /      /   \      \
                    /      /     \      \
              [batcher_s3_odd]  [batcher_s3_even]
                    /\                  \
                   /  \                  \
                  /    \                  \
                 /      \                  \
 [consumer_s3_odd_1] [consumer_s3_odd_2]  [consumer_s3_even_1] <- process each batch

 ————————————————————————————————————————————————————————————————————————————————————————

 [consumer_s3_odd_1] [consumer_s3_odd_2]  [consumer_s3_even_1] <- process each batch
                \      /             \      /                   
                 \    /               \    /                   
                  \  /                 \  /                   
                   \/                   \/                   
            [batcher_s3_odd]     [batcher_s3_even]
                         / \     /\ 
                        /   \   /  \
                       /     \ /    \
                      /       x      \
                     /       / \      \
                    /       /   \      \
                   /       /     \      \
                  /       /       \      \
                [publisher_1]   [publisher_2] ... [publisher_50]  <- publish each message

That way the consumer part (optionally including batchers) could easily be
extracted into a separate codebase, making the business code fully isolated
from any broker/connection handling.

We currently use roughly this architecture, with RabbitMQ / HTTP as supported
source backends and SQL / RabbitMQ / HTTP / Slack as destination backends. We
are able to simply add another publisher, changing nothing in the business
logic, to export data to the database or to another client who, say, requires
HTTP webhooks.

As I said, our implementation of connectors is just a pool, hence we need to
reimplement back-pressure support in our business logic units. It is not that
much boilerplate, but I am pretty sure it would bring huge added value to
Broadway if it were supported through a configuration like:

def start_link(_opts) do
  Broadway.start_link(__MODULE__,
    name: __MODULE__,
    producers: [...],
    processors: [...],

    pipeline: {MyBusinessUnit, :process},

    batchers: [
      s3_positive: [stages: 2, batch_size: 10],
      s3_negative: [stages: 1, batch_size: 10]
    ],
    publishers: [
      rabbit: [
        module: {BroadwayRabbitMQ.Publisher, exchange_name: "my_exchange"}
      ]
    ]
  )
end
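
For contrast, today the publishing side has to live in a batch callback. A rough
sketch using the AMQP library (the channel is assumed to arrive via the Broadway
context; the batcher and exchange names are hypothetical):

# Rough sketch: publish each message of a batch with the AMQP library.
# Assumes a channel was opened elsewhere and put into the Broadway context.
def handle_batch(:rabbit, messages, _batch_info, %{channel: channel}) do
  Enum.each(messages, fn message ->
    AMQP.Basic.publish(channel, "my_exchange", "", message.data)
  end)

  messages
end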
@msaraiva (Collaborator) commented Feb 27, 2019

Hi @am-kantox. Thanks for your proposal!

We have already discussed some ideas to make Broadway more flexible/pluggable in the near future. However, for now, we decided to gather more feedback like this from the community before we make a decision in that direction.

We will certainly get back to this issue in the next few days/weeks, so stay tuned, and thanks again for your feedback!

@josevalim (Member)

Hi @am-kantox! Thanks for the feedback.

I have some questions about your current pipeline.

When you say you have RabbitMQ -> Flow -> RabbitMQ -> Flow, do you mean that:

  1. you get the data from RabbitMQ, then you process it with Flow, then you send it to RabbitMQ, and then there is another Flow that gets the data from RabbitMQ again?

  2. or you get the data from RabbitMQ, then you process it with Flow, then you send it to RabbitMQ, and continue processing it with Flow (without getting it from RabbitMQ again)?

That will help us steer the discussion in the correct direction.

@am-kantox (Author) commented Feb 27, 2019 via email

@josevalim (Member)

> The first one. We try to decouple all Flows as much as possible, hence no ties.

@am-kantox so in this case, you should be able to create multiple Broadway pipelines, correct? Why would publishers be necessary? Or is it just that you don't need batching?

@am-kantox (Author) commented Feb 27, 2019 via email

@am-kantox (Author)

That would mean that Broadway could take care of the number of publishers to Rabbit, or of the number of requests per second to some other service, given that we have peak times and quiet times.

@josevalim (Member)

There are two ways we can implement this functionality today with GenStage:

  1. You have custom consumers that set the demand to manual to have granular control over the demand

  2. You simply block in the consumer until the thing you are calling can accept the request

Almost everyone is doing 2, because it is by far the simplest approach, and that's what Broadway supports. It is also not a major issue to block a consumer.
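
For illustration, a minimal sketch of option 1 using GenStage's manual demand
mode (the producer name and the pacing policy are placeholders):

# Minimal sketch of a consumer with manual demand: it only asks the
# producer for more events once the previous batch has been handled,
# so the external service dictates the pace.
defmodule RateLimitedConsumer do
  use GenStage

  def start_link(opts), do: GenStage.start_link(__MODULE__, opts)

  def init(_opts) do
    {:consumer, %{}, subscribe_to: [SomeProducer]}
  end

  # Switch the subscription to manual mode and ask for the first batch.
  def handle_subscribe(:producer, _opts, from, state) do
    GenStage.ask(from, 10)
    {:manual, Map.put(state, :producer, from)}
  end

  def handle_events(events, _from, state) do
    Enum.each(events, &deliver_to_external_service/1)
    GenStage.ask(state.producer, length(events))
    {:noreply, [], state}
  end

  defp deliver_to_external_service(_event), do: :ok
end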

Have you implemented 1 by any chance? If so, I would love to take a look at it, otherwise, I would postpone this until we have use cases in place.

@am-kantox (Author) commented Feb 27, 2019 via email

@josevalim (Member)

> Still, the ability to deliberately select a publish method according to the endpoint is valuable IMHO.

You can achieve this today by setting up multiple batchers. :)
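
For example, a minimal sketch (batcher names hypothetical) that routes each
message to a destination-specific batcher with Broadway.Message.put_batcher/2:

# Route each message to a batcher based on its payload; each batcher
# then gets its own handle_batch/4 clause for the actual publishing.
def handle_message(_processor, message, _context) do
  case message.data do
    %{destination: :rabbit} -> Broadway.Message.put_batcher(message, :rabbit)
    _other -> Broadway.Message.put_batcher(message, :http)
  end
end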

@am-kantox (Author) commented Feb 27, 2019 via email

@josevalim (Member)

Batchers are about grouping data before "publishing" it to a separate step. So, for example, if you want to write to RabbitMQ one by one and to the database ten by ten, you define two batchers.

So 10 different destinations is totally possible today. 10 processing steps aren't right now, but this is tracked in #39.
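
In configuration terms, that example might look like this sketch (names
hypothetical):

batchers: [
  # write to RabbitMQ one by one
  rabbit: [stages: 1, batch_size: 1],
  # write to the database ten by ten
  database: [stages: 1, batch_size: 10]
]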

@am-kantox (Author)

I am not a mind reader :)

Glad to hear this title means exactly this.

@josevalim (Member)

@am-kantox the batchers part (10 destinations) already exists today and is covered in the docs. Was that not made clear there? Is there something we could improve?

@am-kantox (Author) commented Feb 27, 2019 via email

@josevalim (Member)

If you are using Broadway, I don't see why you would use Flow. We already do all of the partitioning, parallelism, etc. for you. Given you said you are doing RabbitMQ -> Flow -> RabbitMQ -> Flow, this would be two Broadway pipelines: one that gets data from RabbitMQ, processes it, and sends it to another RabbitMQ queue; and another that also gets its data from RabbitMQ and consumes it.

Why would you need both Broadway and Flow?
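
Concretely, the first of those two pipelines might be sketched like this
(queue names are hypothetical; it uses the BroadwayRabbitMQ producer):

# Sketch of stage one: consume "incoming", validate, republish to "valid".
defmodule StageOne do
  use Broadway

  def start_link(_opts) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [module: {BroadwayRabbitMQ.Producer, queue: "incoming"}]
      ],
      processors: [default: []]
    )
  end

  def handle_message(_processor, message, _context) do
    # validate here, then publish the valid payload to the "valid" queue
    # (e.g. with AMQP.Basic.publish/4) before the message is acked
    message
  end
end

# Stage two would look the same, consuming the "valid" queue.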

@am-kantox (Author)

> Flow is a more general and powerful abstraction than Broadway that focuses on data as a whole, providing features like aggregation, joins, windows, etc. — README

@josevalim I understand that there are two Broadway pipelines, but that is beside the question of why Flow, so let's assume we have one for simplicity.

We have a complicated Flow with several emits, and we need to keep and pass an accumulator through the whole Flow. Basically, this is the processing of a huge CSV with several different partitions and complicated logic to produce different new Ecto schemas out of it. Either I did not get how Broadway could be used instead, or we are stuck with Flow for this particular processing. Partitioning inside these Flows is done by, say, properties.
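
To give an idea, a condensed sketch of that kind of Flow (parse_row/1 and the
:property field are hypothetical): partition parsed CSV rows by a property and
carry an accumulator, emitting the accumulated state at the end.

# Condensed sketch: partition rows by a property and accumulate state.
File.stream!("huge.csv")
|> Flow.from_enumerable()
|> Flow.map(&parse_row/1)
|> Flow.partition(key: {:key, :property})
|> Flow.reduce(fn -> %{} end, fn row, acc ->
  Map.update(acc, row.property, [row], &[row | &1])
end)
|> Flow.emit(:state)
|> Enum.to_list()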

OTOH, we want these Flows to act as plugins agnostic to the real data source. It might be several CSVs glued into one, or even many API calls with “a single row” of data in each. For the latter, we want to use Broadway, processing different data sources and partitioning them, say, by client. This partitioning is relatively easy and does not require Flow.Window or other sophisticated Flow features.

And we want these parts to be independent as much as possible. I hope the above answers the question.

@josevalim (Member)

Right, you can't keep an accumulator in Broadway, so if that is a must, Flow is a better option! But we can allow hashing, which will at least guarantee that the same processors handle the same data. This can be useful at least for ordering. I have opened #62 for this case. Thanks!
