Skip to content

Commit

Permalink
feat: add :begin_offset option
Browse files Browse the repository at this point in the history
closes #129
  • Loading branch information
Daniel Marin Cabillas committed Aug 25, 2023
1 parent 731e30d commit e0655c4
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 1 deletion.
12 changes: 12 additions & 0 deletions lib/broadway_kafka/brod_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ defmodule BroadwayKafka.BrodClient do

@default_offset_reset_policy :latest

@begin_offset_values [:assigned, :reset]

@default_begin_offset :assigned

@impl true
def init(opts) do
with {:ok, hosts} <- validate(opts, :hosts, required: true),
Expand All @@ -56,6 +60,8 @@ defmodule BroadwayKafka.BrodClient do
validate(opts, :offset_commit_on_ack, default: @default_offset_commit_on_ack),
{:ok, offset_reset_policy} <-
validate(opts, :offset_reset_policy, default: @default_offset_reset_policy),
{:ok, begin_offset} <-
validate(opts, :begin_offset, default: @default_begin_offset),
{:ok, group_config} <- validate_group_config(opts),
{:ok, fetch_config} <- validate_fetch_config(opts),
{:ok, client_config} <- validate_client_config(opts) do
Expand All @@ -68,6 +74,7 @@ defmodule BroadwayKafka.BrodClient do
reconnect_timeout: reconnect_timeout,
offset_commit_on_ack: offset_commit_on_ack,
offset_reset_policy: offset_reset_policy,
begin_offset: begin_offset,
group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config],
fetch_config: Map.new(fetch_config || []),
client_config: client_config
Expand Down Expand Up @@ -228,6 +235,11 @@ defmodule BroadwayKafka.BrodClient do
)
end

defp validate_option(:begin_offset, value)
when value not in @begin_offset_values do
validation_error(:begin_offset, "one of #{inspect(@begin_offset_values)}", value)
end

defp validate_option(:offset_commit_interval_seconds, value)
when not is_integer(value) or value < 1,
do: validation_error(:offset_commit_interval_seconds, "a positive integer", value)
Expand Down
14 changes: 13 additions & 1 deletion lib/broadway_kafka/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ defmodule BroadwayKafka.Producer do
offset in Kafka or if the current offset has expired. Possible values are `:earliest` or
`:latest`. Default is `:latest`.
* `:begin_offset` - Optional. Defines how to get the initial offset for the consumers.
The possible values are `:assigned` or `:reset`. When set to `:assigned` the starting offset will be the
ones returned in the kafka partition assignments (the lastest committed offsets for the consumer group).
When set to `:reset`, the starting offset will be dictated by the `:offset_reset_policy` option, either
starting from the `:earliest` or the `:latest` offsets of the topic. Default is `:assigned`.
* `:group_config` - Optional. A list of options used to configure the group
coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available
options.
Expand Down Expand Up @@ -356,11 +362,17 @@ defmodule BroadwayKafka.Producer do
brod_received_assignment(
topic: topic,
partition: partition,
begin_offset: begin_offset
begin_offset: assigned_begin_offset
) = assignment

offset_reset_policy = state.config[:offset_reset_policy]

begin_offset =
case state.config[:begin_offset] do
:assigned -> assigned_begin_offset
:reset -> :undefined
end

offset =
state.client.resolve_offset(
topic,
Expand Down
15 changes: 15 additions & 0 deletions test/brod_client_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ defmodule BroadwayKafka.BrodClientTest do
assert {:ok, %{offset_reset_policy: :latest}} = BrodClient.init(opts)
end

test ":begin_offset can be :assigned or :reset. Default is :assigned" do
assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(@opts)

opts = Keyword.put(@opts, :begin_offset, :an_atom)

assert BrodClient.init(opts) ==
{:error, "expected :begin_offset to be one of [:assigned, :reset], got: :an_atom"}

opts = Keyword.put(@opts, :begin_offset, :assigned)
assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(opts)

opts = Keyword.put(@opts, :begin_offset, :reset)
assert {:ok, %{begin_offset: :reset}} = BrodClient.init(opts)
end

test ":offset_commit_interval_seconds is an optional non-negative integer" do
opts = put_in(@opts, [:group_config, :offset_commit_interval_seconds], :an_atom)

Expand Down
1 change: 1 addition & 0 deletions test/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,7 @@ defmodule BroadwayKafka.ProducerTest do
reconnect_timeout: 10,
max_bytes: 10,
offset_commit_on_ack: false,
begin_offset: :assigned,
ack_raises_on_offset: ack_raises_on_offset
]},
concurrency: producers_concurrency
Expand Down

0 comments on commit e0655c4

Please sign in to comment.