Skip to content

Commit

Permalink
Merge pull request #211 from superhawk610/feat/retry-on-error
Browse files Browse the repository at this point in the history
feat: add `:retry_on_error` config option
  • Loading branch information
Ch4s3 authored Dec 19, 2024
2 parents 92ab76d + cd8d141 commit 6e98bce
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 4 deletions.
3 changes: 3 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ Below is the full structure of available options and a definition for each one.
```elixir
[
name: atom(),
retry_on_error: boolean(),
connection: [
host: String.t(),
port: pos_integer(),
Expand All @@ -49,6 +50,8 @@ Below is the full structure of available options and a definition for each one.

- `name` (default: `FaktoryWorker`) - The name to use for this instance. This is useful if you want to run more than one instance of Faktory Worker. When using this make sure the workers are configured to use the correct instance (see [Worker Configuration](#worker-configuration) below).

- `retry_on_error` (default: `false`) - When disabled, only consider jobs that `raise` as failed and retry them. When enabled, also consider the return value from `perform` callbacks when determining whether or not to retry a job. A return of `:error` or `{:error, term()}` will retry the job.

- `connection` - A list of options to configure the connection to Faktory.

- `host` (default: `"localhost"`) - The host name used to connect to Faktory.
Expand Down
36 changes: 36 additions & 0 deletions lib/faktory_worker/job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,42 @@ defmodule FaktoryWorker.Job do
When defining `perform` functions, they must always accept one argument for each item in the list of values passed into
`perform_async/2`.
## Retrying jobs on failure
When a job fails, the Faktory server can enqueue it again to be retried. By default, a job is only considered
to have failed when it `raise`s an exception.
```elixir
def perform do
raise "retry me!"
end
```
If you would like to consider the return value from `perform` and retry when an error is returned, you may enable
the `:retry_on_error` config option.
```elixir
# config.exs
config :my_app, FaktoryWorker, retry_on_error: true
# job.ex
def perform do
case MyWorker.do_work() do
# returning `:error` or `{:error, term()}` will retry the job
# when `:retry_on_error` is set to `true`
:error -> :error
{:error, reason} -> {:error, reason}
# raising will still retry the job, as well
{:error, "fatal"} -> raise("oh no!")
# returning anything else will be considered a success
:ok -> :ok
_ -> :something_else
end
end
```
## Synchronous job pushing
Expand Down
3 changes: 3 additions & 0 deletions lib/faktory_worker/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ defmodule FaktoryWorker.Worker do
defstruct [
:conn_pid,
:disable_fetch,
:retry_on_error,
:fetch_ref,
:process_wid,
:worker_state,
Expand All @@ -40,6 +41,7 @@ defmodule FaktoryWorker.Worker do
process_wid = Keyword.fetch!(opts, :process_wid)
retry_interval = Keyword.get(opts, :retry_interval, @five_seconds)
disable_fetch = Keyword.get(opts, :disable_fetch)
retry_on_error = Keyword.get(opts, :retry_on_error)

# Delay connection startup to stagger worker connections. Without this
# all workers try to connect at the same time and it can't handle the load
Expand All @@ -56,6 +58,7 @@ defmodule FaktoryWorker.Worker do
%__MODULE__{
conn_pid: conn_pid,
disable_fetch: disable_fetch,
retry_on_error: retry_on_error,
process_wid: process_wid,
worker_state: :ok,
faktory_name: faktory_name,
Expand Down
4 changes: 3 additions & 1 deletion lib/faktory_worker/worker/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ defmodule FaktoryWorker.Worker.Pool do
process_wid = Keyword.get(opts, :process_wid)
pool_opts = Keyword.get(opts, :worker_pool, [])
disable_fetch = Keyword.get(pool_opts, :disable_fetch, false)
retry_on_error = Keyword.get(opts, :retry_on_error, false)

opts = [
name: :"worker_#{process_wid}_#{number}",
faktory_name: Keyword.get(opts, :name),
connection: connection_opts,
process_wid: process_wid,
disable_fetch: disable_fetch
disable_fetch: disable_fetch,
retry_on_error: retry_on_error
]

FaktoryWorker.Worker.Server.child_spec(opts)
Expand Down
13 changes: 11 additions & 2 deletions lib/faktory_worker/worker/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,18 @@ defmodule FaktoryWorker.Worker.Server do
{:noreply, state}
end

def handle_info({job_ref, _}, %{job_ref: %{ref: job_ref}} = state) when is_reference(job_ref) do
def handle_info({job_ref, result}, %{job_ref: %{ref: job_ref}} = state)
when is_reference(job_ref) do
Process.demonitor(job_ref, [:flush])
state = Worker.ack_job(state, :ok)

ack =
case {state.retry_on_error, result} do
{true, :error} -> {:error, "job returned :error"}
{true, {:error, reason}} -> {:error, reason}
_ -> :ok
end

state = Worker.ack_job(state, ack)
{:noreply, state}
end

Expand Down
3 changes: 2 additions & 1 deletion test/faktory_worker/worker/server_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ defmodule FaktoryWorker.Worker.ServerIntegrationTest do
opts = [
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true
disable_fetch: true,
retry_on_error: false
]

pid = start_supervised!(Server.child_spec(opts))
Expand Down
58 changes: 58 additions & 0 deletions test/faktory_worker/worker/server_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ defmodule FaktoryWorker.Worker.ServerTest do
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true,
retry_on_error: false,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

Expand Down Expand Up @@ -179,6 +180,7 @@ defmodule FaktoryWorker.Worker.ServerTest do
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true,
retry_on_error: false,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

Expand All @@ -200,6 +202,7 @@ defmodule FaktoryWorker.Worker.ServerTest do
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true,
retry_on_error: false,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

Expand Down Expand Up @@ -238,6 +241,7 @@ defmodule FaktoryWorker.Worker.ServerTest do
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true,
retry_on_error: false,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

Expand Down Expand Up @@ -283,6 +287,7 @@ defmodule FaktoryWorker.Worker.ServerTest do
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true,
retry_on_error: false,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

Expand All @@ -301,6 +306,58 @@ defmodule FaktoryWorker.Worker.ServerTest do

:ok = stop_supervised(:test_worker_1)
end

test "should send 'FAIL' command when job returns error" do
expect_failure = fn result, message ->
job_id = "f47ccc395ef9d9646118434f"
job_ref = :erlang.make_ref()

fail_payload = %{
jid: job_id,
errtype: "Undetected Error Type",
message: inspect(message),
backtrace: []
}

fail_command = "FAIL #{Jason.encode!(fail_payload)}\r\n"

worker_connection_mox()

expect(FaktoryWorker.SocketMock, :send, fn _, ^fail_command ->
:ok
end)

expect(FaktoryWorker.SocketMock, :recv, fn _ ->
{:ok, "+OK\r\n"}
end)

opts = [
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true,
# enable retries when an error is returned
retry_on_error: true,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

pid = start_supervised!(Server.child_spec(opts))

:sys.replace_state(pid, fn state ->
state
|> Map.put(:job_start, System.monotonic_time(:millisecond))
|> Map.put(:job_ref, %{ref: job_ref})
|> Map.put(:job_id, job_id)
|> Map.put(:job, %{"jid" => job_id})
end)

Process.send(pid, {job_ref, result}, [])

:ok = stop_supervised(:test_worker_1)
end

expect_failure.({:error, "oopsie!"}, "oopsie!")
expect_failure.(:error, "job returned :error")
end

test "should send 'FAIL' command when a job times out" do
worker_pool = [queues: ["timeout_queue"]]
Expand Down Expand Up @@ -358,6 +415,7 @@ defmodule FaktoryWorker.Worker.ServerTest do
name: :test_worker_1,
process_wid: Random.process_wid(),
disable_fetch: true,
retry_on_error: false,
connection: [socket_handler: FaktoryWorker.SocketMock]
]

Expand Down

0 comments on commit 6e98bce

Please sign in to comment.