Skip to content

Commit

Permalink
added requeue
Browse files Browse the repository at this point in the history
  • Loading branch information
jogeraca committed Feb 26, 2021
1 parent 604037a commit 2fc85f8
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 26 deletions.
31 changes: 24 additions & 7 deletions lib/gen_amqp/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule GenAMQP.Server do
extra_args = Keyword.get(opts, :extra_args, [])
before_funcs = Keyword.get(opts, :before, [])
after_funcs = Keyword.get(opts, :after, [])
requeue = Keyword.get(opts, :requeue, false)

quote do
require Logger
Expand Down Expand Up @@ -108,11 +109,11 @@ defmodule GenAMQP.Server do
{conn_name, conn_pid, false}
end

def on_message(
payload,
meta,
%{conn_name: conn_name, chan: chan, pool_name: pool_name} = state
) do
defp on_message(
payload,
meta,
%{conn_name: conn_name, chan: chan, pool_name: pool_name} = state
) do
data = %{
event: unquote(event),
exec_module: @exec_module,
Expand All @@ -128,8 +129,24 @@ defmodule GenAMQP.Server do
:poolboy.transaction(
pool_name,
fn pid ->
AMQP.Basic.ack(chan, meta.delivery_tag)
GenServer.call(pid, {:do_work, data}, :infinity)
try do
case GenServer.call(pid, {:do_work, data}, :infinity) do
{:ok, _} ->
AMQP.Basic.ack(chan, meta.delivery_tag)

{:error, _} ->
if unquote(requeue) do
AMQP.Basic.reject(chan, meta.delivery_tag, requeue: false)
else
AMQP.Basic.ack(chan, meta.delivery_tag)
end
end
rescue
e ->
if unquote(requeue) do
AMQP.Basic.reject(chan, meta.delivery_tag, requeue: not meta.redelivered)
end
end
end
)
end)
Expand Down
9 changes: 7 additions & 2 deletions lib/gen_amqp/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ defmodule GenAMQP.PoolWorker do
end

def handle_call({:do_work, data}, _from, state) do
work(data)
{:reply, nil, state}
{:reply, work(data), state}
end

