Skip to content

Commit

Permalink
chore: move child_specs to init and always init on prepare
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Oct 10, 2023
1 parent f8d8c64 commit 8a9cbcc
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 165 deletions.
38 changes: 20 additions & 18 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,22 +69,23 @@ defmodule BroadwayKafka.BrodClient do
{:ok, group_config} <- validate_group_config(opts),
{:ok, fetch_config} <- validate_fetch_config(opts),
{:ok, client_config} <- validate_client_config(opts) do
{:ok,
%{
hosts: parse_hosts(hosts),
group_id: group_id,
topics: topics,
receive_interval: receive_interval,
reconnect_timeout: reconnect_timeout,
offset_commit_on_ack: offset_commit_on_ack,
offset_reset_policy: offset_reset_policy,
begin_offset: begin_offset,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
client_config: client_config,
shared_client: shared_client,
shared_client_id: build_shared_client_id(opts)
}}
config = %{
hosts: parse_hosts(hosts),
group_id: group_id,
topics: topics,
receive_interval: receive_interval,
reconnect_timeout: reconnect_timeout,
offset_commit_on_ack: offset_commit_on_ack,
offset_reset_policy: offset_reset_policy,
begin_offset: begin_offset,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
client_config: client_config,
shared_client: shared_client,
shared_client_id: build_shared_client_id(opts)
}

{:ok, shared_client_child_spec(config), config}
end
end

Expand Down Expand Up @@ -153,8 +154,9 @@ defmodule BroadwayKafka.BrodClient do
end
end

@impl true
def shared_client_child_spec(config) do
defp shared_client_child_spec(%{shared_client: false}), do: []

defp shared_client_child_spec(%{shared_client: true} = config) do
[
%{
id: config.shared_client_id,
Expand Down
5 changes: 0 additions & 5 deletions lib/broadway_kafka/kafka_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,4 @@ defmodule BroadwayKafka.KafkaClient do
@callback update_topics(:brod.group_coordinator(), [:brod.topic()]) :: :ok
@callback connected?(:brod.client()) :: boolean
@callback disconnect(:brod.client()) :: :ok

@callback shared_client_child_spec(config()) :: [child_spec]
when child_spec: :supervisor.child_spec() | {module, any} | module

@optional_callbacks shared_client_child_spec: 1
end
104 changes: 46 additions & 58 deletions lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -235,54 +235,48 @@ defmodule BroadwayKafka.Producer do
def init(opts) do
Process.flag(:trap_exit, true)

client = opts[:client] || BroadwayKafka.BrodClient
config = opts[:initialized_client_config]

case opts[:initialized_client_config] || client.init(opts) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{inspect(client)}.init/1, " <> message

{:ok, config} ->
{_, producer_name} = Process.info(self(), :registered_name)
draining_after_revoke_flag =
self()
|> drain_after_revoke_table_name!()
|> drain_after_revoke_table_init!()

draining_after_revoke_flag =
self()
|> drain_after_revoke_table_name!()
|> drain_after_revoke_table_init!()
prefix = get_in(config, [:client_config, :client_id_prefix])

prefix = get_in(config, [:client_config, :client_id_prefix])
{_, producer_name} = Process.info(self(), :registered_name)

client_id =
config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}"
client_id =
config[:shared_client_id] || :"#{prefix}#{Module.concat([producer_name, Client])}"

max_demand =
with [{_first, processor_opts}] <- opts[:broadway][:processors],
max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do
max_demand
else
_ -> 10
end
max_demand =
with [{_first, processor_opts}] <- opts[:broadway][:processors],
max_demand when is_integer(max_demand) <- processor_opts[:max_demand] do
max_demand
else
_ -> 10
end

state = %{
client: client,
client_id: client_id,
group_coordinator: nil,
receive_timer: nil,
receive_interval: config.receive_interval,
reconnect_timeout: config.reconnect_timeout,
acks: Acknowledger.new(),
config: config,
allocator_names: allocator_names(opts[:broadway]),
revoke_caller: nil,
draining_after_revoke_flag: draining_after_revoke_flag,
demand: 0,
shutting_down?: false,
buffer: :queue.new(),
max_demand: max_demand,
shared_client: config.shared_client
}
state = %{
client: opts[:client] || BroadwayKafka.BrodClient,
client_id: client_id,
group_coordinator: nil,
receive_timer: nil,
receive_interval: config.receive_interval,
reconnect_timeout: config.reconnect_timeout,
acks: Acknowledger.new(),
config: config,
allocator_names: allocator_names(opts[:broadway]),
revoke_caller: nil,
draining_after_revoke_flag: draining_after_revoke_flag,
demand: 0,
shutting_down?: false,
buffer: :queue.new(),
max_demand: max_demand,
shared_client: config.shared_client
}

{:producer, connect(state)}
end
{:producer, connect(state)}
end

defp allocator_names(broadway_config) do
Expand Down Expand Up @@ -518,27 +512,21 @@ defmodule BroadwayKafka.Producer do

{producer_mod, producer_opts} = opts[:producer][:module]

{extra_child_specs, initialized_client_config} =
if producer_opts[:shared_client] do
client = producer_opts[:client] || BroadwayKafka.BrodClient
client = producer_opts[:client] || BroadwayKafka.BrodClient

case client.init(Keyword.put(producer_opts, :broadway, opts)) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{client}.init/1, " <> message

{:ok, config} = result ->
{client.shared_client_child_spec(config), result}
end
else
{[], nil}
end
case client.init(Keyword.put(producer_opts, :broadway, opts)) do
{:error, message} ->
raise ArgumentError, "invalid options given to #{client}.init/1, " <> message

new_producer_opts =
Keyword.put(producer_opts, :initialized_client_config, initialized_client_config)
{:ok, extra_child_specs, config} ->
new_producer_opts =
Keyword.put(producer_opts, :initialized_client_config, config)

updated_opts = put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts})
updated_opts =
put_in(updated_opts, [:producer, :module], {producer_mod, new_producer_opts})

{allocators ++ extra_child_specs, updated_opts}
{allocators ++ extra_child_specs, updated_opts}
end
end

@impl :brod_group_member
Expand Down
Loading

0 comments on commit 8a9cbcc

Please sign in to comment.