From ddc2b199136d4cf7af93b68ac96aa84192b9d6dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Fri, 29 Nov 2024 10:56:47 +0100 Subject: [PATCH] Use default env, update docs --- lib/broadway.ex | 51 +++++++------------ lib/broadway/config_storage.ex | 11 +--- lib/broadway/config_storage/ets.ex | 22 +++----- .../config_storage/persistent_term.ex | 10 ++-- lib/broadway/producer.ex | 7 +-- mix.exs | 3 +- test/broadway/config_storage_test.exs | 26 ++-------- 7 files changed, 41 insertions(+), 89 deletions(-) diff --git a/lib/broadway.ex b/lib/broadway.ex index 2eee487..5b6b6a6 100644 --- a/lib/broadway.ex +++ b/lib/broadway.ex @@ -609,55 +609,42 @@ defmodule Broadway do ## Configuration Storage - Broadway stores configuration globally in a chosen storage method. Broadway comes with two configuration storage options: + Broadway stores configuration globally in a chosen storage method. + Broadway comes with two configuration storage options: - `:persistent_term`, the default. - `:ets` - - ### Persistent Term - A `:persistent_term` backed configuration storage, which is the default storage option used. Configurations are not deleted when the Broadway server process goes down, so as to avoid a global GC. + This is the most efficient option for static Broadway pipeline definitions, + as this option never deletes the Broadway configuration from storage: ```elixir - config Broadway, config_storage: :persistent_term + config :broadway, config_storage: :persistent_term ``` + The speed of storing and updating using `:persistent_term` is proportional + to the number of already-created terms in the storage. If you are creating + several Broadway pipelines dynamically, that may affect the persistent term + storage performance. Furthermore, even if you are restarting the same pipeline + but you are using different parameters each time, that will require a global + GC to update the `:persistent_term` configuration. 
If you are starting Broadway + pipelines dynamically, you must use `:ets`. + ### ETS - An ETS-backed configuration storage. Only use this if performance improvements over the default `:persistent_term`-based storage is needed. + An ETS-backed configuration storage, useful if Broadway pipelines are + started dynamically. To use this configuration storage option, set your application config.exs as so: ```elixir - config Broadway, config_storage: :ets - ``` - - To pass options, use a tuple with a keyword list as so: - - ```elixir - config Broadway, - config_storage: :ets, - config_storage_opts: [ - table_name: :my_table - ] + config :broadway, config_storage: :ets ``` - Accepted options: - - `:table_name` - configure the table name. Defaults to `:broadway_configs`. - - #### Performance Improvements over `:persistent_term` - `:persistent_term` will trigger a global GC on each `put` or `erase`. For situations where there are a large number of dynamically created Broadway pipelines that are created or removed, this may result in the global GC being triggered multiple times. If there is a large number of processes, this may cause the system to be less responsive until all heaps have been scanned. - - As `Broadway.ConfigStorage.PersistentTerm` does not perform an erase when the Broadway server process goes down, it may result in memory buildup over time within the `:persistent_term` hash table, especially when dynamic names are used for the Broadway servers. - - Furthermore, the speed of storing and updating using `:persistent_term` is proportional to the number of already-created terms in the hash table, as the hash table (and term) is copied. - - Using `:ets` as the config storage will allow for a large number of Broadway server configurations to be stored and fetched without the associated performance tradeoffs that `:persistent_term` has. 
- - - - + Using `:ets` as the config storage will allow for a dynamic number of Broadway server + configurations to be stored and fetched without the associated performance tradeoffs + that `:persistent_term` has. ## Telemetry diff --git a/lib/broadway/config_storage.ex b/lib/broadway/config_storage.ex index a08f7cc..ec6a554 100644 --- a/lib/broadway/config_storage.ex +++ b/lib/broadway/config_storage.ex @@ -34,19 +34,10 @@ defmodule Broadway.ConfigStorage do """ @spec get_module() :: module() def get_module() do - Application.get_env(Broadway, :config_storage, :persistent_term) - |> case do + case Application.fetch_env!(:broadway, :config_storage) do :ets -> Ets :persistent_term -> PersistentTerm mod -> mod end end - - @doc """ - Retrieves any options set on the `:config_storage` key. - """ - @spec get_options() :: keyword() - def get_options() do - Application.get_env(Broadway, :config_storage_opts) || [] - end end diff --git a/lib/broadway/config_storage/ets.ex b/lib/broadway/config_storage/ets.ex index 0402f7d..04db2b2 100644 --- a/lib/broadway/config_storage/ets.ex +++ b/lib/broadway/config_storage/ets.ex @@ -1,13 +1,10 @@ defmodule Broadway.ConfigStorage.Ets do @moduledoc false - alias Broadway.ConfigStorage - @behaviour ConfigStorage + @behaviour Broadway.ConfigStorage - @default_table :broadway_configs + def table(), do: __MODULE__ - def default_table(), do: @default_table - - @impl ConfigStorage + @impl true def setup do if :undefined == :ets.whereis(table()) do :ets.new(table(), [:named_table, :public, :set, {:read_concurrency, true}]) @@ -16,12 +13,12 @@ defmodule Broadway.ConfigStorage.Ets do :ok end - @impl ConfigStorage + @impl true def list do :ets.select(table(), [{{:"$1", :_}, [], [:"$1"]}]) end - @impl ConfigStorage + @impl true def get(server) do case :ets.match(table(), {server, :"$1"}) do [[topology]] -> topology @@ -29,18 +26,13 @@ defmodule Broadway.ConfigStorage.Ets do end end - @impl ConfigStorage + @impl true def put(server, topology) do 
:ets.insert(table(), {server, topology}) end - @impl ConfigStorage + @impl true def delete(server) do :ets.delete(table(), server) end - - defp table() do - opts = ConfigStorage.get_options() - Keyword.get(opts, :table_name, @default_table) - end end diff --git a/lib/broadway/config_storage/persistent_term.ex b/lib/broadway/config_storage/persistent_term.ex index 345efc3..5dc9c4c 100644 --- a/lib/broadway/config_storage/persistent_term.ex +++ b/lib/broadway/config_storage/persistent_term.ex @@ -2,7 +2,7 @@ defmodule Broadway.ConfigStorage.PersistentTerm do @moduledoc false @behaviour Broadway.ConfigStorage - @impl Broadway.ConfigStorage + @impl true def setup do unless Code.ensure_loaded?(:persistent_term) do require Logger @@ -13,24 +13,24 @@ defmodule Broadway.ConfigStorage.PersistentTerm do :ok end - @impl Broadway.ConfigStorage + @impl true def list do for {{Broadway, name}, %Broadway.Topology{}} <- :persistent_term.get() do name end end - @impl Broadway.ConfigStorage + @impl true def get(server) do :persistent_term.get({Broadway, server}, nil) end - @impl Broadway.ConfigStorage + @impl true def put(server, topology) do :persistent_term.put({Broadway, server}, topology) end - @impl Broadway.ConfigStorage + @impl true def delete(_server) do # We don't delete from persistent term on purpose. Since the process is # named, we can assume it does not start dynamically, so it will either diff --git a/lib/broadway/producer.ex b/lib/broadway/producer.ex index 11f91a3..ac40e71 100644 --- a/lib/broadway/producer.ex +++ b/lib/broadway/producer.ex @@ -19,11 +19,8 @@ defmodule Broadway.Producer do If `options` is a keyword list, Broadway injects a `:broadway` option into such keyword list. This option contains the configuration for the - complete Broadway topology (see `Broadway.start_link/2`. 
For example, you can use - `options[:broadway][:name]` to uniquely identify the topology, - allowing you to write terms to things such as - [`:persistent_term`](https://erlang.org/doc/man/persistent_term.html) - or ETS tables. + complete Broadway topology (see `Broadway.start_link/2`). For example, + you can use `options[:broadway][:name]` to uniquely identify the topology. The `:broadway` configuration also has an `:index` key. This is the index of the producer in its supervision tree (starting diff --git a/mix.exs b/mix.exs index e191e94..0f929de 100644 --- a/mix.exs +++ b/mix.exs @@ -21,7 +21,8 @@ defmodule Broadway.MixProject do def application do [ - extra_applications: [:logger] + extra_applications: [:logger], + env: [config_storage: :persistent_term] ] end diff --git a/test/broadway/config_storage_test.exs b/test/broadway/config_storage_test.exs index 893aabd..e77f00d 100644 --- a/test/broadway/config_storage_test.exs +++ b/test/broadway/config_storage_test.exs @@ -3,38 +3,22 @@ defmodule Broadway.ConfigStorageTest do alias Broadway.ConfigStorage.Ets setup do - prev = Application.get_env(Broadway, :config_storage) - prev_opts = Application.get_env(Broadway, :config_storage_opts) + prev = Application.fetch_env!(:broadway, :config_storage) on_exit(fn -> - Application.put_env(Broadway, :config_storage, prev) - Application.put_env(Broadway, :config_storage_opts, prev_opts) + Application.put_env(:broadway, :config_storage, prev) end) end test "ets default options" do - Application.put_env(Broadway, :config_storage, :ets) + Application.put_env(:broadway, :config_storage, :ets) Ets.setup() assert [] = Ets.list() assert Ets.put("some name", %Broadway.Topology{}) assert ["some name"] = Ets.list() assert %Broadway.Topology{} = Ets.get("some name") - assert :ets.info(Ets.default_table(), :size) == 1 + assert :ets.info(Ets.table(), :size) == 1 Ets.delete("some name") - assert :ets.info(Ets.default_table(), :size) == 0 - end - - test "ets custom name" do - 
Application.put_env(Broadway, :config_storage, :ets) - Application.put_env(Broadway, :config_storage_opts, table_name: :my_table) - Ets.setup() - assert :ets.info(:my_table, :size) == 0 - assert [] = Ets.list() - assert Ets.put("some name", %Broadway.Topology{}) - assert ["some name"] = Ets.list() - assert %Broadway.Topology{} = Ets.get("some name") - assert :ets.info(:my_table, :size) == 1 - Ets.delete("some name") - assert :ets.info(:my_table, :size) == 0 + assert :ets.info(Ets.table(), :size) == 0 end end