Skip to content

Commit

Permalink
improvement: implement a subscription notification batcher (#217)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Barnabas Jovanovics <[email protected]>
  • Loading branch information
zachdaniel and barnabasJ authored Oct 7, 2024
1 parent d2312f6 commit 3cb2c98
Show file tree
Hide file tree
Showing 10 changed files with 794 additions and 48 deletions.
2 changes: 1 addition & 1 deletion .formatter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ spark_locals_without_parens = [
]

[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"],
inputs: ["{mix,.formatter}.exs", "{config,lib,test,benchmarks}/**/*.{ex,exs}"],
locals_without_parens: spark_locals_without_parens,
export: [
locals_without_parens: spark_locals_without_parens
Expand Down
125 changes: 125 additions & 0 deletions benchmarks/subscriptions.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
alias AshGraphql.Test.PubSub
alias AshGraphql.Test.Schema

{:ok, _pubsub} = PubSub.start_link()
{:ok, _absinthe_sub} = Absinthe.Subscription.start_link(PubSub)

# Application.put_env(:ash_graphql, :simulate_subscription_processing_time, 1000)
:ok

admin = %{
id: 0,
role: :admin
}

create_mutation = """
mutation CreateSubscribable($input: CreateSubscribableInput) {
createSubscribable(input: $input) {
result{
id
text
}
errors{
message
}
}
}
"""

AshGraphql.Subscription.Batcher.start_link()

Benchee.run(
%{
"1 mutation" => fn _input ->
Absinthe.run(create_mutation, Schema,
variables: %{"input" => %{"text" => "foo"}},
context: %{actor: admin}
)
end
},
inputs: %{
"25 same subscribers" => {25, :same},
"500 same subscribers" => {500, :same},
"50 mixed subscribers" => {25, [:same, :different]},
"1000 mixed subscribers" => {500, [:same, :different]}
},
after_scenario: fn _ ->
count = fn counter ->
receive do
_msg ->
1 + counter.(counter)
after
0 -> 0
end
end

AshGraphql.Subscription.Batcher.drain()

IO.puts("Received #{count.(count)} messages")
end,
before_scenario: fn {input, types} ->
Application.put_env(PubSub, :notifier_test_pid, self())

if :different in List.wrap(types) do
Enum.each(1..input, fn i ->
actor = %{
id: i,
role: :admin
}

{:ok, %{"subscribed" => _topic}} =
Absinthe.run(
"""
subscription {
subscribableEvents {
created {
id
text
}
updated {
id
text
}
destroyed
}
}
""",
Schema,
context: %{actor: actor, pubsub: PubSub}
)
end)
end

if :same in List.wrap(types) do
Enum.each(1..input, fn _i ->
actor = %{
id: -1,
role: :admin
}

{:ok, %{"subscribed" => _topic}} =
Absinthe.run(
"""
subscription {
subscribableEvents {
created {
id
text
}
updated {
id
text
}
destroyed
}
}
""",
Schema,
context: %{actor: actor, pubsub: PubSub}
)
end)
end
end
)

AshGraphql.Subscription.Batcher.drain()
6 changes: 4 additions & 2 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ config :ash, :disable_async?, true
config :ash, :validate_domain_resource_inclusion?, false
config :ash, :validate_domain_config_inclusion?, false

config :logger, level: :warning

config :ash, :pub_sub, debug?: true
config :logger, level: :info

config :ash_graphql, :subscriptions, true

if Mix.env() == :test do
config :ash_graphql, :simulate_subscription_slowness?, true
end

if Mix.env() == :dev do
config :git_ops,
mix_project: AshGraphql.MixProject,
Expand Down
27 changes: 26 additions & 1 deletion documentation/topics/use-subscriptions-with-graphql.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,39 @@ end

The subscription DSL is currently in beta and before using it you have to enable them in your config.

> ### Subscription response order {: .warning}
>
> The order in which the subscription responses are sent to the client is not guaranteed to be the
> same as the order in which the mutations were executed.
```elixir
config :ash_graphql, :policies, show_policy_breakdowns?: true
```

First you'll need to do some setup, follow the the [setup guide](https://hexdocs.pm/absinthe/subscriptions.html#absinthe-phoenix-setup)
in the absinthe docs, but instead of using `Absinthe.Pheonix.Endpoint` use `AshGraphql.Subscription.Endpoint`.

Afterwards add an empty subscription block to your schema module.
By default subscriptions are resolved synchronously as part of the mutation. This means that a resolver is run for every subscriber that
is not deduplicated. If you have a lot of subscribers you can add the `AshGraphql.Subscription.Batcher` to your supervision tree, which
batches up notifications and runs subscription resolution out-of-band.

```elixir
@impl true
def start(_type, _args) do
children = [
...,
{Absinthe.Subscription, MyAppWeb.Endpoint},
AshGraphql.Subscription.Batcher
]

# See https://hexdocs.pm/elixir/Supervisor.html
# for other strategies and supported options
opts = [strategy: :one_for_one, name: MyAppWeb.Supervisor]
Supervisor.start_link(children, opts)
end
```

Afterwards, add an empty subscription block to your schema module.

```elixir
defmodule MyAppWeb.Schema do
Expand Down
168 changes: 168 additions & 0 deletions lib/graphql/resolver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,166 @@ defmodule AshGraphql.Graphql.Resolver do
end
end

def resolve(
%{root_value: {:pre_resolved, item}} = resolution,
{_, _, %AshGraphql.Resource.Subscription{}, _}
) do
Absinthe.Resolution.put_result(
resolution,
{:ok, item}
)
end

def resolve(
%{arguments: args, context: context, root_value: notifications} = resolution,
{domain, resource,
%AshGraphql.Resource.Subscription{read_action: read_action, name: name}, _input?}
)
when is_list(notifications) do
case handle_arguments(resource, read_action, args) do
{:ok, args} ->
metadata = %{
domain: domain,
resource: resource,
resource_short_name: Ash.Resource.Info.short_name(resource),
actor: Map.get(context, :actor),
tenant: Map.get(context, :tenant),
action: read_action,
source: :graphql,
subscription: name,
authorize?: AshGraphql.Domain.Info.authorize?(domain)
}

trace domain,
resource,
:gql_subscription,
name,
metadata do
opts = [
actor: Map.get(context, :actor),
action: read_action,
authorize?: AshGraphql.Domain.Info.authorize?(domain),
tenant: Map.get(context, :tenant)
]

subscription_events =
notifications
|> Enum.group_by(& &1.action.type)
|> Enum.map(fn {type, notifications} ->
subscription_field = subcription_field_from_action_type(type)
key = String.to_existing_atom(subscription_field)

if type in [:create, :update] do
data = Enum.map(notifications, & &1.data)
{filter, args} = Map.pop(args, :filter)

read_action =
read_action || Ash.Resource.Info.primary_action!(resource, :read).name

# read the records that were just created/updated
query =
resource
|> Ash.Query.do_filter(massage_filter(resource, filter))
|> Ash.Query.for_read(read_action, args, opts)
|> AshGraphql.Subscription.query_for_subscription(
domain,
resolution,
subscription_result_type(name),
[subscription_field]
)

query_with_authorization_rules =
Ash.can(
query,
opts[:actor],
tenant: opts[:tenant],
run_queries?: false,
alter_source?: true
)

current_filter = query.filter

{known_results, need_refetch} =
case query_with_authorization_rules do
{:ok, true, %{authorize_results: [], filter: nil} = query} ->
{data, []}

{:ok, true,
%{authorize_results: [], filter: %Ash.Filter{expression: nil}} = query} ->
{data, []}

{:ok, true, %{authorize_results: []} = query} ->
Enum.reduce(data, {[], []}, fn record, {known, refetch} ->
case Ash.Expr.eval(query.filter,
record: data,
unknown_on_unknown_refs?: true
) do
{:ok, true} ->
{[record | known], refetch}

{:ok, false} ->
{known, refetch}

_ ->
{known, [record | refetch]}
end
end)

{:error, false, _} ->
{[], []}

_ ->
{[], data}
end

primary_key = Ash.Resource.Info.primary_key(resource)

primary_key_matches =
Enum.map(need_refetch, fn record ->
Map.take(record, primary_key)
end)

with {:ok, known_results} <- Ash.load(known_results, query),
{:ok, need_refetch} <- do_refetch(query, primary_key_matches) do
known_results
|> Stream.concat(need_refetch)
|> Enum.map(fn record ->
%{key => record}
end)
else
{:error, error} ->
# caught by the batch resolver
raise Ash.Error.to_error_class(error)
end
else
Enum.map(notifications, fn notification ->
%{key => AshGraphql.Resource.encode_id(notification.data, false)}
end)
end
end)

case List.flatten(subscription_events) do
[] ->
Absinthe.Resolution.put_result(
resolution,
{:error, to_errors([Ash.Error.Query.NotFound.exception()], context, domain)}
)

[first | rest] ->
Process.put(:batch_resolved, rest)

Absinthe.Resolution.put_result(
resolution,
{:ok, first}
)
end
end

{:error, error} ->
{:error, error}
end
end

def resolve(
%{arguments: args, context: context, root_value: notification} = resolution,
{domain, resource,
Expand Down Expand Up @@ -631,6 +791,14 @@ defmodule AshGraphql.Graphql.Resolver do
end
end

defp do_refetch(_query, []) do
{:ok, []}
end

defp do_refetch(query, primary_key_matches) do
Ash.read(Ash.Query.do_filter(query, or: primary_key_matches))
end

defp subcription_field_from_action_type(:create), do: "created"
defp subcription_field_from_action_type(:update), do: "updated"
defp subcription_field_from_action_type(:destroy), do: "destroyed"
Expand Down
Loading

0 comments on commit 3cb2c98

Please sign in to comment.