From 4ea43020baaa8d4596189228d3b2f0d9337eacf2 Mon Sep 17 00:00:00 2001 From: Aaron Ross Date: Wed, 18 Dec 2024 13:13:50 -0800 Subject: [PATCH 1/2] add :retry_on_error config option This option will allow users to opt in to a new behaviour, where the job's return value will be considered when determining whether to retry the job. Previously, only a `raise` would constitute a retry-able failure. With this new behaviour enabled, returning `:error` or `{:error, term()}` will also trigger a retry. --- lib/faktory_worker/job.ex | 36 ++++++++++++ lib/faktory_worker/worker.ex | 3 + lib/faktory_worker/worker/pool.ex | 4 +- lib/faktory_worker/worker/server.ex | 13 ++++- .../worker/server_integration_test.exs | 3 +- test/faktory_worker/worker/server_test.exs | 58 +++++++++++++++++++ 6 files changed, 113 insertions(+), 4 deletions(-) diff --git a/lib/faktory_worker/job.ex b/lib/faktory_worker/job.ex index 1988b71..c58a4a3 100644 --- a/lib/faktory_worker/job.ex +++ b/lib/faktory_worker/job.ex @@ -56,6 +56,42 @@ defmodule FaktoryWorker.Job do When defining `perform` functions, they must always accept one argument for each item in the list of values passed into `perform_async/2`. + + ## Retrying jobs on failure + + When a job fails, the Faktory server can enqueue it again to be retried. By default, a job is only considered + to have failed when it `raise`s an exception. + + ```elixir + def perform do + raise "retry me!" + end + ``` + + If you would like to consider the return value from `perform` and retry when an error is returned, you may enable + the `:retry_on_errors` config option. 
+ + ```elixir + # config.exs + config :faktory_worker, retry_on_errors: true + + # job.ex + def perform do + case MyWorker.do_work() do + # when `retry_on_errors` is enabled, returning `:error` or `{:error, term()}` + # will retry the job + :error -> :error + {:error, reason} -> {:error, reason} + + # raising will still retry the job, as well + {:error, "fatal"} -> raise("oh no!") + + # returning anything else will be considered a success + :ok -> :ok + _ -> :something_else + end + end + ``` ## Synchronous job pushing diff --git a/lib/faktory_worker/worker.ex b/lib/faktory_worker/worker.ex index 8d0cb9e..2d7895b 100644 --- a/lib/faktory_worker/worker.ex +++ b/lib/faktory_worker/worker.ex @@ -18,6 +18,7 @@ defmodule FaktoryWorker.Worker do defstruct [ :conn_pid, :disable_fetch, + :retry_on_error, :fetch_ref, :process_wid, :worker_state, @@ -40,6 +41,7 @@ defmodule FaktoryWorker.Worker do process_wid = Keyword.fetch!(opts, :process_wid) retry_interval = Keyword.get(opts, :retry_interval, @five_seconds) disable_fetch = Keyword.get(opts, :disable_fetch) + retry_on_error = Keyword.get(opts, :retry_on_error) # Delay connection startup to stagger worker connections. 
Without this # all workers try to connect at the same time and it can't handle the load @@ -56,6 +58,7 @@ defmodule FaktoryWorker.Worker do %__MODULE__{ conn_pid: conn_pid, disable_fetch: disable_fetch, + retry_on_error: retry_on_error, process_wid: process_wid, worker_state: :ok, faktory_name: faktory_name, diff --git a/lib/faktory_worker/worker/pool.ex b/lib/faktory_worker/worker/pool.ex index 0009341..f1df5e6 100644 --- a/lib/faktory_worker/worker/pool.ex +++ b/lib/faktory_worker/worker/pool.ex @@ -23,13 +23,15 @@ defmodule FaktoryWorker.Worker.Pool do process_wid = Keyword.get(opts, :process_wid) pool_opts = Keyword.get(opts, :worker_pool, []) disable_fetch = Keyword.get(pool_opts, :disable_fetch, false) + retry_on_error = Keyword.get(opts, :retry_on_error, false) opts = [ name: :"worker_#{process_wid}_#{number}", faktory_name: Keyword.get(opts, :name), connection: connection_opts, process_wid: process_wid, - disable_fetch: disable_fetch + disable_fetch: disable_fetch, + retry_on_error: retry_on_error ] FaktoryWorker.Worker.Server.child_spec(opts) diff --git a/lib/faktory_worker/worker/server.ex b/lib/faktory_worker/worker/server.ex index 1957449..ccdc2df 100644 --- a/lib/faktory_worker/worker/server.ex +++ b/lib/faktory_worker/worker/server.ex @@ -57,9 +57,18 @@ defmodule FaktoryWorker.Worker.Server do {:noreply, state} end - def handle_info({job_ref, _}, %{job_ref: %{ref: job_ref}} = state) when is_reference(job_ref) do + def handle_info({job_ref, result}, %{job_ref: %{ref: job_ref}} = state) + when is_reference(job_ref) do Process.demonitor(job_ref, [:flush]) - state = Worker.ack_job(state, :ok) + + ack = + case {state.retry_on_error, result} do + {true, :error} -> {:error, "job returned :error"} + {true, {:error, reason}} -> {:error, reason} + _ -> :ok + end + + state = Worker.ack_job(state, ack) {:noreply, state} end diff --git a/test/faktory_worker/worker/server_integration_test.exs b/test/faktory_worker/worker/server_integration_test.exs index 
4130ca9..a9a881c 100644 --- a/test/faktory_worker/worker/server_integration_test.exs +++ b/test/faktory_worker/worker/server_integration_test.exs @@ -14,7 +14,8 @@ defmodule FaktoryWorker.Worker.ServerIntegrationTest do opts = [ name: :test_worker_1, process_wid: Random.process_wid(), - disable_fetch: true + disable_fetch: true, + retry_on_error: false ] pid = start_supervised!(Server.child_spec(opts)) diff --git a/test/faktory_worker/worker/server_test.exs b/test/faktory_worker/worker/server_test.exs index 07545a2..de776d5 100644 --- a/test/faktory_worker/worker/server_test.exs +++ b/test/faktory_worker/worker/server_test.exs @@ -56,6 +56,7 @@ defmodule FaktoryWorker.Worker.ServerTest do name: :test_worker_1, process_wid: Random.process_wid(), disable_fetch: true, + retry_on_error: false, connection: [socket_handler: FaktoryWorker.SocketMock] ] @@ -179,6 +180,7 @@ defmodule FaktoryWorker.Worker.ServerTest do name: :test_worker_1, process_wid: Random.process_wid(), disable_fetch: true, + retry_on_error: false, connection: [socket_handler: FaktoryWorker.SocketMock] ] @@ -200,6 +202,7 @@ defmodule FaktoryWorker.Worker.ServerTest do name: :test_worker_1, process_wid: Random.process_wid(), disable_fetch: true, + retry_on_error: false, connection: [socket_handler: FaktoryWorker.SocketMock] ] @@ -238,6 +241,7 @@ defmodule FaktoryWorker.Worker.ServerTest do name: :test_worker_1, process_wid: Random.process_wid(), disable_fetch: true, + retry_on_error: false, connection: [socket_handler: FaktoryWorker.SocketMock] ] @@ -283,6 +287,7 @@ defmodule FaktoryWorker.Worker.ServerTest do name: :test_worker_1, process_wid: Random.process_wid(), disable_fetch: true, + retry_on_error: false, connection: [socket_handler: FaktoryWorker.SocketMock] ] @@ -301,6 +306,58 @@ defmodule FaktoryWorker.Worker.ServerTest do :ok = stop_supervised(:test_worker_1) end + + test "should send 'FAIL' command when job returns error" do + expect_failure = fn result, message -> + job_id = 
"f47ccc395ef9d9646118434f" + job_ref = :erlang.make_ref() + + fail_payload = %{ + jid: job_id, + errtype: "Undetected Error Type", + message: inspect(message), + backtrace: [] + } + + fail_command = "FAIL #{Jason.encode!(fail_payload)}\r\n" + + worker_connection_mox() + + expect(FaktoryWorker.SocketMock, :send, fn _, ^fail_command -> + :ok + end) + + expect(FaktoryWorker.SocketMock, :recv, fn _ -> + {:ok, "+OK\r\n"} + end) + + opts = [ + name: :test_worker_1, + process_wid: Random.process_wid(), + disable_fetch: true, + # enable retries when an error is returned + retry_on_error: true, + connection: [socket_handler: FaktoryWorker.SocketMock] + ] + + pid = start_supervised!(Server.child_spec(opts)) + + :sys.replace_state(pid, fn state -> + state + |> Map.put(:job_start, System.monotonic_time(:millisecond)) + |> Map.put(:job_ref, %{ref: job_ref}) + |> Map.put(:job_id, job_id) + |> Map.put(:job, %{"jid" => job_id}) + end) + + Process.send(pid, {job_ref, result}, []) + + :ok = stop_supervised(:test_worker_1) + end + + expect_failure.({:error, "oopsie!"}, "oopsie!") + expect_failure.(:error, "job returned :error") + end test "should send 'FAIL' command when a job times out" do worker_pool = [queues: ["timeout_queue"]] @@ -358,6 +415,7 @@ defmodule FaktoryWorker.Worker.ServerTest do name: :test_worker_1, process_wid: Random.process_wid(), disable_fetch: true, + retry_on_error: false, connection: [socket_handler: FaktoryWorker.SocketMock] ] From cd8d141ce978491bb05e106538d430b539bdad1c Mon Sep 17 00:00:00 2001 From: Aaron Ross Date: Wed, 18 Dec 2024 13:19:27 -0800 Subject: [PATCH 2/2] update docs --- docs/configuration.md | 3 +++ lib/faktory_worker/job.ex | 8 ++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index ad92244..d33572b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -25,6 +25,7 @@ Below is the full structure of available options and a definition for each one. 
```elixir [ name: atom(), + retry_on_error: boolean(), connection: [ host: String.t(), port: pos_integer(), @@ -49,6 +50,8 @@ Below is the full structure of available options and a definition for each one. - `name` (default: `FaktoryWorker`) - The name to use for this instance. This is useful if you want to run more than one instance of Faktory Worker. When using this make sure the workers are configured to use the correct instance (see [Worker Configuration](#worker-configuration) below). +- `retry_on_error` (default: `false`) - When disabled, only consider jobs that `raise` as failed and retry them. When enabled, also consider the return value from `perform` callbacks when determining whether or not to retry a job. A return of `:error` or `{:error, term()}` will retry the job. + - `connection` - A list of options to configure the connection to Faktory. - `host` (default: `"localhost"`) - The host name used to connect to Faktory. diff --git a/lib/faktory_worker/job.ex b/lib/faktory_worker/job.ex index c58a4a3..d6a5ba2 100644 --- a/lib/faktory_worker/job.ex +++ b/lib/faktory_worker/job.ex @@ -69,17 +69,17 @@ defmodule FaktoryWorker.Job do ``` If you would like to consider the return value from `perform` and retry when an error is returned, you may enable - the `:retry_on_errors` config option. + the `:retry_on_error` config option. ```elixir # config.exs - config :faktory_worker, retry_on_errors: true + config :my_app, FaktoryWorker, retry_on_error: true # job.ex def perform do case MyWorker.do_work() do - # when `retry_on_errors` is enabled, returning `:error` or `{:error, term()}` - # will retry the job + # returning `:error` or `{:error, term()}` will retry the job + # when `:retry_on_error` is set to `true` :error -> :error {:error, reason} -> {:error, reason}