diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index ab19e84..e291eb3 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -69,22 +69,23 @@ defmodule BroadwayKafka.BrodClient do {:ok, group_config} <- validate_group_config(opts), {:ok, fetch_config} <- validate_fetch_config(opts), {:ok, client_config} <- validate_client_config(opts) do - {:ok, - %{ - hosts: parse_hosts(hosts), - group_id: group_id, - topics: topics, - receive_interval: receive_interval, - reconnect_timeout: reconnect_timeout, - offset_commit_on_ack: offset_commit_on_ack, - offset_reset_policy: offset_reset_policy, - begin_offset: begin_offset, - group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], - fetch_config: Map.new(fetch_config || []), - client_config: client_config, - shared_client: shared_client, - shared_client_id: build_shared_client_id(opts) - }} + config = %{ + hosts: parse_hosts(hosts), + group_id: group_id, + topics: topics, + receive_interval: receive_interval, + reconnect_timeout: reconnect_timeout, + offset_commit_on_ack: offset_commit_on_ack, + offset_reset_policy: offset_reset_policy, + begin_offset: begin_offset, + group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], + fetch_config: Map.new(fetch_config || []), + client_config: client_config, + shared_client: shared_client, + shared_client_id: build_shared_client_id(opts) + } + + {:ok, shared_client_child_spec(config), config} end end @@ -153,8 +154,9 @@ defmodule BroadwayKafka.BrodClient do end end - @impl true - def shared_client_child_spec(config) do + defp shared_client_child_spec(%{shared_client: false}), do: [] + + defp shared_client_child_spec(%{shared_client: true} = config) do [ %{ id: config.shared_client_id, diff --git a/lib/broadway_kafka/kafka_client.ex b/lib/broadway_kafka/kafka_client.ex index d458b44..dda9c3e 100644 --- a/lib/broadway_kafka/kafka_client.ex +++ b/lib/broadway_kafka/kafka_client.ex @@ -54,9 +54,4 @@ defmodule BroadwayKafka.KafkaClient do @callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok @callback connected?(:brod.client()) :: boolean @callback disconnect(:brod.client()) :: :ok - - @callback shared_client_child_spec(config()) :: [child_spec] - when child_spec: :supervisor.child_spec() | {module, any} | module - - @optional_callbacks shared_client_child_spec: 1 end diff --git a/lib/broadway_kafka/producer.ex b/lib/broadway_kafka/producer.ex index a676b55..0a42952 100644 --- a/lib/broadway_kafka/producer.ex +++ b/lib/broadway_kafka/producer.ex @@ -235,54 +235,48 @@ defmodule BroadwayKafka.Producer do def init(opts) do Process.flag(:trap_exit, true) - client = opts[:client] || BroadwayKafka.BrodClient + config = opts[:initialized_client_config] - case opts[:initialized_client_config] || client.init(opts) do - {:error, message} -> - raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message - - {:ok, config} -> - {_, producer_name} = Process.info(self(), :registered_name) + draining_after_revoke_flag = + self() + |> drain_after_revoke_table_name!() + |> drain_after_revoke_table_init!() - draining_after_revoke_flag = - self() - |> drain_after_revoke_table_name!() - |> drain_after_revoke_table_init!() + prefix = get_in(config, [:client_config, :client_id_prefix]) - prefix = get_in(config, [:client_config, :client_id_prefix]) + {_, producer_name} = Process.info(self(), :registered_name) - client_id = - config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}" + client_id = + config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}" - max_demand = - with [{_first, processor_opts}] <- opts[:broadway][:processors], - max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do - max_demand - else - _ -> 10 - end + max_demand = + with [{_first, processor_opts}] <- opts[:broadway][:processors], + max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do + max_demand + else + _ -> 10 + end - state = %{ - client: client, - client_id: client_id, - group_coordinator: nil, - receive_timer: nil, - receive_interval: config.receive_interval, - reconnect_timeout: config.reconnect_timeout, - acks: Acknowledger.new(), - config: config, - allocator_names: allocator_names(opts[:broadway]), - revoke_caller: nil, - draining_after_revoke_flag: draining_after_revoke_flag, - demand: 0, - shutting_down?: false, - buffer: :queue.new(), - max_demand: max_demand, - shared_client: config.shared_client - } + state = %{ + client: opts[:client] || BroadwayKafka.BrodClient, + client_id: client_id, + group_coordinator: nil, + receive_timer: nil, + receive_interval: config.receive_interval, + reconnect_timeout: config.reconnect_timeout, + acks: Acknowledger.new(), + config: config, + allocator_names: allocator_names(opts[:broadway]), + revoke_caller: nil, + draining_after_revoke_flag: draining_after_revoke_flag, + demand: 0, + shutting_down?: false, + buffer: :queue.new(), + max_demand: max_demand, + shared_client: config.shared_client + } - {:producer, connect(state)} - end + {:producer, connect(state)} end defp allocator_names(broadway_config) do @@ -518,27 +512,21 @@ defmodule BroadwayKafka.Producer do {producer_mod, producer_opts} = opts[:producer][:module] - {extra_child_specs, initialized_client_config} = - if producer_opts[:shared_client] do - client = producer_opts[:client] || BroadwayKafka.BrodClient + client = producer_opts[:client] || BroadwayKafka.BrodClient - case client.init(Keyword.put(producer_opts, :broadway, opts)) do - {:error, message} -> - raise ArgumentError, "invalid options given to #{client}.init/1, " <> message - - {:ok, config} = result -> - {client.shared_client_child_spec(config), result} - end - else - {[], nil} - end + case client.init(Keyword.put(producer_opts, :broadway, opts)) do + {:error, message} -> + raise ArgumentError, "invalid options given to #{client}.init/1, " <> message - new_producer_opts = - Keyword.put(producer_opts, :initialized_client_config, initialized_client_config) + {:ok, extra_child_specs, config} -> + new_producer_opts = + Keyword.put(producer_opts, :initialized_client_config, config) - updated_opts = put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts}) + updated_opts = + put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts}) - {allocators ++ extra_child_specs, updated_opts} + {allocators ++ extra_child_specs, updated_opts} + end end @impl :brod_group_member diff --git a/test/brod_client_test.exs b/test/brod_client_test.exs index ef5722e..121ad05 100644 --- a/test/brod_client_test.exs +++ b/test/brod_client_test.exs @@ -33,16 +33,16 @@ defmodule BroadwayKafka.BrodClientTest do assert BrodClient.init(opts) == {:error, expected_msg <> ~s/"host:9092,"/} opts = Keyword.put(@opts, :hosts, host: 9092) - assert {:ok, %{hosts: [host: 9092]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [host: 9092]}} = BrodClient.init(opts) opts = Keyword.put(@opts, :hosts, [{"host", 9092}]) - assert {:ok, %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) opts = Keyword.put(@opts, :hosts, "host:9092") - assert {:ok, %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [{"host", 9092}]}} = BrodClient.init(opts) opts = Keyword.put(@opts, :hosts, "host1:9092,host2:9092") - assert {:ok, %{hosts: [{"host1", 9092}, {"host2", 9092}]}} = BrodClient.init(opts) + assert {:ok, [], %{hosts: [{"host1", 9092}, {"host2", 9092}]}} = BrodClient.init(opts) end test ":group_id is a required string" do @@ -55,7 +55,7 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :group_id to be a non empty string, got: :an_atom"} opts = Keyword.put(@opts, :group_id, "my_group") - assert {:ok, %{group_id: "my_group"}} = BrodClient.init(opts) + assert {:ok, [], %{group_id: "my_group"}} = BrodClient.init(opts) end test ":topics is a required list of strings" do @@ -68,12 +68,12 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :topics to be a list of strings, got: :an_atom"} opts = Keyword.put(@opts, :topics, ["topic_1", "topic_2"]) - assert {:ok, %{topics: ["topic_1", "topic_2"]}} = BrodClient.init(opts) + assert {:ok, [], %{topics: ["topic_1", "topic_2"]}} = BrodClient.init(opts) end test ":receive_interval is a non-negative integer with default value 2000" do opts = Keyword.delete(@opts, :receive_interval) - assert {:ok, %{receive_interval: 2000}} = BrodClient.init(opts) + assert {:ok, [], %{receive_interval: 2000}} = BrodClient.init(opts) opts = Keyword.put(@opts, :receive_interval, :an_atom) @@ -81,11 +81,11 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :receive_interval to be a non-negative integer, got: :an_atom"} opts = Keyword.put(@opts, :receive_interval, 1000) - assert {:ok, %{receive_interval: 1000}} = BrodClient.init(opts) + assert {:ok, [], %{receive_interval: 1000}} = BrodClient.init(opts) end test ":reconnect_timeout is a non-negative integer with default value 1000" do - assert {:ok, %{reconnect_timeout: 1000}} = BrodClient.init(@opts) + assert {:ok, [], %{reconnect_timeout: 1000}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :reconnect_timeout, :an_atom) @@ -93,11 +93,11 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :reconnect_timeout to be a non-negative integer, got: :an_atom"} opts = Keyword.put(@opts, :reconnect_timeout, 2000) - assert {:ok, %{reconnect_timeout: 2000}} = BrodClient.init(opts) + assert {:ok, [], %{reconnect_timeout: 2000}} = BrodClient.init(opts) end test ":offset_commit_on_ack is a boolean with default value true" do - assert {:ok, %{offset_commit_on_ack: true}} = BrodClient.init(@opts) + assert {:ok, [], %{offset_commit_on_ack: true}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :offset_commit_on_ack, :an_atom) @@ -105,11 +105,11 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :offset_commit_on_ack to be a boolean, got: :an_atom"} opts = Keyword.put(@opts, :offset_commit_on_ack, false) - assert {:ok, %{offset_commit_on_ack: false}} = BrodClient.init(opts) + assert {:ok, [], %{offset_commit_on_ack: false}} = BrodClient.init(opts) end test ":offset_reset_policy can be :earliest or :latest. Default is :latest" do - assert {:ok, %{offset_reset_policy: :latest}} = BrodClient.init(@opts) + assert {:ok, [], %{offset_reset_policy: :latest}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :offset_reset_policy, :an_atom) @@ -118,14 +118,14 @@ defmodule BroadwayKafka.BrodClientTest do "expected :offset_reset_policy to be one of [:earliest, :latest], got: :an_atom"} opts = Keyword.put(@opts, :offset_reset_policy, :earliest) - assert {:ok, %{offset_reset_policy: :earliest}} = BrodClient.init(opts) + assert {:ok, [], %{offset_reset_policy: :earliest}} = BrodClient.init(opts) opts = Keyword.put(@opts, :offset_reset_policy, :latest) - assert {:ok, %{offset_reset_policy: :latest}} = BrodClient.init(opts) + assert {:ok, [], %{offset_reset_policy: :latest}} = BrodClient.init(opts) end test ":begin_offset can be :assigned or :reset. Default is :assigned" do - assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(@opts) + assert {:ok, [], %{begin_offset: :assigned}} = BrodClient.init(@opts) opts = Keyword.put(@opts, :begin_offset, :an_atom) @@ -133,10 +133,10 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :begin_offset to be one of [:assigned, :reset], got: :an_atom"} opts = Keyword.put(@opts, :begin_offset, :assigned) - assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(opts) + assert {:ok, [], %{begin_offset: :assigned}} = BrodClient.init(opts) opts = Keyword.put(@opts, :begin_offset, :reset) - assert {:ok, %{begin_offset: :reset}} = BrodClient.init(opts) + assert {:ok, [], %{begin_offset: :reset}} = BrodClient.init(opts) end test ":offset_commit_interval_seconds is an optional non-negative integer" do @@ -148,7 +148,7 @@ defmodule BroadwayKafka.BrodClientTest do "a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :offset_commit_interval_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:offset_commit_interval_seconds] == 3 end @@ -160,7 +160,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :rejoin_delay_seconds to be a non-negative integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :rejoin_delay_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:rejoin_delay_seconds] == 3 end @@ -172,7 +172,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :session_timeout_seconds to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :session_timeout_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:session_timeout_seconds] == 3 end @@ -184,7 +184,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :heartbeat_rate_seconds to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :heartbeat_rate_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:heartbeat_rate_seconds] == 3 end @@ -196,7 +196,7 @@ defmodule BroadwayKafka.BrodClientTest do "expected :rebalance_timeout_seconds to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:group_config, :rebalance_timeout_seconds], 3) - {:ok, %{group_config: group_config}} = BrodClient.init(opts) + {:ok, [], %{group_config: group_config}} = BrodClient.init(opts) assert group_config[:rebalance_timeout_seconds] == 3 end @@ -207,7 +207,7 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :min_bytes to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:fetch_config, :min_bytes], 3) - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(opts) assert fetch_config[:min_bytes] == 3 end @@ -218,7 +218,7 @@ defmodule BroadwayKafka.BrodClientTest do {:error, "expected :max_bytes to be a positive integer, got: :an_atom"} opts = put_in(@opts, [:fetch_config, :max_bytes], 3) - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(opts) assert fetch_config[:max_bytes] == 3 end @@ -228,11 +228,11 @@ defmodule BroadwayKafka.BrodClientTest do assert BrodClient.init(opts) == {:error, "expected :max_wait_time to be a positive integer, got: :an_atom"} - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(@opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(@opts) assert not Map.has_key?(fetch_config, :max_wait_time) opts = put_in(@opts, [:fetch_config, :max_wait_time], 3) - {:ok, %{fetch_config: fetch_config}} = BrodClient.init(opts) + {:ok, [], %{fetch_config: fetch_config}} = BrodClient.init(opts) assert fetch_config[:max_wait_time] == 3 end @@ -244,7 +244,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :client_id_prefix], "a string") - assert {:ok, + assert {:ok, [], %{ client_config: [ client_id_prefix: "a string" @@ -267,7 +267,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :sasl], {:plain, "username", "password"}) - assert {:ok, + assert {:ok, [], %{ client_config: [ sasl: {:plain, "username", "password"} @@ -278,7 +278,7 @@ defmodule BroadwayKafka.BrodClientTest do test ":sasl is an optional tuple of :callback, SASL Authentication Plugin module and opts" do opts = put_in(@opts, [:client_config, :sasl], {:callback, FakeSaslMechanismPlugin, {}}) - assert {:ok, + assert {:ok, [], %{ client_config: [ sasl: {:callback, FakeSaslMechanismPlugin, {}} @@ -300,7 +300,7 @@ defmodule BroadwayKafka.BrodClientTest do certfile: "client.crt" ) - assert {:ok, + assert {:ok, [], %{ client_config: [ ssl: [cacertfile: "ca.crt", keyfile: "client.key", certfile: "client.crt"] @@ -309,7 +309,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :ssl], true) - assert {:ok, + assert {:ok, [], %{ client_config: [ssl: true] }} = BrodClient.init(opts) @@ -323,7 +323,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :connect_timeout], 5000) - assert {:ok, + assert {:ok, [], %{ client_config: [ connect_timeout: 5000 @@ -345,7 +345,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :request_timeout], 5000) - assert {:ok, + assert {:ok, [], %{ client_config: [ request_timeout: 5000 @@ -361,7 +361,7 @@ defmodule BroadwayKafka.BrodClientTest do opts = put_in(@opts, [:client_config, :query_api_versions], false) - assert {:ok, %{client_config: [query_api_versions: false]}} = BrodClient.init(opts) + assert {:ok, [], %{client_config: [query_api_versions: false]}} = BrodClient.init(opts) end test ":shared_client is an optional boolean" do @@ -376,7 +376,7 @@ defmodule BroadwayKafka.BrodClientTest do |> Keyword.put(:broadway, name: :my_broadway_name) |> put_in([:client_config, :client_id_prefix], "my_prefix.") - assert {:ok, %{shared_client: true}} = BrodClient.init(opts) + assert {:ok, _specs, %{shared_client: true}} = BrodClient.init(opts) end test "return shared_client_id when :shared_client is true" do @@ -386,20 +386,31 @@ defmodule BroadwayKafka.BrodClientTest do |> Keyword.put(:broadway, name: :my_broadway_name) |> put_in([:client_config, :client_id_prefix], "my_prefix.") - assert {:ok, + assert {:ok, child_specs, %{ shared_client: true, shared_client_id: :"my_prefix.Elixir.my_broadway_name.SharedClient" }} = BrodClient.init(opts) + assert [ + %{ + id: shared_client_id, + start: {:brod, :start_link_client, [hosts, shared_client_id, client_config]} + } + ] = child_specs + + assert [{:host, 9092}] = hosts + assert :"my_prefix.Elixir.my_broadway_name.SharedClient" = shared_client_id + assert [client_id_prefix: "my_prefix."] = client_config + opts = @opts |> Keyword.put(:shared_client, false) |> Keyword.put(:broadway, name: :my_broadway_name) |> put_in([:client_config, :client_id_prefix], "my_prefix.") - assert {:ok, + assert {:ok, [], %{ shared_client: false, shared_client_id: nil @@ -408,34 +419,6 @@ defmodule BroadwayKafka.BrodClientTest do end end - describe "shared_client_child_spec" do - test "should return child spec" do - module_opts = - @opts - |> Keyword.put(:shared_client, true) - |> Keyword.put(:client_config, client_id_prefix: "my_prefix.") - - broadway_opts = [ - name: :my_broadway - ] - - {:ok, config} = BrodClient.init(Keyword.put(module_opts, :broadway, broadway_opts)) - - assert child_specs = BrodClient.shared_client_child_spec(config) - - assert [ - %{ - id: shared_client_id, - start: {:brod, :start_link_client, [hosts, shared_client_id, client_config]} - } - ] = child_specs - - assert [{:host, 9092}] = hosts - assert :"my_prefix.Elixir.my_broadway.SharedClient" = shared_client_id - assert [client_id_prefix: "my_prefix."] = client_config - end - end - defmodule FakeSaslMechanismPlugin do @behaviour :kpro_auth_backend diff --git a/test/producer_test.exs b/test/producer_test.exs index 4cc9dd3..47f0192 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -43,7 +43,7 @@ defmodule BroadwayKafka.ProducerTest do defrecord :kafka_message, extract(:kafka_message, from_lib: "brod/include/brod.hrl") @impl true - def init(opts), do: {:ok, Map.new(opts)} + def init(opts), do: {:ok, opts[:child_specs], Map.new(opts)} @impl true def setup(_stage_pid, client_id, _callback_module, config) do @@ -119,20 +119,6 @@ defmodule BroadwayKafka.ProducerTest do def update_topics(_client_id, _topics) do :ok end - - @impl true - def shared_client_child_spec(config) do - [ - Supervisor.child_spec( - {Task, fn -> send(config.test_pid, :child_started_1) end}, - id: :child_started_1 - ), - Supervisor.child_spec( - {Task, fn -> send(config.test_pid, :child_started_2) end}, - id: :child_started_2 - ) - ] - end end defmodule Forwarder do @@ -255,9 +241,23 @@ defmodule BroadwayKafka.ProducerTest do stop_broadway(pid) end - test "start all child processes defined in shared_client_child_spec/1 callback" do + test "start all child process returned by config" do {:ok, message_server} = MessageServer.start_link() - {:ok, pid} = start_broadway(message_server, shared_client: true) + + parent_pid = self() + + child_specs = [ + Supervisor.child_spec( + {Task, fn -> send(parent_pid, :child_started_1) end}, + id: :child_started_1 + ), + Supervisor.child_spec( + {Task, fn -> send(parent_pid, :child_started_2) end}, + id: :child_started_2 + ) + ] + + {:ok, pid} = start_broadway(message_server, shared_client: true, child_specs: child_specs) assert_receive :child_started_1 assert_receive :child_started_2 @@ -688,7 +688,8 @@ defmodule BroadwayKafka.ProducerTest do offset_commit_on_ack: false, begin_offset: :assigned, ack_raises_on_offset: ack_raises_on_offset, - shared_client: opts[:shared_client] || false + shared_client: opts[:shared_client] || false, + child_specs: opts[:child_specs] || [] ]}, concurrency: producers_concurrency ],