diff --git a/lib/broadway.ex b/lib/broadway.ex index 21bf100..b39073a 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -975,7 +975,21 @@ defmodule Broadway do @callback process_name(broadway_name :: Broadway.name(), base_name :: String.t()) :: Broadway.name() - @optional_callbacks prepare_messages: 2, handle_batch: 4, handle_failed: 2, process_name: 2 + @doc """ + Invoked when items are discarded from the buffer. Gets passed to `GenStage.format_discarded/2`. + + If true is returned by the callback, the default log message is emitted. + + Allows controlling or customization of the log message emitted. + """ + @doc since: "1.2.0" + @callback format_discarded(discarded :: non_neg_integer(), state :: term()) :: boolean() + + @optional_callbacks prepare_messages: 2, + handle_batch: 4, + handle_failed: 2, + process_name: 2, + format_discarded: 2 defguardp is_broadway_name(name) when is_atom(name) or (is_tuple(name) and tuple_size(name) == 3) diff --git a/lib/broadway/topology/producer_stage.ex b/lib/broadway/topology/producer_stage.ex index d03f1f6..3b74231 100644 --- a/lib/broadway/topology/producer_stage.ex +++ b/lib/broadway/topology/producer_stage.ex @@ -224,6 +224,17 @@ defmodule Broadway.Topology.ProducerStage do |> handle_no_reply(state) end + @impl true + def format_discarded(discarded, state) do + %{module: module, module_state: module_state} = state + + if function_exported?(module, :format_discarded, 2) do + module.format_discarded(discarded, module_state) + else + true + end + end + @impl true def terminate(reason, %{module: module, module_state: module_state}) do if function_exported?(module, :terminate, 2) do