defp work(%{
Expand Down Expand Up @@ -78,6 +77,8 @@ defmodule GenAMQP.PoolWorker do
if reply? do
reply(chan, meta, resp)
end

validation_error([resp])
end

defp reply(
Expand Down Expand Up @@ -107,6 +108,10 @@ defmodule GenAMQP.PoolWorker do
sol
end

@spec validation_error(any()) :: {:ok, any()} | {:error, any()}
defp validation_error(args),
do: apply(error_handler(), :validate_response, args)

defp reduce_with_funcs(funcs, event, payload) do
Enum.reduce(funcs, payload, fn f, acc ->
f.(event, acc)
Expand Down
34 changes: 17 additions & 17 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
%{
"amqp": {:hex, :amqp, "1.1.1", "e902a8e112c9c476995440c4beac867f054dc531de208adc01aa411dd07754a2", [:mix], [{:amqp_client, "~> 3.7.11", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.9", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.6.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.11", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "~> 2.3.6", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"},
"amqp_client": {:hex, :amqp_client, "3.7.12", "5accc1ef354e19b8200d48c72d2f68d6327672b8c8cbd1dcd1556264f348dafd", [:make, :rebar3], [{:rabbit_common, "3.7.12", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"},
"earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.0", "ff26e938f95830b1db152cb6e594d711c10c02c6391236900ddd070a6b01271d", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.19.3", "3c7b0f02851f5fc13b040e8e925051452e41248f685e40250d7e40b07b9f8c10", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"gen_debug": {:hex, :gen_debug, "0.2.0", "747c557214792be7aacb4e2c40e3415cbc55ee9e7577eaef528edd27230d2890", [:mix], [], "hexpm"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm"},
"jsx": {:hex, :jsx, "2.9.0", "d2f6e5f069c00266cad52fb15d87c428579ea4d7d73a33669e12679e203329dd", [:mix, :rebar3], [], "hexpm"},
"lager": {:hex, :lager, "3.6.5", "831910109f3fcb503debf658ca0538836b348c58bfbf349a6d48228096ce9040", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"},
"rabbit_common": {:hex, :rabbit_common, "3.7.12", "26b8c10bf8b7064fdf44792f6b2d8de79f5e1bd6b36c4b1c318fb3953c1e5d86", [:make, :rebar3], [{:jsx, "2.9.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.6.5", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "2.3.6", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"},
"recon": {:hex, :recon, "2.3.6", "2bcad0cf621fb277cabbb6413159cd3aa30265c2dee42c968697988b30108604", [:rebar3], [], "hexpm"},
"amqp": {:hex, :amqp, "1.2.2", "6ec277327a617bf9405ea4b61cfd43e09f0aee0ebf943edfd6f560416855d407", [:mix], [{:amqp_client, "~> 3.7.11", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.9", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.6.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.11", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "~> 2.3", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "e4158dc2014cc76cba10c257092c6a766218e40b67a4280b9cc825896284c299"},
"amqp_client": {:hex, :amqp_client, "3.7.17", "42456b610333e1206399f80dbf0941cf1d583e7663dac323cdc6789374b78e33", [:make, :rebar3], [{:rabbit_common, "3.7.17", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "6bfdc22fedd87894c9d2c6ff8bb9727546096d5ed062503c796708c3dd2be0d3"},
"earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm", "000aaeff08919e95e7aea13e4af7b2b9734577b3e6a7c50ee31ee88cab6ec4fb"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.0", "ff26e938f95830b1db152cb6e594d711c10c02c6391236900ddd070a6b01271d", [:mix], [], "hexpm", "e4d6e26434471761ed45a3545239da87af7b70904dd4442a55f87d06b137c56b"},
"ex_doc": {:hex, :ex_doc, "0.19.3", "3c7b0f02851f5fc13b040e8e925051452e41248f685e40250d7e40b07b9f8c10", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0e11d67e662142fc3945b0ee410c73c8c956717fbeae4ad954b418747c734973"},
"gen_debug": {:hex, :gen_debug, "0.2.0", "747c557214792be7aacb4e2c40e3415cbc55ee9e7577eaef528edd27230d2890", [:mix], [], "hexpm", "03185766b32abc02fbecfdee264d06737bd447a0bb7148450240d622a20deca4"},
"goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"},
"jsx": {:hex, :jsx, "2.9.0", "d2f6e5f069c00266cad52fb15d87c428579ea4d7d73a33669e12679e203329dd", [:mix, :rebar3], [], "hexpm", "8ee1db1cabafdd578a2776a6aaae87c2a8ce54b47b59e9ec7dab5d7eb71cd8dc"},
"lager": {:hex, :lager, "3.6.10", "6172b43ab720ac33914ccd0aeb21fdbdf88213847707d4b91e6af57b2ae5c4d2", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm", "5d10499461826b79c5abee18bb594b3949cbdf76d9d9fd7e66d0a558137c21c9"},
"makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5fbc8e549aa9afeea2847c0769e3970537ed302f93a23ac612602e805d9d1e7f"},
"makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "adf0218695e22caeda2820eaba703fa46c91820d53813a2223413da3ef4ba515"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"},
"poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm", "fec8660eb7733ee4117b85f55799fd3833eb769a6df71ccf8903e8dc5447cfce"},
"poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"},
"rabbit_common": {:hex, :rabbit_common, "3.7.17", "d5fc8e9d3ce080248109fd308f28e941ede6ee9d857a69fe5bbb8eb024b4a19d", [:make, :rebar3], [{:jsx, "2.9.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.6.10", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "2.5.0", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "8ed9ed954c84ffe94d55d67d3d8815c4a47afcd1a4649dc179a4c0688f8ec162"},
"ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"},
"recon": {:hex, :recon, "2.5.0", "2f7fcbec2c35034bade2f9717f77059dc54eb4e929a3049ca7ba6775c0bd66cd", [:mix, :rebar3], [], "hexpm", "72f3840fedd94f06315c523f6cecf5b4827233bed7ae3fe135b2a0ebeab5e196"},
}
4 changes: 4 additions & 0 deletions test/support/test_servers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ defmodule ErrorHandler do

{:reply, resp}
end

def validate_response("ok"), do: {:ok, "ok"}
def validate_response("error"), do: {:error, "error"}
def validate_response("error" = resp), do: {:error, resp}
end

defmodule ServerDemo do
Expand Down

0 comments on commit 2fc85f8

Please sign in to comment.