diff --git a/lib/broadway_kafka/brod_client.ex b/lib/broadway_kafka/brod_client.ex index 3793fcb..d6ed97c 100644 --- a/lib/broadway_kafka/brod_client.ex +++ b/lib/broadway_kafka/brod_client.ex @@ -43,6 +43,10 @@ defmodule BroadwayKafka.BrodClient do @default_offset_reset_policy :latest + @begin_offset_values [:assigned, :reset] + + @default_begin_offset :assigned + @impl true def init(opts) do with {:ok, hosts} <- validate(opts, :hosts, required: true), @@ -56,6 +60,8 @@ defmodule BroadwayKafka.BrodClient do validate(opts, :offset_commit_on_ack, default: @default_offset_commit_on_ack), {:ok, offset_reset_policy} <- validate(opts, :offset_reset_policy, default: @default_offset_reset_policy), + {:ok, begin_offset} <- + validate(opts, :begin_offset, default: @default_begin_offset), {:ok, group_config} <- validate_group_config(opts), {:ok, fetch_config} <- validate_fetch_config(opts), {:ok, client_config} <- validate_client_config(opts) do @@ -68,6 +74,7 @@ defmodule BroadwayKafka.BrodClient do reconnect_timeout: reconnect_timeout, offset_commit_on_ack: offset_commit_on_ack, offset_reset_policy: offset_reset_policy, + begin_offset: begin_offset, group_config: [{:offset_commit_policy, @offset_commit_policy} | group_config], fetch_config: Map.new(fetch_config || []), client_config: client_config @@ -228,6 +235,11 @@ defmodule BroadwayKafka.BrodClient do ) end + defp validate_option(:begin_offset, value) + when value not in @begin_offset_values do + validation_error(:begin_offset, "one of #{inspect(@begin_offset_values)}", value) + end + defp validate_option(:offset_commit_interval_seconds, value) when not is_integer(value) or value < 1, do: validation_error(:offset_commit_interval_seconds, "a positive integer", value) diff --git a/lib/broadway_kafka/producer.ex b/lib/broadway_kafka/producer.ex index a4c8202..40e9728 100644 --- a/lib/broadway_kafka/producer.ex +++ b/lib/broadway_kafka/producer.ex @@ -43,6 +43,12 @@ defmodule BroadwayKafka.Producer do offset in Kafka or if 
the current offset has expired. Possible values are `:earliest` or `:latest`. Default is `:latest`. + * `:begin_offset` - Optional. Defines how to get the initial offset for the consumers. + The possible values are `:assigned` or `:reset`. When set to `:assigned` the starting offset will be the + ones returned in the Kafka partition assignments (the latest committed offsets for the consumer group). + When set to `:reset`, the starting offset will be dictated by the `:offset_reset_policy` option, either + starting from the `:earliest` or the `:latest` offsets of the topic. Default is `:assigned`. + * `:group_config` - Optional. A list of options used to configure the group coordinator. See the ["Group config options"](#module-group-config-options) section below for a list of all available options. @@ -356,11 +362,17 @@ defmodule BroadwayKafka.Producer do brod_received_assignment( topic: topic, partition: partition, - begin_offset: begin_offset + begin_offset: assigned_begin_offset ) = assignment offset_reset_policy = state.config[:offset_reset_policy] + begin_offset = + case state.config[:begin_offset] do + :assigned -> assigned_begin_offset + :reset -> :undefined + end + offset = state.client.resolve_offset( topic, diff --git a/test/brod_client_test.exs b/test/brod_client_test.exs index f6de4ba..83fad3d 100644 --- a/test/brod_client_test.exs +++ b/test/brod_client_test.exs @@ -124,6 +124,21 @@ defmodule BroadwayKafka.BrodClientTest do assert {:ok, %{offset_reset_policy: :latest}} = BrodClient.init(opts) end + test ":begin_offset can be :assigned or :reset. 
Default is :assigned" do + assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(@opts) + + opts = Keyword.put(@opts, :begin_offset, :an_atom) + + assert BrodClient.init(opts) == + {:error, "expected :begin_offset to be one of [:assigned, :reset], got: :an_atom"} + + opts = Keyword.put(@opts, :begin_offset, :assigned) + assert {:ok, %{begin_offset: :assigned}} = BrodClient.init(opts) + + opts = Keyword.put(@opts, :begin_offset, :reset) + assert {:ok, %{begin_offset: :reset}} = BrodClient.init(opts) + end + test ":offset_commit_interval_seconds is an optional non-negative integer" do opts = put_in(@opts, [:group_config, :offset_commit_interval_seconds], :an_atom) diff --git a/test/producer_test.exs b/test/producer_test.exs index 7f685b2..3db4b7b 100644 --- a/test/producer_test.exs +++ b/test/producer_test.exs @@ -631,6 +631,7 @@ defmodule BroadwayKafka.ProducerTest do reconnect_timeout: 10, max_bytes: 10, offset_commit_on_ack: false, + begin_offset: :assigned, ack_raises_on_offset: ack_raises_on_offset ]}, concurrency: producers_concurrency