Add support for redirecting messages #138

Closed
184 changes: 183 additions & 1 deletion lib/broadway.ex
@@ -52,6 +52,15 @@ defmodule Broadway do
you can set the `:partition_by` option. See "Ordering and partitioning".

* Rate-limiting (TODO)

* Message Routing - Broadway provides conveniences for
building complex messaging architectures. Messages can
be redirected through multiple pipelines for any
desired fan in/out configuration. For example, if you
want to send messages from PipelineA to PipelineB, it
can be done by calling `Broadway.redirect/2`.
See "Chaining Pipelines".

* Statistics/Metrics (TODO)
* Back-off (TODO)

@@ -382,9 +391,115 @@ defmodule Broadway do
order. Those issues happen regardless of Broadway and solutions
to said problems almost always need to be addressed outside of
Broadway too.

## Chaining Pipelines

> Caution: Improper use of message forwarding can lead to
> degraded pipeline performance. Before reaching for
> chained pipelines as a method for logically separating
> your message processing workflow, carefully consider
> whether or not your use case could be handled with a
> single pipeline.

Some messaging patterns require more complex processing
configurations than what is provided by a single Broadway
pipeline. When the need arises, you can continue to
leverage the performance and availability features
provided by Broadway. Instead of relying on additional
remote queues, only to receive subsequent messages in
another Broadway pipeline, you may choose to locally
redirect a batch of messages to another pipeline before
acknowledging the messages.

For example, if you want to process messages individually,
perform some batch operation, then perform another CPU
intensive task per individual message, you can define one
data source and two Broadway pipelines. The first pipeline
receives messages from your data source and performs the
initial processing. Next, it fans out to its own batcher
processes to perform some batch operation. When the batch
operation completes, it fans in the processed messages to
the next pipeline in the chain. From there, the receiving
pipeline can choose to acknowledge the messages, or to
redirect them to another pipeline.

Let's look at an example. First, configure a Broadway
pipeline with a producer of your choice. In the
`c:handle_batch/4` callback, use `redirect/2` to dispatch
the message to the next pipeline, waiting the specified
amount of time per message for the receiving producer to
finish dispatching:

    defmodule MyFirstPipeline do
      # Receives the initial messages
      use Broadway

      def start_link(_opts) do
        Broadway.start_link(MyFirstPipeline,
          name: MyForwarder,
          producer: [
            module: {Counter, []},
            stages: 1
          ],
          processors: [
            default: [stages: 2]
          ],
          # a batcher is needed for handle_batch/4 to be invoked
          batchers: [
            default: []
          ]
        )
      end

      def handle_message(_processor, message, _context) do
        # do some intensive processing and return the message
        message
      end

      def handle_batch(_batch, messages, _batch_info, _context) do
        # messages are dispatched to MyNextPipeline;
        # on timeout, the message will be failed
        redirect(messages, to: MyNextPipeline, timeout: 15_000)
      end
    end

Next, configure the receiving pipeline using
`Broadway.LineProducer`. The LineProducer is a specific
type of producer that can receive messages from another
pipeline. Each dispatch call returns only once the message
has been handed to a consumer, which can lead to timeouts
when no consumers are available. Be sure to set an
appropriate number of producer and processor stages in
the subsequent pipeline(s).

    defmodule MyNextPipeline do
      # Receives messages from MyFirstPipeline
      use Broadway

      def start_link(_) do
        Broadway.start_link(__MODULE__,
          name: __MODULE__,
          producer: [module: {Broadway.LineProducer, []}],
          processors: [default: []],
          # a batcher is needed for handle_batch/4 to be invoked
          batchers: [default: []]
        )
      end

      def handle_message(_processor, message, _context) do
        # another round of intensive processing
        message
      end

      def handle_batch(_batch, messages, _batch_info, _context) do
        # return the batch of messages for acknowledgement
        messages
      end
    end

Note that any pipeline in the chain can be the terminal
pipeline for any message or messages it receives. Though
you must always return the entire batch of messages from
`c:handle_batch/4`, you can choose to redirect only a
portion of the messages processed in the current pipeline.
Messages returned to Broadway without being redirected
will be terminally acknowledged in the current pipeline,
while the terminal acknowledgement for redirected messages
will occur somewhere farther along the chain.
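
As a minimal sketch of a partial redirect, the snippet below
splits a batch, forwards only part of it, and returns the whole
batch. So that it runs without Broadway, it uses plain maps in
place of `Broadway.Message` structs and a stand-in function in
place of `redirect/2`. The `PartialRedirectSketch` module, the
`:priority` key, and `fake_redirect` are assumptions made purely
for illustration.

```elixir
defmodule PartialRedirectSketch do
  # Hypothetical helper, not part of Broadway: forwards only the
  # high-priority messages and returns the entire batch.
  def handle_batch(messages, redirect_fun) do
    {to_forward, to_finish} = Enum.split_with(messages, &(&1.priority == :high))

    # Only call the redirect function when there is something to forward,
    # since `redirect/2` is guarded against an empty list.
    forwarded = if to_forward == [], do: [], else: redirect_fun.(to_forward)

    # Every message is returned; the order within the batch may change.
    forwarded ++ to_finish
  end
end

# Stand-in for `redirect/2` that only tags each message as redirected.
fake_redirect = fn messages -> Enum.map(messages, &Map.put(&1, :redirected, true)) end

batch = [%{id: 1, priority: :high}, %{id: 2, priority: :low}]
result = PartialRedirectSketch.handle_batch(batch, fake_redirect)
```

In a real pipeline the same shape would presumably sit inside
`c:handle_batch/4`, with `redirect/2` in place of the stand-in.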
"""

alias Broadway.{BatchInfo, Message, Options, Server, Producer}
alias Broadway.{BatchInfo, Message, Options, Producer, Redirector, Server}

@doc """
Invoked to handle/process individual messages sent from a producer.
@@ -500,6 +615,7 @@ defmodule Broadway do
defmacro __using__(opts) do
quote location: :keep, bind_quoted: [opts: opts, module: __CALLER__.module] do
@behaviour Broadway
import Broadway, only: [redirect: 2]

@doc false
def child_spec(arg) do
@@ -799,6 +915,72 @@ defmodule Broadway do
ref
end

@doc """
Redirects a list of messages to another pipeline.

> Caution: Improper use of message routing can lead
> to degraded pipeline performance. See the
> "Chaining Pipelines" section of module documentation
> for more information.

This function can be used to dispatch a list of messages
from a `c:handle_batch/4` callback to producer stages
in the next Broadway pipeline.

Once a message has been dispatched into the next pipeline,
this function will set a no-op acknowledger on the message
so that it is safe to be "acknowledged" at the end of the
pipeline that called `redirect/2`. This can be referred to
as a message's _effective acknowledgement_ in the calling
pipeline.
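
The effective acknowledgement can be pictured with a small
sketch. It assumes the `{module, ack_ref, data}` acknowledger
tuple and the three-argument `ack` callback of
`Broadway.Acknowledger`, and uses a plain map in place of a
`Broadway.Message` struct. `NoopAckSketch` and `:upstream_queue`
are made-up names for illustration; this is not the library's
own no-op acknowledger.

```elixir
defmodule NoopAckSketch do
  # Hypothetical stand-in for a no-op acknowledger: acknowledging does nothing.
  def ack(_ack_ref, _successful, _failed), do: :ok
end

# A plain map standing in for a message; `:upstream_queue` is a made-up
# acknowledger module name.
message = %{data: "payload", acknowledger: {:upstream_queue, :ack_ref, %{}}}

# After a successful hand-off, the copy returned to the calling pipeline
# carries the no-op acknowledger, so acknowledging it there has no effect
# on the data source.
handed_off = %{message | acknowledger: {NoopAckSketch, nil, nil}}

{ack_module, ack_ref, _ack_data} = handed_off.acknowledger
result = ack_module.ack(ack_ref, [handed_off], [])
```

The copy dispatched to the next pipeline keeps its original
acknowledger, which is what makes the later terminal
acknowledgement possible.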

> Note: `redirect/2` returns as soon as the messages
> have been picked up by the next pipeline; it does not
> wait for that pipeline to finish processing them.

Once the next pipeline finishes processing the
message, assuming it does not redirect the message to
_yet another_ pipeline, the message will be acknowledged
as usual. This can be referred to as a message's
_terminal acknowledgement_ in the receiving pipeline.
If the receiving pipeline also calls `redirect/2`,
redirection continues from pipeline to pipeline until
the message is finally acknowledged.

> Failed messages and messages that have already been
> acknowledged, for instance via `ack_immediately/1`,
> are never redirected.

## Receiving Redirected Messages

Chained pipelines should be configured to use
`Broadway.LineProducer`, a producer incorporating an
internal queue to hold messages until they can be
dispatched into the receiving pipeline. See the
"Chaining Pipelines" section in the module documentation
for more information.

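Each redirected message is dispatched to a randomly
chosen producer stage in the receiving pipeline.

Returns the full list of messages: redirected messages
carry a no-op acknowledger, messages whose dispatch failed
are marked as failed, and skipped messages are returned
unchanged. The order of the returned list may differ from
the order of the input.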

## Options

* `:to` - Required. The name of the Broadway pipeline
where the messages are to be dispatched.

* `:timeout` - Optional. The amount of time, per message,
to wait for the receiving pipeline to dispatch the
message. Default is `5000`ms.
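
Because the timeout applies per message, redirecting a large
batch can take much longer than a single timeout. As a rough
sketch, assuming at most one in-flight dispatch per receiving
producer stage (which is how `Broadway.Redirector` in this change
sets its concurrency) and ignoring scheduling overhead, an upper
bound on the wait can be estimated as follows. The
`RedirectWaitSketch` module is hypothetical.

```elixir
defmodule RedirectWaitSketch do
  # Hypothetical helper: rough upper bound, in milliseconds, on the time
  # spent redirecting a batch when at most `producers` dispatches run at once.
  def worst_case_ms(batch_size, producers, timeout_ms) do
    # number of sequential "rounds", i.e. ceil(batch_size / producers)
    rounds = div(batch_size + producers - 1, producers)
    rounds * timeout_ms
  end
end

# 100 messages, one receiving producer stage, default 5000 ms timeout
worst_case = RedirectWaitSketch.worst_case_ms(100, 1, 5_000)
```

Adding producer stages to the receiving pipeline lowers this
bound, which is one reason to size those stages carefully.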
"""
@spec redirect(messages :: [Message.t(), ...], options :: keyword) :: [Message.t(), ...]
def redirect(messages, options) when is_list(messages) and messages != [] do
broadway = options[:to] || raise ArgumentError, "expected :to option in redirect/2"
Redirector.redirect(broadway, messages, options)
end

defp configuration_spec() do
[
name: [required: true, type: :atom],
70 changes: 70 additions & 0 deletions lib/broadway/line_producer.ex
@@ -0,0 +1,70 @@
defmodule Broadway.LineProducer do
@moduledoc """
A GenStage producer that receives one message at a time
from an upstream pipeline.

In theatre, a Line Producer manages the budget of a
production, and generally only works on one production
at a time. `Broadway.LineProducer` plays a similar role,
as it budgets the time to dispatch messages redirected
to its pipeline, and it only accepts one new message at a
time.

This implementation will keep messages in an internal
queue until there is demand, leading to client timeouts
for slow consumers.

You can use the mental picture of a line of people
(messages) waiting for their turn to be dispatched.
Messages enter at the rear (tail) of the line and are
dispatched from the front (head) of the line.

See the "Chaining Pipelines" section in the `Broadway`
module documentation for more information.
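
The line metaphor can be sketched with Erlang's `:queue`
module alone. The `LineSketch` module below is a hypothetical
illustration rather than the producer itself: it dispatches
from the head of the line only while there is pending demand
and leaves everything else waiting.

```elixir
defmodule LineSketch do
  # Hypothetical illustration: pops up to `demand` items from the front of
  # the queue and returns {dispatched, remaining_queue, remaining_demand}.
  def take(queue, demand), do: take(queue, demand, [])

  defp take(queue, demand, acc) when demand > 0 do
    case :queue.out(queue) do
      {{:value, item}, rest} -> take(rest, demand - 1, [item | acc])
      {:empty, rest} -> {Enum.reverse(acc), rest, demand}
    end
  end

  defp take(queue, demand, acc), do: {Enum.reverse(acc), queue, demand}
end

line = :queue.from_list([:a, :b, :c])

# Two units of demand dispatch the two messages at the head of the line.
{dispatched, line, demand} = LineSketch.take(line, 2)

# With no demand left, the third message keeps waiting in the queue.
waiting = :queue.to_list(line)
```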
"""
  use GenStage
  alias Broadway.{Message, Producer}

  @behaviour Producer

  def start_link(opts \\ []) do
    {server_opts, opts} = Keyword.split(opts, [:name])
    GenStage.start_link(__MODULE__, opts, server_opts)
  end

  @doc """
  Sends a `Broadway.Message` and returns only after the
  message is dispatched.
  """
  @spec dispatch(GenServer.server(), Message.t(), timeout :: timeout()) :: :ok
  def dispatch(line_producer, message, timeout \\ 5000) do
    GenStage.call(line_producer, {__MODULE__, :dispatch, message}, timeout)
  end

  ## Callbacks

  @impl true
  def init(_opts) do
    # state is {queue of {caller, message} pairs, pending demand}
    {:producer, {:queue.new(), 0}}
  end

  @impl true
  def handle_call({__MODULE__, :dispatch, message}, from, {queue, demand}) do
    # enqueue at the tail; the caller is replied to once the message is dispatched
    dispatch_messages(:queue.in({from, message}, queue), demand, [])
  end

  @impl true
  def handle_demand(incoming_demand, {queue, demand}) do
    dispatch_messages(queue, incoming_demand + demand, [])
  end

  # Dispatches from the head of the queue while there is pending demand.
  defp dispatch_messages(queue, demand, messages) do
    with d when d > 0 <- demand,
         {{:value, {from, message}}, queue} <- :queue.out(queue) do
      GenStage.reply(from, :ok)
      dispatch_messages(queue, demand - 1, [message | messages])
    else
      _ -> {:noreply, Enum.reverse(messages), {queue, demand}}
    end
  end
end
70 changes: 70 additions & 0 deletions lib/broadway/redirector.ex
@@ -0,0 +1,70 @@
defmodule Broadway.Redirector do
  # Redirection logic for routing messages to the next pipeline.
  @moduledoc false
  alias Broadway.{LineProducer, Message, NoopAcknowledger, Server}

  @spec redirect(
          server :: GenServer.server(),
          batch :: [Message.t(), ...],
          opts :: keyword
        ) :: [Message.t(), ...]
  def redirect(server, messages, opts \\ []) when is_list(messages) and messages != [] do
    # skip redirecting failed and/or acknowledged messages
    {redirects, skipped} =
      Enum.split_with(messages, fn
        %Message{status: :ok, acknowledger: {acknowledger, _, _}}
        when acknowledger != NoopAcknowledger ->
          true

        _ ->
          false
      end)

    # dispatch messages to producer stages, returning results and messages
    redirected =
      for {result, message} <- dispatch_and_return(server, redirects, opts) do
        case result do
          {:ok, %Message{status: :ok} = dispatched} ->
            # set a no-op acknowledger for the return to `handle_batch/4`
            %{dispatched | acknowledger: {NoopAcknowledger, _ack_ref = nil, _ack_data = nil}}

          {:ok, failed} ->
            # `dispatch/3` already failed the message (no producer found);
            # keep its acknowledger so the failure is still reported
            failed

          {:exit, reason} ->
            # failed dispatch fails the message
            Message.failed(message, reason)
        end
      end

    redirected ++ skipped
  end

  @doc false
  @spec dispatch(Message.t(), GenServer.server(), timeout()) :: Message.t() | no_return
  def dispatch(message, server, timeout) do
    # pick one of the receiving pipeline's producers at random
    producer = Enum.random(Server.producer_names(server))

    if pid = Process.whereis(producer) do
      _ =
        LineProducer.dispatch(
          pid,
          # reset the batcher values before dispatching
          %{message | batcher: :default, batch_key: :default},
          timeout
        )

      message
    else
      Message.failed(message, {:redirect, server})
    end
  end

  defp dispatch_and_return(server, messages, opts) do
    timeout = opts[:timeout] || 5000

    messages
    |> Task.async_stream(__MODULE__, :dispatch, [server, timeout],
      max_concurrency: length(Server.producer_names(server)),
      on_timeout: :kill_task,
      # results are zipped with the input below, so order must be preserved
      ordered: true,
      timeout: :infinity
    )
    |> Enum.zip(messages)
  end
end
1 change: 1 addition & 0 deletions mix.exs
@@ -62,6 +62,7 @@ defmodule Broadway.MixProject do
],
Producers: [
Broadway.DummyProducer,
Broadway.LineProducer,
Broadway.TermStorage
]
]