From 4c377e541a2adb6befc651f3dccc6a797bc58ba9 Mon Sep 17 00:00:00 2001 From: Barnabas Jovanovics Date: Fri, 27 Sep 2024 14:56:41 +0200 Subject: [PATCH] feat: subscription dsl (#97) --- .formatter.exs | 7 + config/config.exs | 5 + documentation/dsls/DSL:-AshGraphql.Domain.md | 74 +++- .../dsls/DSL:-AshGraphql.Resource.md | 76 ++++ .../topics/use-subscriptions-with-graphql.md | 76 +++- lib/ash_graphql.ex | 19 +- lib/domain/domain.ex | 57 ++- lib/domain/info.ex | 6 +- lib/graphql/resolver.ex | 149 ++++++- lib/resource/info.ex | 21 + lib/resource/resource.ex | 179 ++++++++- lib/resource/subscription.ex | 45 +++ lib/resource/transformers/subscription.ex | 27 ++ .../verifiers/verify_subscription_actions.ex | 56 +++ .../verifiers/verify_subscription_opt_in.ex | 23 ++ lib/subscription/actor.ex | 10 + lib/subscription/actor_function.ex | 15 + lib/subscription/config.ex | 64 +++ lib/subscription/endpoint.ex | 10 + lib/subscription/notifier.ex | 22 ++ lib/subscription/runner.ex | 52 +++ lib/subscriptions.ex | 15 +- mix.exs | 1 + mix.lock | 7 + test/subscription_test.exs | 373 ++++++++++++++++++ test/support/domain.ex | 7 + test/support/pub_sub.ex | 34 ++ test/support/resources/subscribable.ex | 101 +++++ test/support/schema.ex | 3 + 29 files changed, 1512 insertions(+), 22 deletions(-) create mode 100644 lib/resource/subscription.ex create mode 100644 lib/resource/transformers/subscription.ex create mode 100644 lib/resource/verifiers/verify_subscription_actions.ex create mode 100644 lib/resource/verifiers/verify_subscription_opt_in.ex create mode 100644 lib/subscription/actor.ex create mode 100644 lib/subscription/actor_function.ex create mode 100644 lib/subscription/config.ex create mode 100644 lib/subscription/endpoint.ex create mode 100644 lib/subscription/notifier.ex create mode 100644 lib/subscription/runner.ex create mode 100644 test/subscription_test.exs create mode 100644 test/support/pub_sub.ex create mode 100644 test/support/resources/subscribable.ex diff --git a/.formatter.exs b/.formatter.exs index 21951119..a0a49475 100644 --- a/.formatter.exs +++ b/.formatter.exs @@ -2,6 +2,9 @@ spark_locals_without_parens = [ action: 2, action: 3, action: 4, + action_types: 1, + actions: 1, + actor: 1, allow_nil?: 1, argument_input_types: 1, argument_names: 1, @@ -47,6 +50,7 @@ spark_locals_without_parens = [ paginate_relationship_with: 1, paginate_with: 1, primary_key_delimiter: 1, + pubsub: 1, read_action: 1, read_one: 2, read_one: 3, @@ -58,6 +62,9 @@ spark_locals_without_parens = [ show_fields: 1, show_metadata: 1, show_raised_errors?: 1, + subscribe: 1, + subscribe: 2, + subscribe: 3, tracer: 1, type: 1, type_name: 1, diff --git a/config/config.exs b/config/config.exs index 5635fe54..7c092045 100644 --- a/config/config.exs +++ b/config/config.exs @@ -6,6 +6,11 @@ config :ash, :validate_domain_config_inclusion?, false config :logger, level: :warning +config :ash, :pub_sub, debug?: true +config :logger, level: :info + +config :ash_graphql, :subscriptions, true + if Mix.env() == :dev do config :git_ops, mix_project: AshGraphql.MixProject, diff --git a/documentation/dsls/DSL:-AshGraphql.Domain.md b/documentation/dsls/DSL:-AshGraphql.Domain.md index 2c44a32f..bbce22f8 100644 --- a/documentation/dsls/DSL:-AshGraphql.Domain.md +++ b/documentation/dsls/DSL:-AshGraphql.Domain.md @@ -21,6 +21,8 @@ Domain level configuration for GraphQL * update * destroy * action + * [subscriptions](#graphql-subscriptions) + * subscribe ### Examples @@ -269,9 +271,9 @@ Mutations (create/update/destroy actions) to expose for the resource. ### Examples ``` mutations do - create :create_post, :create - update :update_post, :update - destroy :destroy_post, :destroy + create Post, :create_post, :create + update Post, :update_post, :update + destroy Post, :destroy_post, :destroy end ``` @@ -445,6 +447,72 @@ action :check_status, :check_status Target: `AshGraphql.Resource.Action` +## graphql.subscriptions +Subscriptions to expose for the resource. + + +### Nested DSLs + * [subscribe](#graphql-subscriptions-subscribe) + + +### Examples +``` +subscription do + subscribe Post, :post_created do + action_types(:create) + end +end + +``` + + + + +## graphql.subscriptions.subscribe +```elixir +subscribe resource, name +``` + + +A subscription to listen for changes on the resource + + + +### Examples +``` +subscribe :post_created do + action_types(:create) +end + +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`resource`](#graphql-subscriptions-subscribe-resource){: #graphql-subscriptions-subscribe-resource } | `module` | | The resource that the action is defined on | +| [`name`](#graphql-subscriptions-subscribe-name){: #graphql-subscriptions-subscribe-name } | `atom` | | The name to use for the subscription. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`actor`](#graphql-subscriptions-subscribe-actor){: #graphql-subscriptions-subscribe-actor } | `(any -> any) \| module` | | The actor to use for authorization. | +| [`actions`](#graphql-subscriptions-subscribe-actions){: #graphql-subscriptions-subscribe-actions } | `list(atom) \| atom` | | The create/update/destroy actions the subsciption should listen to. | +| [`action_types`](#graphql-subscriptions-subscribe-action_types){: #graphql-subscriptions-subscribe-action_types } | `list(atom) \| atom` | | The type of actions the subsciption should listen to. | +| [`read_action`](#graphql-subscriptions-subscribe-read_action){: #graphql-subscriptions-subscribe-read_action } | `atom` | | The read action to use for reading data | +| [`hide_inputs`](#graphql-subscriptions-subscribe-hide_inputs){: #graphql-subscriptions-subscribe-hide_inputs } | `list(atom)` | `[]` | A list of inputs to hide from the subscription, usable if the read action has arguments. | + + + + + +### Introspection + +Target: `AshGraphql.Resource.Subscription` + + diff --git a/documentation/dsls/DSL:-AshGraphql.Resource.md b/documentation/dsls/DSL:-AshGraphql.Resource.md index 60127fb3..4ef7b381 100644 --- a/documentation/dsls/DSL:-AshGraphql.Resource.md +++ b/documentation/dsls/DSL:-AshGraphql.Resource.md @@ -21,6 +21,8 @@ Configuration for a given resource in graphql * update * destroy * action + * [subscriptions](#graphql-subscriptions) + * subscribe * [managed_relationships](#graphql-managed_relationships) * managed_relationship @@ -464,6 +466,80 @@ action :check_status, :check_status Target: `AshGraphql.Resource.Action` +## graphql.subscriptions +Subscriptions (notifications) to expose for the resource. + + +### Nested DSLs + * [subscribe](#graphql-subscriptions-subscribe) + + +### Examples +``` +subscriptions do + subscribe :bucket_created do + actions :create + read_action :read + end +end + +``` + + + + +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`pubsub`](#graphql-subscriptions-pubsub){: #graphql-subscriptions-pubsub .spark-required} | `module` | | The pubsub module to use for the subscription | + + + +## graphql.subscriptions.subscribe +```elixir +subscribe name +``` + + +A subscription to listen for changes on the resource + + + +### Examples +``` +subscribe :post_created do + action_types(:create) +end + +``` + + + +### Arguments + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`name`](#graphql-subscriptions-subscribe-name){: #graphql-subscriptions-subscribe-name } | `atom` | | The name to use for the subscription. | +### Options + +| Name | Type | Default | Docs | +|------|------|---------|------| +| [`actor`](#graphql-subscriptions-subscribe-actor){: #graphql-subscriptions-subscribe-actor } | `(any -> any) \| module` | | The actor to use for authorization. | +| [`actions`](#graphql-subscriptions-subscribe-actions){: #graphql-subscriptions-subscribe-actions } | `list(atom) \| atom` | | The create/update/destroy actions the subsciption should listen to. | +| [`action_types`](#graphql-subscriptions-subscribe-action_types){: #graphql-subscriptions-subscribe-action_types } | `list(atom) \| atom` | | The type of actions the subsciption should listen to. | +| [`read_action`](#graphql-subscriptions-subscribe-read_action){: #graphql-subscriptions-subscribe-read_action } | `atom` | | The read action to use for reading data | +| [`hide_inputs`](#graphql-subscriptions-subscribe-hide_inputs){: #graphql-subscriptions-subscribe-hide_inputs } | `list(atom)` | `[]` | A list of inputs to hide from the subscription, usable if the read action has arguments. | + + + + + +### Introspection + +Target: `AshGraphql.Resource.Subscription` + + ## graphql.managed_relationships Generates input objects for `manage_relationship` arguments on resource actions. diff --git a/documentation/topics/use-subscriptions-with-graphql.md b/documentation/topics/use-subscriptions-with-graphql.md index eb08a39e..b172f791 100644 --- a/documentation/topics/use-subscriptions-with-graphql.md +++ b/documentation/topics/use-subscriptions-with-graphql.md @@ -1,6 +1,6 @@ # Using Subscriptions -The AshGraphql DSL does not currently support subscriptions. However, you can do this with Absinthe direclty, and use `AshGraphql.Subscription.query_for_subscription/3`. Here is an example of how you could do this for a subscription for a single record. This example could be extended to support lists of records as well. +You can do this with Absinthe directly, and use `AshGraphql.Subscription.query_for_subscription/3`. Here is an example of how you could do this for a subscription for a single record. This example could be extended to support lists of records as well. ```elixir # in your absinthe schema file @@ -27,3 +27,77 @@ subscription do end end ``` + +## Subscription DSL (beta) + +The subscription DSL is currently in beta and before using it you have to enable them in your config. + +```elixir +config :ash_graphql, :policies, show_policy_breakdowns?: true +``` + +First you'll need to do some setup, follow the the [setup guide](https://hexdocs.pm/absinthe/subscriptions.html#absinthe-phoenix-setup) +in the absinthe docs, but instead of using `Absinthe.Pheonix.Endpoint` use `AshGraphql.Subscription.Endpoint`. + +Afterwards add an empty subscription block to your schema module. + +```elixir +defmodule MyAppWeb.Schema do + ... + + subscription do + end +end +``` + +Now you can define subscriptions on your resource or domain + +```elixir +defmodule MyApp.Resource do + use Ash.Resource, + data_layer: Ash.DataLayer.Ets, + extensions: [AshGraphql.Resource] + + graphql do + subscriptions do + subscribe :resource_created do + action_types :create + end + end + end +end +``` + +For further Details checkout the DSL docs for [resource](/documentation/dsls/DSL:-AshGraphql.Resource.md#graphql-subscriptions) and [domain](/documentation/dsls/DSL:-AshGraphql.Domain.md#graphql-subscriptions) + +### Deduplication + +By default, Absinthe will deduplicate subscriptions based on the `context_id`. +We use the some of the context like actor and tenant to create a `context_id` for you. + +If you want to customize the deduplication you can do so by adding a actor function to your subscription. +This function will be called with the actor that subscribes and you can return a more generic actor, this +way you can have one actor for multiple users, which will lead to less resolver executions. + +```elixir +defmodule MyApp.Resource do + use Ash.Resource, + data_layer: Ash.DataLayer.Ets, + extensions: [AshGraphql.Resource] + + graphql do + subscriptions do + subscribe :resource_created do + action_types :create + actor fn actor -> + if check_actor(actor) do + %{id: "your generic actor", ...} + else + actor + end + end + end + end + end +end +``` diff --git a/lib/ash_graphql.ex b/lib/ash_graphql.ex index 441b6763..9f364780 100644 --- a/lib/ash_graphql.ex +++ b/lib/ash_graphql.ex @@ -157,6 +157,7 @@ defmodule AshGraphql do @dialyzer {:nowarn_function, {:run, 2}} def run(blueprint, _opts) do domain = unquote(domain) + action_middleware = unquote(action_middleware) all_domains = unquote(Enum.map(domains, &elem(&1, 0))) @@ -204,6 +205,18 @@ defmodule AshGraphql do Absinthe.Blueprint.add_field(blueprint, "RootMutationType", mutation) end) + blueprint_with_subscriptions = + domain + |> AshGraphql.Domain.subscriptions( + all_domains, + unquote(resources), + action_middleware, + unquote(schema) + ) + |> Enum.reduce(blueprint_with_mutations, fn subscription, blueprint -> + Absinthe.Blueprint.add_field(blueprint, "RootSubscriptionType", subscription) + end) + managed_relationship_types = AshGraphql.Resource.managed_relationship_definitions( Process.get(:managed_relationship_requirements, []), @@ -212,7 +225,7 @@ defmodule AshGraphql do |> Enum.uniq_by(& &1.identifier) |> Enum.reject(fn type -> existing_types = - case blueprint_with_mutations do + case blueprint_with_subscriptions do %{schema_definitions: [%{type_definitions: type_definitions}]} -> type_definitions @@ -293,7 +306,7 @@ defmodule AshGraphql do end new_defs = - List.update_at(blueprint_with_mutations.schema_definitions, 0, fn schema_def -> + List.update_at(blueprint_with_subscriptions.schema_definitions, 0, fn schema_def -> %{ schema_def | type_definitions: @@ -302,7 +315,7 @@ defmodule AshGraphql do } end) - {:ok, %{blueprint_with_mutations | schema_definitions: new_defs}} + {:ok, %{blueprint_with_subscriptions | schema_definitions: new_defs}} end end diff --git a/lib/domain/domain.ex b/lib/domain/domain.ex index 6aff12ff..6ff7a4b2 100644 --- a/lib/domain/domain.ex +++ b/lib/domain/domain.ex @@ -36,9 +36,9 @@ defmodule AshGraphql.Domain do examples: [ """ mutations do - create :create_post, :create - update :update_post, :update - destroy :destroy_post, :destroy + create Post, :create_post, :create + update Post, :update_post, :update + destroy Post, :destroy_post, :destroy end """ ], @@ -57,6 +57,35 @@ defmodule AshGraphql.Domain do ) } + @subscriptions %Spark.Dsl.Section{ + name: :subscriptions, + describe: """ + Subscriptions to expose for the resource. + """, + examples: [ + """ + subscription do + subscribe Post, :post_created do + action_types(:create) + end + end + """ + ], + entities: + Enum.map( + AshGraphql.Resource.subscriptions(), + &%{ + &1 + | args: [:resource | &1.args], + schema: + Keyword.put(&1.schema, :resource, + type: {:spark, Ash.Resource}, + doc: "The resource that the action is defined on" + ) + } + ) + } + @graphql %Spark.Dsl.Section{ name: :graphql, describe: """ @@ -71,7 +100,8 @@ defmodule AshGraphql.Domain do ], sections: [ @queries, - @mutations + @mutations, + @subscriptions ], schema: [ authorize?: [ @@ -209,6 +239,22 @@ defmodule AshGraphql.Domain do ) end + def subscriptions(domain, all_domains, resources, action_middleware, schema) do + resources + |> Enum.filter(fn resource -> + AshGraphql.Resource in Spark.extensions(resource) + end) + |> Enum.flat_map( + &AshGraphql.Resource.subscriptions( + domain, + all_domains, + &1, + action_middleware, + schema + ) + ) + end + @doc false def type_definitions( domain, @@ -226,7 +272,8 @@ defmodule AshGraphql.Domain do |> Enum.flat_map(fn resource -> if AshGraphql.Resource in Spark.extensions(resource) do AshGraphql.Resource.type_definitions(resource, domain, all_domains, schema, relay_ids?) ++ - AshGraphql.Resource.mutation_types(resource, all_domains, schema) + AshGraphql.Resource.mutation_types(resource, all_domains, schema) ++ + AshGraphql.Resource.subscription_types(resource, all_domains, schema) else AshGraphql.Resource.no_graphql_types(resource, schema) end diff --git a/lib/domain/info.ex b/lib/domain/info.ex index 0fbd6820..85c6055a 100644 --- a/lib/domain/info.ex +++ b/lib/domain/info.ex @@ -34,7 +34,7 @@ defmodule AshGraphql.Domain.Info do @doc "The queries exposed by the domain" def queries(resource) do - Extension.get_entities(resource, [:graphql, :queries]) + Extension.get_entities(resource, [:graphql, :queries]) || [] end @doc "The mutations exposed by the domain" @@ -42,6 +42,10 @@ defmodule AshGraphql.Domain.Info do Extension.get_entities(resource, [:graphql, :mutations]) || [] end + def subscriptions(resource) do + Extension.get_entities(resource, [:graphql, :subscriptions]) || [] + end + @doc "Whether or not to render raised errors in the GraphQL response" def show_raised_errors?(domain) do Extension.get_opt(domain, [:graphql], :show_raised_errors?, false, true) diff --git a/lib/graphql/resolver.ex b/lib/graphql/resolver.ex index 18e7863c..0a7b7d82 100644 --- a/lib/graphql/resolver.ex +++ b/lib/graphql/resolver.ex @@ -3,6 +3,7 @@ defmodule AshGraphql.Graphql.Resolver do require Logger import Ash.Expr + require Ash.Query import AshGraphql.TraceHelpers import AshGraphql.ContextHelpers @@ -505,6 +506,135 @@ defmodule AshGraphql.Graphql.Resolver do end end + def resolve( + %{arguments: args, context: context, root_value: notification} = resolution, + {domain, resource, + %AshGraphql.Resource.Subscription{read_action: read_action, name: name}, _input?} + ) do + case handle_arguments(resource, read_action, args) do + {:ok, args} -> + metadata = %{ + domain: domain, + resource: resource, + resource_short_name: Ash.Resource.Info.short_name(resource), + actor: Map.get(context, :actor), + tenant: Map.get(context, :tenant), + action: read_action, + source: :graphql, + subscription: name, + authorize?: AshGraphql.Domain.Info.authorize?(domain) + } + + trace domain, + resource, + :gql_subscription, + name, + metadata do + opts = [ + actor: Map.get(context, :actor), + action: read_action, + authorize?: AshGraphql.Domain.Info.authorize?(domain), + tenant: Map.get(context, :tenant) + ] + + cond do + notification.action.type in [:create, :update] -> + data = notification.data + {filter, args} = Map.pop(args, :filter) + + read_action = + read_action || Ash.Resource.Info.primary_action!(resource, :read).name + + query = + Ash.Resource.Info.primary_key(resource) + |> Enum.reduce(resource, fn key, query -> + value = Map.get(data, key) + Ash.Query.filter(query, ^ref(key) == ^value) + end) + + query = + Ash.Query.do_filter( + query, + massage_filter(query.resource, filter) + ) + + query = + AshGraphql.Subscription.query_for_subscription( + query + |> Ash.Query.for_read(read_action, args, opts), + domain, + resolution, + subscription_result_type(name), + [subcription_field_from_action_type(notification.action.type)] + ) + + result = + with {:ok, true, query} <- + Ash.can( + query, + opts[:actor], + tenant: opts[:tenant], + run_queries?: false, + alter_source?: true + ), + [] <- query.authorize_results, + {:ok, true} <- + Ash.Expr.eval(query.filter, + record: data, + unknown_on_unknown_refs?: true + ) do + Ash.load(data, query) + else + _ -> + query |> Ash.read_one() + end + + case result do + # should only happen if a resource is created/updated and the subscribed user is not allowed to see it + {:ok, nil} -> + resolution + |> Absinthe.Resolution.put_result( + {:error, to_errors([Ash.Error.Query.NotFound.exception()], context, domain)} + ) + + {:ok, result} -> + resolution + |> Absinthe.Resolution.put_result( + {:ok, + %{ + String.to_existing_atom( + subcription_field_from_action_type(notification.action.type) + ) => result + }} + ) + + {:error, error} -> + resolution + |> Absinthe.Resolution.put_result({:error, to_errors([error], context, domain)}) + end + + notification.action.type in [:destroy] -> + resolution + |> Absinthe.Resolution.put_result( + {:ok, + %{ + String.to_existing_atom( + subcription_field_from_action_type(notification.action.type) + ) => AshGraphql.Resource.encode_id(notification.data, false) + }} + ) + end + end + + {:error, error} -> + {:error, error} + end + end + + defp subcription_field_from_action_type(:create), do: "created" + defp subcription_field_from_action_type(:update), do: "updated" + defp subcription_field_from_action_type(:destroy), do: "destroyed" + defp read_one_query(resource, args) do case Map.fetch(args, :filter) do {:ok, filter} when filter != %{} -> @@ -1597,9 +1727,9 @@ defmodule AshGraphql.Graphql.Resolver do end)} end - defp massage_filter(_resource, nil), do: nil + def massage_filter(_resource, nil), do: nil - defp massage_filter(resource, filter) when is_map(filter) do + def massage_filter(resource, filter) when is_map(filter) do Map.new(filter, fn {key, value} -> cond do rel = Ash.Resource.Info.relationship(resource, key) -> @@ -1614,7 +1744,7 @@ defmodule AshGraphql.Graphql.Resolver do end) end - defp massage_filter(_resource, other), do: other + def massage_filter(_resource, other), do: other defp calc_input(key, value) do case Map.fetch(value, :input) do @@ -2210,6 +2340,11 @@ defmodule AshGraphql.Graphql.Resolver do String.to_atom("#{mutation_name}_result") end + # sobelow_skip ["DOS.StringToAtom"] + defp subscription_result_type(subscription_name) do + String.to_atom("#{subscription_name}_result") + end + # sobelow_skip ["DOS.StringToAtom"] defp page_type(resource, strategy, relay?) do type = AshGraphql.Resource.Info.type(resource) @@ -2227,7 +2362,13 @@ defmodule AshGraphql.Graphql.Resolver do end @doc false - def select_fields(query_or_changeset, resource, resolution, type_override, nested \\ []) do + def select_fields( + query_or_changeset, + resource, + resolution, + type_override, + nested \\ [] + ) do subfields = get_select(resource, resolution, type_override, nested) case query_or_changeset do diff --git a/lib/resource/info.ex b/lib/resource/info.ex index 6b298ab7..11e2cbb3 100644 --- a/lib/resource/info.ex +++ b/lib/resource/info.ex @@ -35,6 +35,27 @@ defmodule AshGraphql.Resource.Info do |> Enum.concat(Extension.get_entities(resource, [:graphql, :mutations]) || []) end + @doc "The subscriptions exposed for the resource" + def subscriptions(resource, domain_or_domains \\ []) do + module = + if is_atom(resource) do + resource + else + Spark.Dsl.Extension.get_persisted(resource, :module) + end + + domain_or_domains + |> List.wrap() + |> Enum.flat_map(&AshGraphql.Domain.Info.subscriptions/1) + |> Enum.filter(&(&1.resource == module)) + |> Enum.concat(Extension.get_entities(resource, [:graphql, :subscriptions]) || []) + end + + @doc "The pubsub module used for subscriptions" + def subscription_pubsub(resource) do + Extension.get_opt(resource, [:graphql, :subscriptions], :pubsub) + end + @doc "Wether or not to encode the primary key as a single `id` field when reading and getting" def encode_primary_key?(resource) do Extension.get_opt(resource, [:graphql], :encode_primary_key?, true) diff --git a/lib/resource/resource.ex b/lib/resource/resource.ex index 25b6687a..a0f4e828 100644 --- a/lib/resource/resource.ex +++ b/lib/resource/resource.ex @@ -2,7 +2,7 @@ defmodule AshGraphql.Resource do alias Ash.Changeset.ManagedRelationshipHelpers alias Ash.Query.Aggregate alias AshGraphql.Resource - alias AshGraphql.Resource.{ManagedRelationship, Mutation, Query} + alias AshGraphql.Resource.{ManagedRelationship, Mutation, Query, Subscription} @get %Spark.Dsl.Entity{ name: :get, @@ -272,6 +272,50 @@ defmodule AshGraphql.Resource do def mutations, do: [@create, @update, @destroy, @action] + @subscribe %Spark.Dsl.Entity{ + name: :subscribe, + args: [:name], + describe: "A subscription to listen for changes on the resource", + examples: [ + """ + subscribe :post_created do + action_types(:create) + end + """ + ], + schema: Subscription.schema(), + target: Subscription + } + + @subscriptions %Spark.Dsl.Section{ + name: :subscriptions, + schema: [ + pubsub: [ + type: :module, + required: true, + doc: "The pubsub module to use for the subscription" + ] + ], + describe: """ + Subscriptions (notifications) to expose for the resource. + """, + examples: [ + """ + subscriptions do + subscribe :bucket_created do + actions :create + read_action :read + end + end + """ + ], + entities: [ + @subscribe + ] + } + + def subscriptions, do: [@subscribe] + @graphql %Spark.Dsl.Section{ name: :graphql, imports: [AshGraphql.Resource.Helpers], @@ -406,6 +450,7 @@ defmodule AshGraphql.Resource do sections: [ @queries, @mutations, + @subscriptions, @managed_relationships ] } @@ -413,13 +458,15 @@ defmodule AshGraphql.Resource do @transformers [ AshGraphql.Resource.Transformers.RequireKeysetForRelayQueries, AshGraphql.Resource.Transformers.ValidateActions, - AshGraphql.Resource.Transformers.ValidateCompatibleNames + AshGraphql.Resource.Transformers.ValidateCompatibleNames, + AshGraphql.Resource.Transformers.Subscription ] @verifiers [ AshGraphql.Resource.Verifiers.VerifyQueryMetadata, AshGraphql.Resource.Verifiers.RequirePkeyDelimiter, - AshGraphql.Resource.Verifiers.VerifyPaginateRelationshipWith + AshGraphql.Resource.Verifiers.VerifyPaginateRelationshipWith, + AshGraphql.Resource.Verifiers.VerifySubscriptionOptIn ] @sections [@graphql] @@ -436,6 +483,9 @@ defmodule AshGraphql.Resource do @deprecated "See `AshGraphql.Resource.Info.mutations/1`" defdelegate mutations(resource, domain \\ []), to: AshGraphql.Resource.Info + @deprecated "See `AshGraphql.Resource.Info.mutations/1`" + defdelegate subscriptions(resource, domain \\ []), to: AshGraphql.Resource.Info + @deprecated "See `AshGraphql.Resource.Info.managed_relationships/1`" defdelegate managed_relationships(resource), to: AshGraphql.Resource.Info @@ -1070,6 +1120,68 @@ defmodule AshGraphql.Resource do end) end + @doc false + # sobelow_skip ["DOS.StringToAtom"] + + def subscription_types(resource, all_domains, schema) do + resource + |> subscriptions(all_domains) + |> Enum.map(fn %Subscription{name: name, actions: actions, action_types: action_types} -> + resource_type = AshGraphql.Resource.Info.type(resource) + + action_types = + Ash.Resource.Info.actions(resource) + |> Stream.filter(&(&1.name in List.wrap(actions))) + |> Stream.map(& &1.name) + |> Stream.concat(List.wrap(action_types)) + |> Enum.uniq() + + result_type_name = + name + |> to_string() + |> then(&(&1 <> "_result")) + + result_type = + result_type_name + |> String.to_atom() + + %Absinthe.Blueprint.Schema.ObjectTypeDefinition{ + module: schema, + identifier: result_type, + name: result_type_name, + fields: + [ + :create in action_types && + %Absinthe.Blueprint.Schema.FieldDefinition{ + __reference__: ref(__ENV__), + identifier: :created, + module: schema, + name: "created", + type: resource_type + }, + :update in action_types && + %Absinthe.Blueprint.Schema.FieldDefinition{ + __reference__: ref(__ENV__), + identifier: :updated, + module: schema, + name: "updated", + type: resource_type + }, + :destroy in action_types && + %Absinthe.Blueprint.Schema.FieldDefinition{ + __reference__: ref(__ENV__), + identifier: :destroyed, + module: schema, + name: "destroyed", + type: :id + } + ] + |> Enum.filter(&(&1 != false)), + __reference__: ref(__ENV__) + } + end) + end + defp id_translation_middleware(relay_id_translations, true) do [{{AshGraphql.Graphql.IdTranslator, :translate_relay_ids}, relay_id_translations}] end @@ -1116,6 +1228,42 @@ defmodule AshGraphql.Resource do end end + # sobelow_skip ["DOS.StringToAtom"] + @doc false + + def subscriptions(domain, all_domains, resource, action_middleware, schema) do + resource + |> subscriptions(all_domains) + |> Enum.map(fn %Subscription{name: name, hide_inputs: hide_inputs} = subscription -> + result_type = name |> to_string() |> then(&(&1 <> "_result")) |> String.to_atom() + + action = + Ash.Resource.Info.action(resource, subscription.read_action) || + Ash.Resource.Info.primary_action(resource, :read) + + %Absinthe.Blueprint.Schema.FieldDefinition{ + arguments: args(:subscription, resource, action, schema, nil, hide_inputs), + identifier: name, + name: to_string(name), + config: + AshGraphql.Subscription.Config.create_config( + subscription, + domain, + resource + ), + module: schema, + middleware: + action_middleware ++ + domain_middleware(domain) ++ + [ + {{AshGraphql.Graphql.Resolver, :resolve}, {domain, resource, subscription, true}} + ], + type: result_type, + __reference__: ref(__ENV__) + } + end) + end + @doc false # sobelow_skip ["DOS.StringToAtom"] def embedded_type_input(source_resource, attribute, resource, schema) do @@ -1637,6 +1785,31 @@ defmodule AshGraphql.Resource do read_args(resource, action, schema, hide_inputs) end + defp args(:subscription, resource, action, schema, _identity, hide_inputs, _query) do + args = + if AshGraphql.Resource.Info.derive_filter?(resource) do + case resource_filter_fields(resource, schema) do + [] -> + [] + + _ -> + [ + %Absinthe.Blueprint.Schema.InputValueDefinition{ + name: "filter", + identifier: :filter, + type: resource_filter_type(resource), + description: "A filter to limit the results", + __reference__: ref(__ENV__) + } + ] + end + else + [] + end + + args ++ read_args(resource, action, schema, hide_inputs) + end + defp related_list_args(resource, related_resource, relationship_name, action, schema) do args(:list, related_resource, action, schema) ++ relationship_pagination_args(resource, relationship_name, action) diff --git a/lib/resource/subscription.ex b/lib/resource/subscription.ex new file mode 100644 index 00000000..764b090d --- /dev/null +++ b/lib/resource/subscription.ex @@ -0,0 +1,45 @@ +defmodule AshGraphql.Resource.Subscription do + @moduledoc "Represents a configured query on a resource" + defstruct [ + :name, + :resource, + :actions, + :action_types, + :read_action, + :actor, + :hide_inputs + ] + + @subscription_schema [ + name: [ + type: :atom, + doc: "The name to use for the subscription." + ], + actor: [ + type: + {:spark_function_behaviour, AshGraphql.Subscription.Actor, + {AshGraphql.Subscription.ActorFunction, 1}}, + doc: "The actor to use for authorization." + ], + actions: [ + type: {:or, [{:list, :atom}, :atom]}, + doc: "The create/update/destroy actions the subsciption should listen to." + ], + action_types: [ + type: {:or, [{:list, :atom}, :atom]}, + doc: "The type of actions the subsciption should listen to." + ], + read_action: [ + type: :atom, + doc: "The read action to use for reading data" + ], + hide_inputs: [ + type: {:list, :atom}, + doc: + "A list of inputs to hide from the subscription, usable if the read action has arguments.", + default: [] + ] + ] + + def schema, do: @subscription_schema +end diff --git a/lib/resource/transformers/subscription.ex b/lib/resource/transformers/subscription.ex new file mode 100644 index 00000000..74a28140 --- /dev/null +++ b/lib/resource/transformers/subscription.ex @@ -0,0 +1,27 @@ +defmodule AshGraphql.Resource.Transformers.Subscription do + @moduledoc """ + Adds the notifier for Subscriptions to the Resource + """ + + use Spark.Dsl.Transformer + + alias Spark.Dsl.Transformer + + def transform(dsl) do + case dsl |> Transformer.get_entities([:graphql, :subscriptions]) do + [] -> + {:ok, dsl} + + _ -> + {:ok, + dsl + |> Transformer.persist( + :simple_notifiers, + [ + AshGraphql.Subscription.Notifier + ] ++ + Transformer.get_persisted(dsl, :simple_notifiers, []) + )} + end + end +end diff --git a/lib/resource/verifiers/verify_subscription_actions.ex b/lib/resource/verifiers/verify_subscription_actions.ex new file mode 100644 index 00000000..1856ec83 --- /dev/null +++ b/lib/resource/verifiers/verify_subscription_actions.ex @@ -0,0 +1,56 @@ +defmodule AshGraphql.Resource.Verifiers.VerifySubscriptionActions do + # Validates the paginate_relationship_with option + @moduledoc false + + use Spark.Dsl.Verifier + + alias Spark.Dsl.Transformer + + def verify(dsl) do + dsl + |> AshGraphql.Resource.Info.subscriptions(Ash.Resource.Info.domain(dsl)) + |> Enum.each(&verify_actions(dsl, &1)) + + :ok + end + + defp verify_actions(dsl, subscription) do + unless MapSet.subset?( + MapSet.new(List.wrap(subscription.action_types)), + MapSet.new([:create, :update, :destroy]) + ) do + raise Spark.Error.DslError, + module: Transformer.get_persisted(dsl, :module), + message: "`action_types` values must be on of `[:create, :update, :destroy]`.", + path: [:graphql, :subscriptions, subscription.name, :action_types] + end + + missing_write_actions = + MapSet.difference( + MapSet.new(List.wrap(subscription.actions)), + MapSet.new( + Ash.Resource.Info.actions(dsl) + |> Stream.filter(&(&1.type in [:create, :update, :destroy])) + |> Enum.map(& &1.name) + ) + ) + + unless Enum.empty?(missing_write_actions) do + raise Spark.Error.DslError, + module: Transformer.get_persisted(dsl, :module), + message: + "The actions #{Enum.join(missing_write_actions, ", ")} do not exist on the resource.", + path: [:graphql, :subscriptions, subscription.name, :actions] + end + + unless is_nil(subscription.read_action) or + subscription.read_action in (Ash.Resource.Info.actions(dsl) + |> Stream.filter(&(&1.type == :read)) + |> Enum.map(& &1.name)) do + raise Spark.Error.DslError, + module: Transformer.get_persisted(dsl, :module), + message: "The read action #{subscription.read_action} does not exist on the resource.", + path: [:graphql, :subscriptions, subscription.name, :read_action] + end + end +end diff --git a/lib/resource/verifiers/verify_subscription_opt_in.ex b/lib/resource/verifiers/verify_subscription_opt_in.ex new file mode 100644 index 00000000..42a271e1 --- /dev/null +++ b/lib/resource/verifiers/verify_subscription_opt_in.ex @@ -0,0 +1,23 @@ +defmodule AshGraphql.Resource.Verifiers.VerifySubscriptionOptIn do + # Checks if the users has opted into using subscriptions + @moduledoc false + + use Spark.Dsl.Verifier + alias Spark.Dsl.Transformer + + def verify(dsl) do + has_subscriptions = + not (dsl + |> AshGraphql.Resource.Info.subscriptions() + |> Enum.empty?()) + + if has_subscriptions && not Application.get_env(:ash_graphql, :subscriptions, false) do + raise Spark.Error.DslError, + module: Transformer.get_persisted(dsl, :module), + message: "Subscriptions are in beta and must be enabled in the config", + path: [:graphql, :subscriptions] + end + + :ok + end +end diff --git a/lib/subscription/actor.ex b/lib/subscription/actor.ex new file mode 100644 index 00000000..21d08582 --- /dev/null +++ b/lib/subscription/actor.ex @@ -0,0 +1,10 @@ +defmodule AshGraphql.Subscription.Actor do + @moduledoc """ + Allows the user to substitue an actor for another more generic actor, + this can be used to deduplicate subscription execution + """ + + # I'd like to have the typespec say that actor can be anything + # but that the input and output must be the same + @callback actor(actor :: any, opts :: Keyword.t()) :: actor :: any +end diff --git a/lib/subscription/actor_function.ex b/lib/subscription/actor_function.ex new file mode 100644 index 00000000..a24a6708 --- /dev/null +++ b/lib/subscription/actor_function.ex @@ -0,0 +1,15 @@ +defmodule AshGraphql.Subscription.ActorFunction do + @moduledoc false + + @behaviour AshGraphql.Subscription.Actor + + @impl true + def actor(actor, [{:fun, {m, f, a}}]) do + apply(m, f, [actor | a]) + end + + @impl true + def actor(actor, [{:fun, fun}]) do + fun.(actor) + end +end diff --git a/lib/subscription/config.ex b/lib/subscription/config.ex new file mode 100644 index 00000000..12b520cf --- /dev/null +++ b/lib/subscription/config.ex @@ -0,0 +1,64 @@ +defmodule AshGraphql.Subscription.Config do + @moduledoc """ + Creates a config function used for the absinthe subscription definition + + See https://github.com/absinthe-graphql/absinthe/blob/3d0823bd71c2ebb94357a5588c723e053de8c66a/lib/absinthe/schema/notation.ex#L58 + """ + alias AshGraphql.Resource.Subscription + + def create_config(%Subscription{} = subscription, _domain, resource) do + config_module = String.to_atom(Macro.camelize(Atom.to_string(subscription.name)) <> ".Config") + + defmodule config_module do + require Ash.Query + alias AshGraphql.Graphql.Resolver + + @subscription subscription + @resource resource + def config(args, %{context: context}) do + read_action = + @subscription.read_action || Ash.Resource.Info.primary_action!(@resource, :read).name + + actor = + case @subscription.actor do + {module, opts} -> + module.actor(context[:actor], opts) + + _ -> + context[:actor] + end + + # check with Ash.can? to make sure the user is able to read the resource + # otherwise we return an error here instead of just never sending something + # in the subscription + case Ash.can( + @resource + |> Ash.Query.new() + # not sure if we need this here + |> Ash.Query.do_filter(Resolver.massage_filter(@resource, Map.get(args, :filter))) + |> Ash.Query.set_tenant(context[:tenant]) + |> Ash.Query.for_read(read_action), + actor, + tenant: context[:tenant], + run_queries?: false, + alter_source?: true + ) do + {:ok, true} -> + {:ok, topic: "*", context_id: create_context_id(args, actor, context[:tenant])} + + {:ok, true, _} -> + {:ok, topic: "*", context_id: create_context_id(args, actor, context[:tenant])} + + _ -> + {:error, "unauthorized"} + end + end + + def create_context_id(args, actor, tenant) do + Base.encode64(:crypto.hash(:sha256, :erlang.term_to_binary({args, actor, tenant}))) + end + end + + &config_module.config/2 + end +end diff --git a/lib/subscription/endpoint.ex b/lib/subscription/endpoint.ex new file mode 100644 index 00000000..14fc3933 --- /dev/null +++ b/lib/subscription/endpoint.ex @@ -0,0 +1,10 @@ +defmodule AshGraphql.Subscription.Endpoint do + defmacro __using__(_opts) do + quote do + use Absinthe.Phoenix.Endpoint + + defdelegate run_docset(pubsub, docs_and_topics, notification), + to: AshGraphql.Subscription.Runner + end + end +end diff --git a/lib/subscription/notifier.ex b/lib/subscription/notifier.ex new file mode 100644 index 00000000..a8ed7030 --- /dev/null +++ b/lib/subscription/notifier.ex @@ -0,0 +1,22 @@ +defmodule AshGraphql.Subscription.Notifier do + @moduledoc """ + AshNotifier that triggers absinthe if subscriptions are listening + """ + alias AshGraphql.Resource.Info + use Ash.Notifier + + @impl Ash.Notifier + def notify(notification) do + pub_sub = Info.subscription_pubsub(notification.resource) + + for subscription <- + AshGraphql.Resource.Info.subscriptions(notification.resource, notification.domain) do + if notification.action.name in List.wrap(subscription.actions) or + notification.action.type in List.wrap(subscription.action_types) do + Absinthe.Subscription.publish(pub_sub, notification, [{subscription.name, "*"}]) + end + end + + :ok + end +end diff --git a/lib/subscription/runner.ex b/lib/subscription/runner.ex new file mode 100644 index 00000000..fbbf3e18 --- /dev/null +++ b/lib/subscription/runner.ex @@ -0,0 +1,52 @@ +defmodule AshGraphql.Subscription.Runner do + @moduledoc """ + Custom implementation if the run_docset function for the PubSub module used for Subscriptions + + Mostly a copy of https://github.com/absinthe-graphql/absinthe/blob/3d0823bd71c2ebb94357a5588c723e053de8c66a/lib/absinthe/subscription/local.ex#L40 + but this lets us decide if we want to send the data to the client or not in certain error cases + """ + alias Absinthe.Pipeline.BatchResolver + + require Logger + + def run_docset(pubsub, docs_and_topics, notification) do + for {topic, key_strategy, doc} <- docs_and_topics do + try do + pipeline = + Absinthe.Subscription.Local.pipeline(doc, notification) + + {:ok, %{result: data}, _} = Absinthe.Pipeline.run(doc.source, pipeline) + + Logger.debug(""" + Absinthe Subscription Publication + Field Topic: #{inspect(key_strategy)} + Subscription id: #{inspect(topic)} + Data: #{inspect(data)} + """) + + case should_send?(data) do + false -> + :ok + + true -> + :ok = pubsub.publish_subscription(topic, data) + end + rescue + e -> + BatchResolver.pipeline_error(e, __STACKTRACE__) + end + end + end + + defp should_send?(%{errors: errors}) do + # if the user is not allowed to see the data or the query didn't + # return any data we do not send the error to the client + # because it would just expose unnecessary information + # and the user can not really do anything usefull with it + not (errors + |> List.wrap() + |> Enum.any?(fn error -> Map.get(error, :code) in ["forbidden", "not_found", nil] end)) + end + + defp should_send?(_), do: true +end diff --git a/lib/subscriptions.ex b/lib/subscriptions.ex index 1bec15e5..1e8f935b 100644 --- a/lib/subscriptions.ex +++ b/lib/subscriptions.ex @@ -8,12 +8,23 @@ defmodule AshGraphql.Subscription do @doc """ Produce a query that will load the correct data for a subscription. """ - def query_for_subscription(query, domain, %{context: context} = resolution) do + def query_for_subscription( + query, + domain, + %{context: context} = resolution, + type_override \\ nil, + nested \\ [] + ) do query |> Ash.Query.new() |> Ash.Query.set_tenant(Map.get(context, :tenant)) |> Ash.Query.set_context(get_context(context)) - |> AshGraphql.Graphql.Resolver.select_fields(query.resource, resolution, nil) + |> AshGraphql.Graphql.Resolver.select_fields( + query.resource, + resolution, + type_override, + nested + ) |> AshGraphql.Graphql.Resolver.load_fields( [ domain: domain, diff --git a/mix.exs b/mix.exs index 6e2e7132..6b8f8762 100644 --- a/mix.exs +++ b/mix.exs @@ -142,6 +142,7 @@ defmodule AshGraphql.MixProject do {:ash, ash_version("~> 3.0 and >= 3.2.3")}, {:absinthe_plug, "~> 1.4"}, {:absinthe, "~> 1.7"}, + {:absinthe_phoenix, "~> 2.0.0", optional: true}, {:jason, "~> 1.2"}, {:igniter, "~> 0.3 and >= 0.3.34"}, {:spark, "~> 2.2 and >= 2.2.10"}, diff --git a/mix.lock b/mix.lock index 6e62d5d5..8a1b0945 100644 --- a/mix.lock +++ b/mix.lock @@ -1,8 +1,10 @@ %{ "absinthe": {:hex, :absinthe, "1.7.8", "43443d12ad2b4fcce60e257ac71caf3081f3d5c4ddd5eac63a02628bcaf5b556", [:mix], [{:dataloader, "~> 1.0.0 or ~> 2.0", [hex: :dataloader, repo: "hexpm", optional: true]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}, {:opentelemetry_process_propagator, "~> 0.2.1 or ~> 0.3", [hex: :opentelemetry_process_propagator, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "c4085df201892a498384f997649aedb37a4ce8a726c170d5b5617ed3bf45d40b"}, + "absinthe_phoenix": {:hex, :absinthe_phoenix, "2.0.3", "74e0862f280424b7bc290f6f69e133268bce0b4e7db0218c7e129c5c2b1d3fd4", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:absinthe_plug, "~> 1.5", [hex: :absinthe_plug, repo: "hexpm", optional: false]}, {:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.5", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.13 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}], "hexpm", "caffaea03c17ea7419fe07e4bc04c2399c47f0d8736900623dbf4749a826fd2c"}, "absinthe_plug": {:hex, :absinthe_plug, "1.5.8", "38d230641ba9dca8f72f1fed2dfc8abd53b3907d1996363da32434ab6ee5d6ab", [:mix], [{:absinthe, "~> 1.5", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "bbb04176647b735828861e7b2705465e53e2cf54ccf5a73ddd1ebd855f996e5a"}, "ash": {:hex, :ash, "3.4.21", "d97b060c64084613ca8317272864be908d591aaa30671d1b04de41f82f8ce368", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:ecto, "~> 3.7", [hex: :ecto, repo: "hexpm", optional: false]}, {:ets, "~> 0.8", [hex: :ets, repo: "hexpm", optional: false]}, {:igniter, ">= 0.3.36 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: false]}, {:jason, ">= 1.0.0", [hex: :jason, repo: "hexpm", optional: false]}, {:owl, "~> 0.11", [hex: :owl, repo: "hexpm", optional: false]}, {:picosat_elixir, "~> 0.2", [hex: :picosat_elixir, repo: "hexpm", optional: true]}, {:plug, ">= 0.0.0", [hex: :plug, repo: "hexpm", optional: true]}, {:reactor, "~> 0.9", [hex: :reactor, repo: "hexpm", optional: false]}, {:simple_sat, ">= 0.1.1 and < 1.0.0-0", [hex: :simple_sat, repo: "hexpm", optional: true]}, {:spark, ">= 2.2.29 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:stream_data, "~> 1.0", [hex: :stream_data, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "1fedea9b994c4b1d18722d49333fd8f30db4af058c9d56cd8cc438b420e6a6a8"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "castore": {:hex, :castore, "1.0.9", "5cc77474afadf02c7c017823f460a17daa7908e991b0cc917febc90e466a375c", [:mix], [], "hexpm", "5ea956504f1ba6f2b4eb707061d8e17870de2bee95fb59d512872c2ef06925e7"}, "credo": {:hex, :credo, "1.7.7", "771445037228f763f9b2afd612b6aa2fd8e28432a95dbbc60d8e03ce71ba4446", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "8bc87496c9aaacdc3f90f01b7b0582467b69b4bd2441fe8aae3109d843cc2f2e"}, "decimal": {:hex, :decimal, "2.1.1", "5611dca5d4b2c3dd497dec8f68751f1f1a54755e8ed2a966c2633cf885973ad6", [:mix], [], "hexpm", "53cfe5f497ed0e7771ae1a475575603d77425099ba5faef9394932b35020ffcc"}, "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, @@ -28,6 +30,9 @@ "mix_test_watch": {:hex, :mix_test_watch, "1.2.0", "1f9acd9e1104f62f280e30fc2243ae5e6d8ddc2f7f4dc9bceb454b9a41c82b42", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "278dc955c20b3fb9a3168b5c2493c2e5cffad133548d307e0a50c7f2cfbf34f6"}, "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, "owl": {:hex, :owl, "0.11.0", "2cd46185d330aa2400f1c8c3cddf8d2ff6320baeff23321d1810e58127082cae", [:mix], [{:ucwidth, "~> 0.2", [hex: :ucwidth, repo: "hexpm", optional: true]}], "hexpm", "73f5783f0e963cc04a061be717a0dbb3e49ae0c4bfd55fb4b78ece8d33a65efe"}, + "phoenix": {:hex, :phoenix, "1.7.14", "a7d0b3f1bc95987044ddada111e77bd7f75646a08518942c72a8440278ae7825", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "c7859bc56cc5dfef19ecfc240775dae358cbaa530231118a9e014df392ace61a"}, + "phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.3", "3168d78ba41835aecad272d5e8cd51aa87a7ac9eb836eabc42f6e57538e3731d", [:mix], [], "hexpm", "bba06bc1dcfd8cb086759f0edc94a8ba2bc8896d5331a1e2c2902bf8e36ee502"}, + "phoenix_template": {:hex, :phoenix_template, "1.0.4", "e2092c132f3b5e5b2d49c96695342eb36d0ed514c5b252a77048d5969330d639", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "2c0c81f0e5c6753faf5cca2f229c9709919aba34fab866d3bc05060c9c444206"}, "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, "reactor": {:hex, :reactor, "0.10.0", "1206113c21ba69b889e072b2c189c05a7aced523b9c3cb8dbe2dab7062cb699a", [:mix], [{:igniter, "~> 0.2", [hex: :igniter, repo: "hexpm", optional: false]}, {:iterex, "~> 0.1", [hex: :iterex, repo: "hexpm", optional: false]}, {:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, "~> 2.0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "4003c33e4c8b10b38897badea395e404d74d59a31beb30469a220f2b1ffe6457"}, @@ -40,6 +45,8 @@ "splode": {:hex, :splode, "0.2.4", "71046334c39605095ca4bed5d008372e56454060997da14f9868534c17b84b53", [:mix], [], "hexpm", "ca3b95f0d8d4b482b5357954fec857abd0fa3ea509d623334c1328e7382044c2"}, "stream_data": {:hex, :stream_data, "1.1.1", "fd515ca95619cca83ba08b20f5e814aaf1e5ebff114659dc9731f966c9226246", [:mix], [], "hexpm", "45d0cd46bd06738463fd53f22b70042dbb58c384bb99ef4e7576e7bb7d3b8c8c"}, "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, + "websock_adapter": {:hex, :websock_adapter, "0.5.7", "65fa74042530064ef0570b75b43f5c49bb8b235d6515671b3d250022cb8a1f9e", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "d0f478ee64deddfec64b800673fd6e0c8888b079d9f3444dd96d2a98383bdbd1"}, "yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"}, "yaml_elixir": {:hex, :yaml_elixir, "2.11.0", "9e9ccd134e861c66b84825a3542a1c22ba33f338d82c07282f4f1f52d847bd50", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "53cc28357ee7eb952344995787f4bb8cc3cecbf189652236e9b163e8ce1bc242"}, } diff --git a/test/subscription_test.exs b/test/subscription_test.exs new file mode 100644 index 00000000..f219e091 --- /dev/null +++ b/test/subscription_test.exs @@ -0,0 +1,373 @@ +defmodule AshGraphql.SubscriptionTest do + use ExUnit.Case + + alias AshGraphql.Test.PubSub + alias AshGraphql.Test.Schema + alias AshGraphql.Test.Subscribable + + def assert_down(pid) do + ref = Process.monitor(pid) + + assert_receive {:DOWN, ^ref, _, _, _} + end + + setup do + Application.put_env(PubSub, :notifier_test_pid, self()) + {:ok, pubsub} = PubSub.start_link() + {:ok, absinthe_sub} = Absinthe.Subscription.start_link(PubSub) + :ok + + on_exit(fn -> + Process.exit(pubsub, :normal) + Process.exit(absinthe_sub, :normal) + # block until the processes have exited + assert_down(pubsub) + assert_down(absinthe_sub) + end) + end + + @admin %{ + id: 1, + role: :admin + } + + test "can subscribe to all action types resource" do + assert {:ok, %{"subscribed" => topic}} = + Absinthe.run( + """ + subscription { + subscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """, + Schema, + context: %{actor: @admin, pubsub: PubSub} + ) + + create_mutation = """ + mutation CreateSubscribable($input: CreateSubscribableInput) { + createSubscribable(input: $input) { + result{ + id + text + } + errors{ + message + } + } + } + """ + + assert {:ok, %{data: mutation_result}} = + Absinthe.run(create_mutation, Schema, + variables: %{"input" => %{"text" => "foo"}}, + context: %{actor: @admin} + ) + + assert Enum.empty?(mutation_result["createSubscribable"]["errors"]) + + subscribable_id = mutation_result["createSubscribable"]["result"]["id"] + refute is_nil(subscribable_id) + + assert_receive({^topic, %{data: subscription_data}}) + + assert subscribable_id == + subscription_data["subscribableEvents"]["created"]["id"] + + update_mutation = """ + mutation CreateSubscribable($id: ID! $input: UpdateSubscribableInput) { + updateSubscribable(id: $id, input: $input) { + result{ + id + text + } + errors{ + message + } + } + } + """ + + assert {:ok, %{data: mutation_result}} = + Absinthe.run(update_mutation, Schema, + variables: %{"id" => subscribable_id, "input" => %{"text" => "bar"}}, + context: %{actor: @admin} + ) + + assert Enum.empty?(mutation_result["updateSubscribable"]["errors"]) + + assert_receive({^topic, %{data: subscription_data}}) + + assert subscription_data["subscribableEvents"]["updated"]["text"] == "bar" + + destroy_mutation = """ + mutation CreateSubscribable($id: ID!) { + destroySubscribable(id: $id) { + result{ + id + } + errors{ + message + } + } + } + """ + + assert {:ok, %{data: mutation_result}} = + Absinthe.run(destroy_mutation, Schema, + variables: %{"id" => subscribable_id}, + context: %{actor: @admin} + ) + + assert Enum.empty?(mutation_result["destroySubscribable"]["errors"]) + + assert_receive({^topic, %{data: subscription_data}}) + + assert subscription_data["subscribableEvents"]["destroyed"] == subscribable_id + end + + test "policies are applied to subscriptions" do + actor1 = %{ + id: 1, + role: :user + } + + actor2 = %{ + id: 2, + role: :user + } + + assert {:ok, %{"subscribed" => topic1}} = + Absinthe.run( + """ + subscription { + subscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """, + Schema, + context: %{actor: actor1, pubsub: PubSub} + ) + + assert {:ok, %{"subscribed" => topic2}} = + Absinthe.run( + """ + subscription { + subscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """, + Schema, + context: %{actor: actor2, pubsub: PubSub} + ) + + assert topic1 != topic2 + + subscribable = + Subscribable + |> Ash.Changeset.for_create(:create, %{text: "foo", actor_id: 1}, actor: @admin) + |> Ash.create!() + + # actor1 will get data because it can see the resource + assert_receive {^topic1, %{data: subscription_data}} + # actor 2 will not get data because it cannot see the resource + refute_receive({^topic2, _}) + + assert subscribable.id == + subscription_data["subscribableEvents"]["created"]["id"] + end + + test "can dedup with actor fun" do + actor1 = %{ + id: 1, + role: :user + } + + actor2 = %{ + id: 2, + role: :user + } + + subscription = """ + subscription { + dedupedSubscribableEvents { + created { + id + text + } + updated { + id + text + } + destroyed + } + } + """ + + assert {:ok, %{"subscribed" => topic1}} = + Absinthe.run( + subscription, + Schema, + context: %{actor: actor1, pubsub: PubSub} + ) + + assert {:ok, %{"subscribed" => topic2}} = + Absinthe.run( + subscription, + Schema, + context: %{actor: actor2, pubsub: PubSub} + ) + + assert topic1 == topic2 + + subscribable = + Subscribable + |> Ash.Changeset.for_create(:create, %{text: "foo", actor_id: 1}, actor: @admin) + |> Ash.create!() + + assert_receive {^topic1, %{data: subscription_data}} + + assert subscribable.id == + subscription_data["dedupedSubscribableEvents"]["created"]["id"] + end + + test "can subscribe to read actions that take arguments" do + actor1 = %{ + id: 1, + role: :user + } + + subscription = """ + subscription WithArguments($topic: String!) { + subscribableEventsWithArguments(topic: $topic) { + created { + id + text + } + } + } + """ + + assert {:ok, %{"subscribed" => topic}} = + Absinthe.run( + subscription, + Schema, + variables: %{"topic" => "news"}, + context: %{actor: actor1, pubsub: PubSub} + ) + + subscribable = + Subscribable + |> Ash.Changeset.for_create(:create, %{text: "foo", topic: "news", actor_id: 1}, + actor: @admin + ) + |> Ash.create!() + + assert_receive {^topic, %{data: subscription_data}} + + assert subscribable.id == + subscription_data["subscribableEventsWithArguments"]["created"]["id"] + end + + test "can subscribe on the domain" do + actor1 = %{ + id: 1, + role: :user + } + + subscription = """ + subscription { + subscribedOnDomain { + created { + id + text + } + } + } + """ + + assert {:ok, %{"subscribed" => topic}} = + Absinthe.run( + subscription, + Schema, + context: %{actor: actor1, pubsub: PubSub} + ) + + subscribable = + Subscribable + |> Ash.Changeset.for_create(:create, %{text: "foo", topic: "news", actor_id: 1}, + actor: @admin + ) + |> Ash.create!() + + assert_receive {^topic, %{data: subscription_data}} + + assert subscribable.id == + subscription_data["subscribedOnDomain"]["created"]["id"] + end + + test "can not see forbidden field" do + actor1 = %{ + id: 1, + role: :user + } + + subscription = """ + subscription { + subscribedOnDomain { + created { + id + text + hiddenField + } + } + } + """ + + assert {:ok, %{"subscribed" => topic}} = + Absinthe.run( + subscription, + Schema, + context: %{actor: actor1, pubsub: PubSub} + ) + + Subscribable + |> Ash.Changeset.for_create(:create, %{text: "foo", topic: "news", actor_id: 1}, + actor: @admin + ) + |> Ash.create!() + + assert_receive {^topic, %{data: subscription_data, errors: errors}} + + assert is_nil(subscription_data["subscribedOnDomain"]["created"]) + refute Enum.empty?(errors) + assert [%{code: "forbidden_field"}] = errors + end +end diff --git a/test/support/domain.ex b/test/support/domain.ex index 916b6825..65cf0f8f 100644 --- a/test/support/domain.ex +++ b/test/support/domain.ex @@ -12,6 +12,12 @@ defmodule AshGraphql.Test.Domain do get AshGraphql.Test.Comment, :get_comment, :read list AshGraphql.Test.Post, :post_score, :score end + + subscriptions do + subscribe AshGraphql.Test.Subscribable, :subscribed_on_domain do + action_types(:create) + end + end end resources do @@ -45,5 +51,6 @@ defmodule AshGraphql.Test.Domain do resource(AshGraphql.Test.Message) resource(AshGraphql.Test.TextMessage) resource(AshGraphql.Test.ImageMessage) + resource(AshGraphql.Test.Subscribable) end end diff --git a/test/support/pub_sub.ex b/test/support/pub_sub.ex new file mode 100644 index 00000000..4edec1ac --- /dev/null +++ b/test/support/pub_sub.ex @@ -0,0 +1,34 @@ +defmodule AshGraphql.Test.PubSub do + @moduledoc """ + PubSub mock implementation for subscription tests + """ + @behaviour Absinthe.Subscription.Pubsub + + def start_link do + Registry.start_link(keys: :duplicate, name: __MODULE__) + end + + def node_name do + Atom.to_string(node()) + end + + def subscribe(_topic) do + :ok + end + + defdelegate run_docset(pubsub, docs_and_topics, mutation_result), + to: AshGraphql.Subscription.Runner + + def publish_subscription(topic, data) do + send( + Application.get_env(__MODULE__, :notifier_test_pid), + {topic, data} + ) + + :ok + end + + def publish_mutation(_proxy_topic, _mutation_result, _subscribed_fields) do + :ok + end +end diff --git a/test/support/resources/subscribable.ex b/test/support/resources/subscribable.ex new file mode 100644 index 00000000..4979cefe --- /dev/null +++ b/test/support/resources/subscribable.ex @@ -0,0 +1,101 @@ +defmodule AshGraphql.Test.Subscribable do + @moduledoc false + use Ash.Resource, + domain: AshGraphql.Test.Domain, + data_layer: Ash.DataLayer.Ets, + authorizers: [Ash.Policy.Authorizer], + extensions: [AshGraphql.Resource] + + require Ash.Query + + graphql do + type :subscribable + + queries do + get :get_subscribable, :read + end + + mutations do + create :create_subscribable, :create + update :update_subscribable, :update + destroy :destroy_subscribable, :destroy + end + + subscriptions do + pubsub(AshGraphql.Test.PubSub) + + subscribe(:subscribable_events) do + action_types([:create, :update, :destroy]) + end + + subscribe(:deduped_subscribable_events) do + actions([:create, :update, :destroy]) + read_action(:open_read) + + actor(fn _ -> + %{id: -1, role: :deduped_actor} + end) + end + + subscribe(:subscribable_events_with_arguments) do + read_action(:read_with_arg) + actions([:create]) + end + end + end + + policies do + bypass actor_attribute_equals(:role, :admin) do + authorize_if(always()) + end + + policy action(:read) do + authorize_if(expr(actor_id == ^actor(:id))) + end + + policy action([:open_read, :read_with_arg]) do + authorize_if(always()) + end + end + + field_policies do + field_policy :hidden_field do + authorize_if(actor_attribute_equals(:role, :admin)) + end + + field_policy :* do + authorize_if(always()) + end + end + + actions do + default_accept(:*) + defaults([:create, :read, :update, :destroy]) + + read(:open_read) + + read :read_with_arg do + argument(:topic, :string) do + allow_nil? false + end + + filter(expr(topic == ^arg(:topic))) + end + end + + attributes do + uuid_primary_key(:id) + + attribute(:hidden_field, :string) do + public?(true) + default("hidden") + allow_nil?(false) + end + + attribute(:text, :string, public?: true) + attribute(:topic, :string, public?: true) + attribute(:actor_id, :integer, public?: true) + create_timestamp(:created_at) + update_timestamp(:updated_at) + end +end diff --git a/test/support/schema.ex b/test/support/schema.ex index e4fc6f18..64b2ca72 100644 --- a/test/support/schema.ex +++ b/test/support/schema.ex @@ -27,4 +27,7 @@ defmodule AshGraphql.Test.Schema do value(:open, description: "The post is open") value(:closed, description: "The post is closed") end + + subscription do + end end