From 2fc85f80023a51995ebd3c458520e5dc50fd1798 Mon Sep 17 00:00:00 2001 From: yoser Date: Fri, 26 Feb 2021 10:38:28 -0500 Subject: [PATCH] added requeue --- lib/gen_amqp/server.ex | 31 ++++++++++++++++++++++++------- lib/gen_amqp/worker.ex | 9 +++++++-- mix.lock | 34 +++++++++++++++++----------------- test/support/test_servers.ex | 4 ++++ 4 files changed, 52 insertions(+), 26 deletions(-) diff --git a/lib/gen_amqp/server.ex b/lib/gen_amqp/server.ex index ac3389b..e4b8420 100644 --- a/lib/gen_amqp/server.ex +++ b/lib/gen_amqp/server.ex @@ -11,6 +11,7 @@ defmodule GenAMQP.Server do extra_args = Keyword.get(opts, :extra_args, []) before_funcs = Keyword.get(opts, :before, []) after_funcs = Keyword.get(opts, :after, []) + requeue = Keyword.get(opts, :requeue, false) quote do require Logger @@ -108,11 +109,11 @@ defmodule GenAMQP.Server do {conn_name, conn_pid, false} end - def on_message( - payload, - meta, - %{conn_name: conn_name, chan: chan, pool_name: pool_name} = state - ) do + defp on_message( + payload, + meta, + %{conn_name: conn_name, chan: chan, pool_name: pool_name} = state + ) do data = %{ event: unquote(event), exec_module: @exec_module, @@ -128,8 +129,24 @@ defmodule GenAMQP.Server do :poolboy.transaction( pool_name, fn pid -> - AMQP.Basic.ack(chan, meta.delivery_tag) - GenServer.call(pid, {:do_work, data}, :infinity) + try do + case GenServer.call(pid, {:do_work, data}, :infinity) do + {:ok, _} -> + AMQP.Basic.ack(chan, meta.delivery_tag) + + {:error, _} -> + if unquote(requeue) do + AMQP.Basic.reject(chan, meta.delivery_tag, requeue: false) + else + AMQP.Basic.ack(chan, meta.delivery_tag) + end + end + rescue + e -> + if unquote(requeue) do + AMQP.Basic.reject(chan, meta.delivery_tag, requeue: not meta.redelivered) + end + end end ) end) diff --git a/lib/gen_amqp/worker.ex b/lib/gen_amqp/worker.ex index 166cbe9..9ed145f 100644 --- a/lib/gen_amqp/worker.ex +++ b/lib/gen_amqp/worker.ex @@ -11,8 +11,7 @@ defmodule GenAMQP.PoolWorker do end def handle_call({:do_work, data}, _from, state) do - work(data) - {:reply, nil, state} + {:reply, work(data), state} end defp work(%{ @@ -78,6 +77,8 @@ defmodule GenAMQP.PoolWorker do if reply? do reply(chan, meta, resp) end + + validation_error([resp]) end defp reply( @@ -107,6 +108,10 @@ defmodule GenAMQP.PoolWorker do sol end + @spec validation_error(any()) :: {:ok, any()} | {:error, any()} + defp validation_error(args), + do: apply(error_handler(), :validate_response, args) + defp reduce_with_funcs(funcs, event, payload) do Enum.reduce(funcs, payload, fn f, acc -> f.(event, acc) diff --git a/mix.lock b/mix.lock index 210acae..bd16d51 100644 --- a/mix.lock +++ b/mix.lock @@ -1,19 +1,19 @@ %{ - "amqp": {:hex, :amqp, "1.1.1", "e902a8e112c9c476995440c4beac867f054dc531de208adc01aa411dd07754a2", [:mix], [{:amqp_client, "~> 3.7.11", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.9", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.6.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.11", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "~> 2.3.6", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"}, - "amqp_client": {:hex, :amqp_client, "3.7.12", "5accc1ef354e19b8200d48c72d2f68d6327672b8c8cbd1dcd1556264f348dafd", [:make, :rebar3], [{:rabbit_common, "3.7.12", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm"}, - "earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm"}, - "elixir_uuid": {:hex, :elixir_uuid, "1.2.0", "ff26e938f95830b1db152cb6e594d711c10c02c6391236900ddd070a6b01271d", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.19.3", "3c7b0f02851f5fc13b040e8e925051452e41248f685e40250d7e40b07b9f8c10", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, - "gen_debug": {:hex, :gen_debug, "0.2.0", "747c557214792be7aacb4e2c40e3415cbc55ee9e7577eaef528edd27230d2890", [:mix], [], "hexpm"}, - "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm"}, - "jsx": {:hex, :jsx, "2.9.0", "d2f6e5f069c00266cad52fb15d87c428579ea4d7d73a33669e12679e203329dd", [:mix, :rebar3], [], "hexpm"}, - "lager": {:hex, :lager, "3.6.5", "831910109f3fcb503debf658ca0538836b348c58bfbf349a6d48228096ce9040", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"}, - "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm"}, - "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm"}, - "rabbit_common": {:hex, :rabbit_common, "3.7.12", "26b8c10bf8b7064fdf44792f6b2d8de79f5e1bd6b36c4b1c318fb3953c1e5d86", [:make, :rebar3], [{:jsx, "2.9.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.6.5", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "2.3.6", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm"}, - "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, - "recon": {:hex, :recon, "2.3.6", "2bcad0cf621fb277cabbb6413159cd3aa30265c2dee42c968697988b30108604", [:rebar3], [], "hexpm"}, + "amqp": {:hex, :amqp, "1.2.2", "6ec277327a617bf9405ea4b61cfd43e09f0aee0ebf943edfd6f560416855d407", [:mix], [{:amqp_client, "~> 3.7.11", [hex: :amqp_client, repo: "hexpm", optional: false]}, {:goldrush, "~> 0.1.0", [hex: :goldrush, repo: "hexpm", optional: false]}, {:jsx, "~> 2.9", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "~> 3.6.5", [hex: :lager, repo: "hexpm", optional: false]}, {:rabbit_common, "~> 3.7.11", [hex: :rabbit_common, repo: "hexpm", optional: false]}, {:ranch, "~> 1.7", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "~> 2.3", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "e4158dc2014cc76cba10c257092c6a766218e40b67a4280b9cc825896284c299"}, + "amqp_client": {:hex, :amqp_client, "3.7.17", "42456b610333e1206399f80dbf0941cf1d583e7663dac323cdc6789374b78e33", [:make, :rebar3], [{:rabbit_common, "3.7.17", [hex: :rabbit_common, repo: "hexpm", optional: false]}], "hexpm", "6bfdc22fedd87894c9d2c6ff8bb9727546096d5ed062503c796708c3dd2be0d3"}, + "earmark": {:hex, :earmark, "1.3.1", "73812f447f7a42358d3ba79283cfa3075a7580a3a2ed457616d6517ac3738cb9", [:mix], [], "hexpm", "000aaeff08919e95e7aea13e4af7b2b9734577b3e6a7c50ee31ee88cab6ec4fb"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.0", "ff26e938f95830b1db152cb6e594d711c10c02c6391236900ddd070a6b01271d", [:mix], [], "hexpm", "e4d6e26434471761ed45a3545239da87af7b70904dd4442a55f87d06b137c56b"}, + "ex_doc": {:hex, :ex_doc, "0.19.3", "3c7b0f02851f5fc13b040e8e925051452e41248f685e40250d7e40b07b9f8c10", [:mix], [{:earmark, "~> 1.2", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.10", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0e11d67e662142fc3945b0ee410c73c8c956717fbeae4ad954b418747c734973"}, + "gen_debug": {:hex, :gen_debug, "0.2.0", "747c557214792be7aacb4e2c40e3415cbc55ee9e7577eaef528edd27230d2890", [:mix], [], "hexpm", "03185766b32abc02fbecfdee264d06737bd447a0bb7148450240d622a20deca4"}, + "goldrush": {:hex, :goldrush, "0.1.9", "f06e5d5f1277da5c413e84d5a2924174182fb108dabb39d5ec548b27424cd106", [:rebar3], [], "hexpm", "99cb4128cffcb3227581e5d4d803d5413fa643f4eb96523f77d9e6937d994ceb"}, + "jsx": {:hex, :jsx, "2.9.0", "d2f6e5f069c00266cad52fb15d87c428579ea4d7d73a33669e12679e203329dd", [:mix, :rebar3], [], "hexpm", "8ee1db1cabafdd578a2776a6aaae87c2a8ce54b47b59e9ec7dab5d7eb71cd8dc"}, + "lager": {:hex, :lager, "3.6.10", "6172b43ab720ac33914ccd0aeb21fdbdf88213847707d4b91e6af57b2ae5c4d2", [:rebar3], [{:goldrush, "0.1.9", [hex: :goldrush, repo: "hexpm", optional: false]}], "hexpm", "5d10499461826b79c5abee18bb594b3949cbdf76d9d9fd7e66d0a558137c21c9"}, + "makeup": {:hex, :makeup, "0.8.0", "9cf32aea71c7fe0a4b2e9246c2c4978f9070257e5c9ce6d4a28ec450a839b55f", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5fbc8e549aa9afeea2847c0769e3970537ed302f93a23ac612602e805d9d1e7f"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.13.0", "be7a477997dcac2e48a9d695ec730b2d22418292675c75aa2d34ba0909dcdeda", [:mix], [{:makeup, "~> 0.8", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "adf0218695e22caeda2820eaba703fa46c91820d53813a2223413da3ef4ba515"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm", "5c040b8469c1ff1b10093d3186e2e10dbe483cd73d79ec017993fb3985b8a9b3"}, + "poison": {:hex, :poison, "3.1.0", "d9eb636610e096f86f25d9a46f35a9facac35609a7591b3be3326e99a0484665", [:mix], [], "hexpm", "fec8660eb7733ee4117b85f55799fd3833eb769a6df71ccf8903e8dc5447cfce"}, + "poolboy": {:hex, :poolboy, "1.5.2", "392b007a1693a64540cead79830443abf5762f5d30cf50bc95cb2c1aaafa006b", [:rebar3], [], "hexpm", "dad79704ce5440f3d5a3681c8590b9dc25d1a561e8f5a9c995281012860901e3"}, + "rabbit_common": {:hex, :rabbit_common, "3.7.17", "d5fc8e9d3ce080248109fd308f28e941ede6ee9d857a69fe5bbb8eb024b4a19d", [:make, :rebar3], [{:jsx, "2.9.0", [hex: :jsx, repo: "hexpm", optional: false]}, {:lager, "3.6.10", [hex: :lager, repo: "hexpm", optional: false]}, {:ranch, "1.7.1", [hex: :ranch, repo: "hexpm", optional: false]}, {:recon, "2.5.0", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "8ed9ed954c84ffe94d55d67d3d8815c4a47afcd1a4649dc179a4c0688f8ec162"}, + "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm", "451d8527787df716d99dc36162fca05934915db0b6141bbdac2ea8d3c7afc7d7"}, + "recon": {:hex, :recon, "2.5.0", "2f7fcbec2c35034bade2f9717f77059dc54eb4e929a3049ca7ba6775c0bd66cd", [:mix, :rebar3], [], "hexpm", "72f3840fedd94f06315c523f6cecf5b4827233bed7ae3fe135b2a0ebeab5e196"}, } diff --git a/test/support/test_servers.ex b/test/support/test_servers.ex index d627ebe..3875ebf 100644 --- a/test/support/test_servers.ex +++ b/test/support/test_servers.ex @@ -13,6 +13,10 @@ defmodule ErrorHandler do {:reply, resp} end + + def validate_response("ok"), do: {:ok, "ok"} + def validate_response("error"), do: {:error, "error"} + def validate_response("error" = resp), do: {:error, resp} end defmodule ServerDemo do