Skip to content

Commit

Permalink
Implementiere Snooze-Verhalten für Event Listener.
Browse files Browse the repository at this point in the history
Co-authored-by: Kai Kuchenbecker <[email protected]>
Co-authored-by: Silvan Buedenbender <[email protected]>
  • Loading branch information
3 people committed May 7, 2024
1 parent b483d7d commit 93b6325
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
22 changes: 19 additions & 3 deletions lib/event_store/event_store_listener.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ defmodule Shared.EventStoreListener do
error_context :: error_context()
) ::
{:retry, error_context :: error_context()}
| {:retry, delay :: non_neg_integer(), error_context :: error_context()}
| {:snooze, delay :: non_neg_integer()}
| :skip
| {:stop, reason :: term()}

Expand All @@ -68,7 +68,7 @@ defmodule Shared.EventStoreListener do
error_context :: error_context()
) ::
{:retry, error_context :: error_context()}
| {:retry, delay :: non_neg_integer(), error_context :: error_context()}
| {:snooze, delay :: non_neg_integer()}
| :skip
| {:stop, reason :: term()}

Expand Down Expand Up @@ -240,6 +240,14 @@ defmodule Shared.EventStoreListener do
%RecordedEvent{data: domain_event, metadata: metadata} = event

case handler_module.on_error(error, stacktrace, domain_event, metadata, context) do
{:snooze, delay} when is_integer(delay) ->
Logger.info(fn ->
"Snoozing #{inspect(name)} for #{delay}ms while processing event #{inspect(event)}. Reason: #{format_error(error, stacktrace)}"
end)

Process.sleep(delay)
handle_event(event, state, context)

{:retry, %ErrorContext{} = context} ->
context = ErrorContext.record_error(context)

Expand All @@ -253,7 +261,7 @@ defmodule Shared.EventStoreListener do
handle_event(event, state, context)
else
reason =
"#{name} is dying due to bad event after #{ErrorContext.retry_count(context)} retries #{inspect(error)}, Stacktrace: #{inspect(stacktrace)}"
"#{name} is dying due to bad event after #{ErrorContext.retry_count(context)} retries #{format_error(error)}. Stacktrace: #{inspect(stacktrace)}"

Logger.warning(reason)

Expand Down Expand Up @@ -309,4 +317,12 @@ defmodule Shared.EventStoreListener do
end

defp valid_subscription_key?(_), do: false

defp format_error(error, stacktrace \\ [])

defp format_error({:error, reason}, stacktrace) when is_exception(reason),
do: Exception.format(:error, reason, stacktrace)

defp format_error({:error, reason}, _stacktrace), do: inspect(reason)
defp format_error(error, _stacktrace), do: inspect(error)
end
36 changes: 35 additions & 1 deletion lib/event_store/event_store_listener_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
defmodule Shared.EventStoreListenerTest do
use Support.EventStoreCase, async: false
import ExUnit.CaptureLog
import Mock

@event %Shared.EventTest.FakeEvent{}

Expand Down Expand Up @@ -41,7 +42,7 @@ defmodule Shared.EventStoreListenerTest do

setup do
old_log_level = Logger.level()
Logger.configure(level: :warning)
Logger.configure(level: :info)

{:ok, _pid} = Counter.start_link(0)

Expand Down Expand Up @@ -141,6 +142,39 @@ defmodule Shared.EventStoreListenerTest do
assert logs =~ "ExampleConsumerWithCustomConfig is retrying (2/2)"
assert logs =~ "is dying due to bad event after 2 retries"
end

test "allows to snooze on error" do
defmodule SnoozingConsumer do
use Shared.EventStoreListener,
subscription_key: "snoozing_consumer",
event_store: JehovakelEx.EventStore

@impl true
def handle(_event, _meta) do
raise RuntimeError, "Please Snooze"
end

@impl true
def on_error({:error, %RuntimeError{message: "Please Snooze"}}, _, _, _, _) do
{:snooze, 37}
end
end

start_supervised!(SnoozingConsumer)

test_process = self()

logs =
capture_log(fn ->
with_mock Process, [:passthrough],
sleep: fn snooze_time -> send(test_process, {:snoozing, snooze_time}) end do
{:ok, _events} = JehovakelEx.EventStore.append_event(@event, %{})
assert_receive {:snoozing, 37}
end
end)

assert logs =~ "Snoozing Shared.EventStoreListenerTest.SnoozingConsumer for 37ms"
end
end

test "Log Stacktrace on failing to handle exception during event handling" do
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ defmodule Shared.MixProject do
{:ecto, "~> 3.0", optional: true},
{:ecto_sql, "~> 3.0", optional: true},
# {:jehovakel_ex_ecto, ">= 0.0.0", optional: true, in_umbrella: true},
{:excoveralls, ">= 0.10.5", only: :test}
{:excoveralls, ">= 0.10.5", only: :test},
{:mock, "~> 0.3.0", only: :test}
]
end

Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
"highlander": {:hex, :highlander, "0.2.1", "e59b459f857e89daf73f2598bf2b2c0479a435481e6101ea389fd3625919b052", [:mix], [], "hexpm", "5ba19a18358803d82a923511acec8ee85fac30731c5ca056f2f934bc3d3afd9a"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"meck": {:hex, :meck, "0.9.2", "85ccbab053f1db86c7ca240e9fc718170ee5bda03810a6292b5306bf31bae5f5", [:rebar3], [], "hexpm", "81344f561357dc40a8344afa53767c32669153355b626ea9fcbc8da6b3045826"},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"},
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mock": {:hex, :mock, "0.3.8", "7046a306b71db2488ef54395eeb74df0a7f335a7caca4a3d3875d1fc81c884dd", [:mix], [{:meck, "~> 0.9.2", [hex: :meck, repo: "hexpm", optional: false]}], "hexpm", "7fa82364c97617d79bb7d15571193fc0c4fe5afd0c932cef09426b3ee6fe2022"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"postgrex": {:hex, :postgrex, "0.17.5", "0483d054938a8dc069b21bdd636bf56c487404c241ce6c319c1f43588246b281", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "50b8b11afbb2c4095a3ba675b4f055c416d0f3d7de6633a595fc131a828a67eb"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.6", "cf344f5692c82d2cd7554f5ec8fd961548d4fd09e7d22f5b62482e5aeaebd4b0", [:make, :mix, :rebar3], [], "hexpm", "bdb0d2471f453c88ff3908e7686f86f9be327d065cc1ec16fa4540197ea04680"},
Expand Down

0 comments on commit 93b6325

Please sign in to comment.