Skip to content

Commit

Permalink
Use default env, update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
josevalim committed Nov 29, 2024
1 parent 1151db2 commit ddc2b19
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 89 deletions.
51 changes: 19 additions & 32 deletions lib/broadway.ex
Original file line number Diff line number Diff line change
Expand Up @@ -609,55 +609,42 @@ defmodule Broadway do
## Configuration Storage
Broadway stores configuration globally in a chosen storage method. Broadway comes with two configuration storage options:
Broadway stores configuration globally in a chosen storage method.
Broadway comes with two configuration storage options:
- `:persistent_term`, the default.
- `:ets`
### Persistent Term
A `:persistent_term` backed configuration storage, which is the default storage option used. Configurations are not deleted when the Broadway server process goes down, so as to avoid a global GC.
This is the most efficient option for static Broadway pipeline definitions,
as this option never deletes the Broadway configuration from storage:
```elixir
config Broadway, config_storage: :persistent_term
config :broadway, config_storage: :persistent_term
```
The speed of storing and updating using `:persistent_term` is proportional
to the number of already-created terms in the storage. If you are creating
several Broadway pipelines dynamically, that may affect the persistent term
storage performance. Furthermore, even if you are restarting the same pipeline
but you are using different parameters each time, that will require a global
GC to update the `:persistent_term` configuration. If you are starting Broadway
pipelines dynamically, you must use `:ets`.
### ETS
An ETS-backed configuration storage. Only use this if performance improvements over the default `:persistent_term`-based storage is needed.
An ETS-backed configuration storage, useful if Broadway pipelines are
started dynamically.
To use this configuration storage option, set your application config.exs as so:
```elixir
config Broadway, config_storage: :ets
```
To pass options, use a tuple with a keyword list as so:
```elixir
config Broadway,
config_storage: :ets,
config_storage_opts: [
table_name: :my_table
]
config :broadway, config_storage: :ets
```
Accepted options:
- `:table_name` - configure the table name. Defaults to `:broadway_configs`.
#### Performance Improvements over `:persistent_term`
`:persistent_term` will trigger a global GC on each `put` or `erase`. For situations where there are a large number of dynamically created Broadway pipelines that are created or removed, this may result in the global GC being triggered multiple times. If there is a large number of processes, this may cause the system to be less responsive until all heaps have been scanned.
As `Broadway.ConfigStorage.PersistentTerm` does not perform an erase when the Broadway server process goes down, it may result in memory buildup over time within the `:persistent_term` hash table, especially when dynamic names are used for the Broadway servers.
Furthermore, the speed of storing and updating using `:persistent_term` is proportional to the number of already-created terms in the hash table, as the hash table (and term) is copied.
Using `:ets` as the config storage will allow for a large number of Broadway server configurations to be stored and fetched without the associated performance tradeoffs that `:persistent_term` has.
Using `:ets` as the config storage will allow for a dynamic number of Broadway server
configurations to be stored and fetched without the associated performance tradeoffs
that `:persistent_term` has.
## Telemetry
Expand Down
11 changes: 1 addition & 10 deletions lib/broadway/config_storage.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,10 @@ defmodule Broadway.ConfigStorage do
"""
@spec get_module() :: module()
def get_module() do
Application.get_env(Broadway, :config_storage, :persistent_term)
|> case do
case Application.fetch_env!(:broadway, :config_storage) do
:ets -> Ets
:persistent_term -> PersistentTerm
mod -> mod
end
end

@doc """
Retrieves any options set on the `:config_storage` key.
"""
@spec get_options() :: keyword()
def get_options() do
Application.get_env(Broadway, :config_storage_opts) || []
end
end
22 changes: 7 additions & 15 deletions lib/broadway/config_storage/ets.ex
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
defmodule Broadway.ConfigStorage.Ets do
@moduledoc false
alias Broadway.ConfigStorage
@behaviour ConfigStorage
@behaviour Broadway.ConfigStorage

@default_table :broadway_configs
def table(), do: __MODULE__

def default_table(), do: @default_table

@impl ConfigStorage
@impl true
def setup do
if :undefined == :ets.whereis(table()) do
:ets.new(table(), [:named_table, :public, :set, {:read_concurrency, true}])
Expand All @@ -16,31 +13,26 @@ defmodule Broadway.ConfigStorage.Ets do
:ok
end

@impl ConfigStorage
@impl true
def list do
:ets.select(table(), [{{:"$1", :_}, [], [:"$1"]}])
end

@impl ConfigStorage
@impl true
def get(server) do
case :ets.match(table(), {server, :"$1"}) do
[[topology]] -> topology
_ -> nil
end
end

@impl ConfigStorage
@impl true
def put(server, topology) do
:ets.insert(table(), {server, topology})
end

@impl ConfigStorage
@impl true
def delete(server) do
:ets.delete(table(), server)
end

defp table() do
opts = ConfigStorage.get_options()
Keyword.get(opts, :table_name, @default_table)
end
end
10 changes: 5 additions & 5 deletions lib/broadway/config_storage/persistent_term.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Broadway.ConfigStorage.PersistentTerm do
@moduledoc false
@behaviour Broadway.ConfigStorage

@impl Broadway.ConfigStorage
@impl true
def setup do
unless Code.ensure_loaded?(:persistent_term) do
require Logger
Expand All @@ -13,24 +13,24 @@ defmodule Broadway.ConfigStorage.PersistentTerm do
:ok
end

@impl Broadway.ConfigStorage
@impl true
def list do
for {{Broadway, name}, %Broadway.Topology{}} <- :persistent_term.get() do
name
end
end

@impl Broadway.ConfigStorage
@impl true
def get(server) do
:persistent_term.get({Broadway, server}, nil)
end

@impl Broadway.ConfigStorage
@impl true
def put(server, topology) do
:persistent_term.put({Broadway, server}, topology)
end

@impl Broadway.ConfigStorage
@impl true
def delete(_server) do
# We don't delete from persistent term on purpose. Since the process is
# named, we can assume it does not start dynamically, so it will either
Expand Down
7 changes: 2 additions & 5 deletions lib/broadway/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@ defmodule Broadway.Producer do
If `options` is a keyword list, Broadway injects a `:broadway` option
into such keyword list. This option contains the configuration for the
complete Broadway topology (see `Broadway.start_link/2`. For example, you can use
`options[:broadway][:name]` to uniquely identify the topology,
allowing you to write terms to things such as
[`:persistent_term`](https://erlang.org/doc/man/persistent_term.html)
or ETS tables.
complete Broadway topology (see `Broadway.start_link/2`. For example,
you can use `options[:broadway][:name]` to uniquely identify the topology.
The `:broadway` configuration also has an `:index` key. This
is the index of the producer in its supervision tree (starting
Expand Down
3 changes: 2 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ defmodule Broadway.MixProject do

def application do
[
extra_applications: [:logger]
extra_applications: [:logger],
env: [config_storage: :persistent_term]
]
end

Expand Down
26 changes: 5 additions & 21 deletions test/broadway/config_storage_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,22 @@ defmodule Broadway.ConfigStorageTest do
alias Broadway.ConfigStorage.Ets

setup do
prev = Application.get_env(Broadway, :config_storage)
prev_opts = Application.get_env(Broadway, :config_storage_opts)
prev = Application.fetch_env!(:broadway, :config_storage)

on_exit(fn ->
Application.put_env(Broadway, :config_storage, prev)
Application.put_env(Broadway, :config_storage_opts, prev_opts)
Application.put_env(:broadway, :config_storage, prev)
end)
end

test "ets default options" do
Application.put_env(Broadway, :config_storage, :ets)
Application.put_env(:broadway, :config_storage, :ets)
Ets.setup()
assert [] = Ets.list()
assert Ets.put("some name", %Broadway.Topology{})
assert ["some name"] = Ets.list()
assert %Broadway.Topology{} = Ets.get("some name")
assert :ets.info(Ets.default_table(), :size) == 1
assert :ets.info(Ets.table(), :size) == 1
Ets.delete("some name")
assert :ets.info(Ets.default_table(), :size) == 0
end

test "ets custom name" do
Application.put_env(Broadway, :config_storage, :ets)
Application.put_env(Broadway, :config_storage_opts, table_name: :my_table)
Ets.setup()
assert :ets.info(:my_table, :size) == 0
assert [] = Ets.list()
assert Ets.put("some name", %Broadway.Topology{})
assert ["some name"] = Ets.list()
assert %Broadway.Topology{} = Ets.get("some name")
assert :ets.info(:my_table, :size) == 1
Ets.delete("some name")
assert :ets.info(:my_table, :size) == 0
assert :ets.info(Ets.table(), :size) == 0
end
end

0 comments on commit ddc2b19

Please sign in to comment.