Skip to content

Commit

Permalink
Add format_discarded/2 (#343)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ziinc authored Nov 28, 2024
1 parent 8d29ca4 commit f043234
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
16 changes: 15 additions & 1 deletion lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,21 @@ defmodule Broadway do
@callback process_name(broadway_name :: Broadway.name(), base_name :: String.t()) ::
Broadway.name()

@optional_callbacks prepare_messages: 2, handle_batch: 4, handle_failed: 2, process_name: 2
@doc """
Invoked when items are discarded from the buffer. Gets passed to `GenStage.format_discarded/2`.
If true is returned by the callback, the default log message is emitted.
Allows controlling or customization of the log message emitted.
"""
@doc since: "1.2.0"
@callback format_discarded(discarded :: non_neg_integer(), state :: term()) :: boolean()

@optional_callbacks prepare_messages: 2,
handle_batch: 4,
handle_failed: 2,
process_name: 2,
format_discarded: 2

defguardp is_broadway_name(name)
when is_atom(name) or (is_tuple(name) and tuple_size(name) == 3)
Expand Down
11 changes: 11 additions & 0 deletions lib/broadway/topology/producer_stage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ defmodule Broadway.Topology.ProducerStage do
|> handle_no_reply(state)
end

@impl true
def format_discarded(discarded, state) do
%{module: module, module_state: module_state} = state

if function_exported?(module, :format_discarded, 2) do
module.format_discarded(discarded, module_state)
else
true
end
end

@impl true
def terminate(reason, %{module: module, module_state: module_state}) do
if function_exported?(module, :terminate, 2) do
Expand Down

0 comments on commit f043234

Please sign in to comment.