From dbfd4b0264c9ff24974fa4d3afd18ef562737653 Mon Sep 17 00:00:00 2001 From: Pablo Polvorin Date: Fri, 6 Oct 2023 22:08:32 -0300 Subject: [PATCH] feat(elixir): basic api and access control for relays allow custom attributes when registering local addresses, for now only used for attaching metadata to relays. Use that metadata to store the identifier that creatd the relay, to enforce access control over it (who can take it over / delete it) --- .../elixir/ockam/ockam/lib/ockam/node.ex | 12 +- .../ockam/ockam/lib/ockam/node/registry.ex | 36 ++-- .../ockam/lib/ockam/session/pluggable.ex | 4 +- .../elixir/ockam/ockam/lib/ockam/worker.ex | 29 +++- .../lib/services/api/endpoint.ex | 2 +- .../lib/services/api/static_forwarding_api.ex | 53 ------ .../lib/services/provider/routing.ex | 6 +- .../lib/services/relay/static_forwarding.ex | 113 +++++++++++++ .../services/relay/static_forwarding_api.ex | 149 ++++++++++++++++ .../lib/services/relay/types.ex | 40 +++++ .../lib/services/relay/worker.ex | 95 +++++++++++ .../lib/services/static_forwarding.ex | 137 --------------- .../elixir/ockam/ockam_services/mix.exs | 1 + .../elixir/ockam/ockam_services/mix.lock | 1 + .../test/authorization_test.exs | 10 +- .../test/services/forwarding_test.exs | 23 +-- .../services/static_forwarding_api_test.exs | 160 ++++++++++++++++++ .../test/services/static_forwarding_test.exs | 159 ++++++++++++++++- .../services/token_lease_manager_test.exs | 2 +- .../ockam/ockam_typed_cbor/lib/typed_cbor.ex | 3 +- 20 files changed, 795 insertions(+), 240 deletions(-) delete mode 100644 implementations/elixir/ockam/ockam_services/lib/services/api/static_forwarding_api.ex create mode 100644 implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding.ex create mode 100644 implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding_api.ex create mode 100644 implementations/elixir/ockam/ockam_services/lib/services/relay/types.ex create mode 100644 implementations/elixir/ockam/ockam_services/lib/services/relay/worker.ex delete mode 100644 implementations/elixir/ockam/ockam_services/lib/services/static_forwarding.ex create mode 100644 implementations/elixir/ockam/ockam_services/test/services/static_forwarding_api_test.exs diff --git a/implementations/elixir/ockam/ockam/lib/ockam/node.ex b/implementations/elixir/ockam/ockam/lib/ockam/node.ex index 8e415bfad54..94740e99b9e 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/node.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/node.ex @@ -42,19 +42,21 @@ defmodule Ockam.Node do def register_address(address, module \\ nil) do self = self() - case Registry.register(address, module) do + case Registry.register(address, %{module: module, attributes: %{}}) do :ok -> :ok {:error, {:already_registered, ^self}} -> :ok error -> error end end - @spec set_address_module(any(), module()) :: :ok | :error + @spec update_address_metadata(any(), (map() -> %{module: module(), attributes: map()})) :: + :ok | :error @doc """ - Sets module name for already registered process + Sets module name and attributes for already registered process. + This can only be called for the process registered at that address itself """ - def set_address_module(address, module) do - Registry.set_module(address, module) + def update_address_metadata(address, callback) do + Registry.update_metadata(address, callback) end @spec get_address_module(any()) :: {:ok, module()} | :error diff --git a/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex b/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex index 54702add852..c859fb0110e 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex @@ -71,32 +71,48 @@ defmodule Ockam.Node.Registry do # This function is used in custom process registration # # Module should be the worker implementation module - def register(address, module) do - case Registry.register(__MODULE__, address, module) do + def register(address, meta) when is_map(meta) or is_nil(meta) do + case Registry.register(__MODULE__, address, meta) do {:ok, _owner} -> :ok {:error, reason} -> {:error, reason} end end - @spec set_module(any(), module()) :: :ok | :error + @spec update_metadata(address :: any(), callback :: (map() -> map())) :: :ok | :error @doc false - # Set worker module for the current process + # Set worker metadata for the current process # # This function is called from the worker behaviour - # Module is not set when registering with register_name from `:via` option - # so this function needs to be called to set it after the process is created - def set_module(address, module) do - case Registry.update_value(__MODULE__, address, fn _old -> module end) do + # Metadata is not set when registering with register_name from `:via` option + # so this function needs to be called to set it after the process is created, + # and whenever we want to update it + def update_metadata(address, callback) do + case Registry.update_value(__MODULE__, address, callback) do :error -> :error {_new, _old} -> :ok end end - @spec lookup(address :: any()) :: {:ok, pid, module} | :error + @spec lookup(address :: any()) :: {:ok, pid(), module() | nil} | :error def lookup(address) do case Registry.lookup(__MODULE__, address) do - [{pid, module}] -> {:ok, pid, module} + [{pid, nil}] -> {:ok, pid, nil} + [{pid, meta}] when is_map(meta) -> {:ok, pid, Map.get(meta, :module)} [] -> :error end end + + @spec lookup_meta(address :: any()) :: {:ok, map()} | :error + def lookup_meta(address) do + case Registry.lookup(__MODULE__, address) do + [{_pid, nil}] -> {:ok, %{}} + [{_pid, meta}] when is_map(meta) -> {:ok, meta} + [] -> :error + end + end + + def select_by_attribute(name, value) do + Registry.select(__MODULE__, [{{:_, :_, %{attributes: %{name => value}}}, [], [{{:"$_"}}]}]) + |> Enum.map(fn {{addr, {_pid, %{attributes: attributes}}}} -> {addr, attributes} end) + end end diff --git a/implementations/elixir/ockam/ockam/lib/ockam/session/pluggable.ex b/implementations/elixir/ockam/ockam/lib/ockam/session/pluggable.ex index 86dd3378017..90aa6223892 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/session/pluggable.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/session/pluggable.ex @@ -198,7 +198,9 @@ defmodule Ockam.Session.Pluggable do all_addresses = Map.get(state, :all_addresses, []) Enum.each(all_addresses, fn address -> - Ockam.Node.set_address_module(address, module) + Ockam.Node.update_address_metadata(address, fn _prev -> + %{module: module, attributes: %{}} + end) end) end diff --git a/implementations/elixir/ockam/ockam/lib/ockam/worker.ex b/implementations/elixir/ockam/ockam/lib/ockam/worker.ex index d538801ae16..ea2487c1e80 100644 --- a/implementations/elixir/ockam/ockam/lib/ockam/worker.ex +++ b/implementations/elixir/ockam/ockam/lib/ockam/worker.ex @@ -8,7 +8,9 @@ defmodule Ockam.Worker do require Logger @callback setup(options :: Keyword.t(), initial_state :: map()) :: - {:ok, state :: map()} | {:error, reason :: any()} + {:ok, state :: map()} + | {:ok, registry_attributes :: map(), state :: map()} + | {:error, reason :: any()} @callback handle_message(message :: Ockam.Message.t(), state :: map()) :: {:ok, state :: map()} @@ -213,8 +215,6 @@ defmodule Ockam.Worker do end def handle_post_init(module, options) do - Node.set_address_module(Keyword.fetch!(options, :address), module) - return_value = with_init_metric(module, options, fn -> with {:ok, address} <- Keyword.fetch(options, :address), @@ -242,7 +242,7 @@ defmodule Ockam.Worker do Keyword.get(options, :extra_addresses, []), base_state ) do - module.setup(options, state) + complete_setup(module, options, state) end else :error -> @@ -265,6 +265,27 @@ defmodule Ockam.Worker do end end + defp complete_setup(module, options, state) do + case module.setup(options, state) do + {:ok, state} -> + Node.update_address_metadata(Keyword.fetch!(options, :address), fn _prev -> + %{module: module, attributes: %{}} + end) + + {:ok, state} + + {:ok, attrs, state} -> + Node.update_address_metadata(Keyword.fetch!(options, :address), fn _prev -> + %{module: module, attributes: attrs} + end) + + {:ok, state} + + {:error, reason} -> + {:error, reason} + end + end + def schedule_idle_timeout(state) do case Map.get(state, :idle_timeout, :infinity) do :infinity -> diff --git a/implementations/elixir/ockam/ockam_services/lib/services/api/endpoint.ex b/implementations/elixir/ockam/ockam_services/lib/services/api/endpoint.ex index 4467783059c..1f4069d936c 100644 --- a/implementations/elixir/ockam/ockam_services/lib/services/api/endpoint.ex +++ b/implementations/elixir/ockam/ockam_services/lib/services/api/endpoint.ex @@ -57,7 +57,7 @@ defmodule Ockam.Services.API.Endpoint do end @type routes() :: [ - {auth_type :: any(), method :: atom(), path :: String.t(), handler :: atom()} + {auth_type :: any(), method :: atom(), path :: String.t(), handler :: fun()} ] @callback authorize(auth_type :: any(), req :: %Ockam.API.Request{}, bindings :: map()) :: diff --git a/implementations/elixir/ockam/ockam_services/lib/services/api/static_forwarding_api.ex b/implementations/elixir/ockam/ockam_services/lib/services/api/static_forwarding_api.ex deleted file mode 100644 index 1cd6d16fc32..00000000000 --- a/implementations/elixir/ockam/ockam_services/lib/services/api/static_forwarding_api.ex +++ /dev/null @@ -1,53 +0,0 @@ -defmodule Ockam.Services.API.StaticForwarding do - @moduledoc """ - API for static forwarding service - - See `Ockam.Services.StaticForwarding` - - Methods: - :post, path: "", body: alias - register a forwarding alias - """ - use Ockam.Services.API - - alias Ockam.API.Request - alias Ockam.Services.API - alias Ockam.Services.StaticForwarding, as: Base - - @impl true - def setup(options, state) do - Base.setup(options, state) - end - - @impl true - def handle_request( - %Request{method: :post, path: "", from_route: from_route, body: alias_str}, - state - ) - when is_binary(alias_str) and is_list(from_route) do - case subscribe(alias_str, from_route, state) do - {:ok, worker} -> - {:reply, :ok, worker, state} - - {:error, reason} -> - {:error, reason} - - other -> - {:error, {:unexpected_return, other}} - end - end - - def handle_request(%Request{method: :post}, _state) do - {:error, :bad_request} - end - - def handle_request(%Request{}, _state) do - {:error, :method_not_allowed} - end - - def subscribe(alias_str, route, state) do - with {:ok, worker} <- Base.ensure_alias_worker(alias_str, state), - :ok <- Base.Forwarder.update_route(worker, route, notify: false) do - {:ok, worker} - end - end -end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex b/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex index 97119b503b0..d30c25cc985 100644 --- a/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex +++ b/implementations/elixir/ockam/ockam_services/lib/services/provider/routing.ex @@ -6,11 +6,11 @@ defmodule Ockam.Services.Provider.Routing do @behaviour Ockam.Services.Provider - alias Ockam.Services.API.StaticForwarding, as: StaticForwardingAPI alias Ockam.Services.Echo, as: EchoService alias Ockam.Services.Forwarding, as: ForwardingService alias Ockam.Services.PubSub, as: PubSubService - alias Ockam.Services.StaticForwarding, as: StaticForwardingService + alias Ockam.Services.Relay.StaticForwarding + alias Ockam.Services.Relay.StaticForwardingAPI alias Ockam.Services.Tracing, as: TracingService ## TODO: API to start all services in a provider? @@ -39,7 +39,7 @@ defmodule Ockam.Services.Provider.Routing do end def child_spec(:static_forwarding, args) do - {StaticForwardingService, + {StaticForwarding, Keyword.merge( [ address: "static_forwarding", diff --git a/implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding.ex b/implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding.ex new file mode 100644 index 00000000000..778b1987ef9 --- /dev/null +++ b/implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding.ex @@ -0,0 +1,113 @@ +defmodule Ockam.Services.Relay.StaticForwarding do + @moduledoc """ + Static forwarding service + + Subscribes workers (by return route) to a string forwarding alias + + Forwarding alias is parsed from the payload as a BARE `string` type + + New subscriptions update the forwarding route in the same forwarding alias + + Forwarder address is created from prefix and alias as _ + e.g. if prefix is `forward_to_` and alias is `my_alias`, forwarder address will be: `forward_to_my_alias` + + Messages sent to the forwarder address will be forwarded to the forwarding route + + Options: + + `prefix` - address prefix + """ + use Ockam.Worker + + alias Ockam.Services.Relay.Types.CreateRelayRequest + alias Ockam.Services.Relay.Types.Relay + alias Ockam.Services.Relay.Worker, as: Forwarder + + alias Ockam.Message + + require Logger + + @spec list_running_relays() :: [{Ockam.Address.t(), map()}] + def list_running_relays() do + Ockam.Node.Registry.select_by_attribute(:service, :relay) + |> Enum.map(&Relay.from_registry_attributes/1) + end + + @spec relay_info(addr :: Ockam.Address.t()) :: {:ok, Relay.t()} | :error + def relay_info(addr) do + with {:ok, meta} <- Ockam.Node.Registry.lookup_meta(addr) do + {:ok, Relay.from_registry_attributes({addr, meta.attributes})} + end + end + + @impl true + def setup(options, state) do + prefix = Keyword.get(options, :prefix, state.address) + + forwarder_options = Keyword.get(options, :forwarder_options, []) + + {:ok, + Map.merge(state, %{ + prefix: prefix, + forwarder_options: forwarder_options + })} + end + + @impl true + def handle_message(message, state) do + payload = Message.payload(message) + + case parse_create_relay_req(payload) do + {:ok, req} -> + return_route = Message.return_route(message) + target_identifier = Message.local_metadata_value(message, :identity_id) + _ignored = subscribe(req.alias, req.tags, return_route, target_identifier, true, state) + {:ok, state} + + {:error, reason} -> + Logger.error("Invalid relay create msg: #{inspect(payload)}, reason #{inspect(reason)}") + {:ok, state} + end + end + + def parse_create_relay_req(data) do + case :bare.decode(data, :string) do + {:ok, alias_str, ""} -> + {:ok, %CreateRelayRequest{alias: alias_str, tags: %{}}} + + _err -> + CreateRelayRequest.decode_strict(data) + end + end + + def subscribe(alias_str, tags, route, target_identifier, notify, state) do + forwarder_address = forwarder_address(alias_str, state) + forwarder_options = Map.fetch!(state, :forwarder_options) + + case Ockam.Node.whereis(forwarder_address) do + nil -> + Forwarder.create( + Keyword.merge(forwarder_options, + address: forwarder_address, + relay_options: [ + alias: alias_str, + route: route, + tags: tags, + notify: notify, + target_identifier: target_identifier + ] + ) + ) + + _pid -> + with :ok <- + Forwarder.update_route(forwarder_address, route, target_identifier, tags, notify) do + {:ok, forwarder_address} + end + end + end + + def forwarder_address(alias_str, state) do + Map.get(state, :prefix, "") <> "_" <> alias_str + end +end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding_api.ex b/implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding_api.ex new file mode 100644 index 00000000000..3447857cf3d --- /dev/null +++ b/implementations/elixir/ockam/ockam_services/lib/services/relay/static_forwarding_api.ex @@ -0,0 +1,149 @@ +defmodule Ockam.Services.Relay.StaticForwardingAPI do + @moduledoc """ + API for static forwarding service + + See `Ockam.Services.StaticForwarding` + """ + + use Ockam.Services.API.Endpoint + + alias Ockam.API.Request + alias Ockam.Services.API + alias Ockam.Services.Relay.StaticForwarding, as: Base + alias Ockam.Services.Relay.Types.CreateRelayRequest + alias Ockam.Services.Relay.Types.Relay + + require Logger + + @impl true + def init_endpoint(options) do + delete_authorization = + case Keyword.get(options, :check_owner_on_delete, false) do + true -> :relay_owner + false -> :identity + end + + with {:ok, state} <- Base.setup(options, %{address: "fwd_to"}) do + {:ok, state, + [ + {:identity, :get, "/", &list/2}, + {:identity, :post, "/", &create_relay/2}, + {:identity, :get, "/:addr", &get/2}, + {delete_authorization, :delete, "/:addr", &delete/2} + ]} + end + end + + @impl true + def authorize(:identity, %Request{} = req, _bindings) do + # Oposed to the legacy StaticForwarding, here we do enforce authentication of caller + case Request.caller_identity_id(req) do + {:ok, identifier} -> + {true, %{identifier: identifier}} + + :error -> + false + end + end + + def authorize(:relay_owner, %Request{} = req, %{addr: addr} = bindings) do + with {true, %{identifier: caller_identifier}} = resp <- authorize(:identity, req, bindings) do + case Base.relay_info(addr) do + {:ok, %Relay{target_identifier: ^caller_identifier}} -> + resp + + other -> + Logger.warning( + "Operation restricted to relay' owner. addr #{inspect(addr)} (caller #{inspect(caller_identifier)}) : #{inspect(other)}" + ) + + false + end + end + end + + def list(_req, %{ + auth_data: %{identifier: _identifier}, + state: _ + }) do + Relay.encode_list(Base.list_running_relays()) + end + + def create_relay(%Request{body: body, from_route: from_route}, %{ + auth_data: %{identifier: identifier}, + state: state + }) do + with {:ok, %CreateRelayRequest{alias: alias, tags: tags}} <- + CreateRelayRequest.decode_strict(body), + {:ok, worker_addr} <- Base.subscribe(alias, tags, from_route, identifier, false, state), + {:ok, relay} <- wait_for_relay_worker(worker_addr) do + Relay.encode(relay) + else + {:error, :not_authorized} -> + {:error, {:unauthorized, "relay already taken"}} + end + end + + # Workers perform initialization (including attaching metadata on the registry) _asynchronously_ after + # returned from init(). This means we might get the addr _before_ any metadata is attached to it. + # This wait for it to be available. + # Note metadata is attached inmediately after init() returns, so in normal circustances there is no need + # to wait, if the node is very busy we might need to wait for a short amount. + # TODO: improve how workers starts up to avoid this. + defp wait_for_relay_worker(worker_addr), do: wait_for_relay_worker(worker_addr, 5) + + defp wait_for_relay_worker(_worker_addr, 0), do: {:error, :timeout} + + defp wait_for_relay_worker(worker_addr, n) do + case Base.relay_info(worker_addr) do + {:ok, %Relay{created_at: c} = relay} when c != nil -> + {:ok, relay} + + _other -> + Process.sleep(50) + wait_for_relay_worker(worker_addr, n - 1) + end + end + + @spec get(any, %{ + :auth_data => %{:identifier => any, optional(any) => any}, + :bindings => %{:addr => binary | Ockam.Address.t(), optional(any) => any}, + :state => any, + optional(any) => any + }) :: {:error, any} | {:ok, binary} + def get(_req, %{ + bindings: %{addr: addr}, + auth_data: %{identifier: _identifier}, + state: _ + }) do + case Base.relay_info(addr) do + {:ok, relay} -> + Relay.encode(relay) + + other -> + Logger.warning( + "Error attempting to retrieve relay information for addr #{inspect(addr)} : #{inspect(other)}" + ) + + {:error, 404} + end + end + + def delete(_req, %{ + bindings: %{addr: addr}, + auth_data: %{identifier: identifier}, + state: _ + }) do + with {:ok, %Relay{}} <- Base.relay_info(addr), + :ok <- Ockam.Node.stop(addr) do + {:ok, nil} + else + other -> + Logger.warning( + "Error attempting to delete relay information for addr #{inspect(addr)} (caller #{inspect(identifier)}) : #{inspect(other)}" + ) + + {:error, 401} + end + end +end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/relay/types.ex b/implementations/elixir/ockam/ockam_services/lib/services/relay/types.ex new file mode 100644 index 00000000000..4711713f523 --- /dev/null +++ b/implementations/elixir/ockam/ockam_services/lib/services/relay/types.ex @@ -0,0 +1,40 @@ +defmodule Ockam.Services.Relay.Types do + defmodule CreateRelayRequest do + @moduledoc false + use TypedStruct + + typedstruct do + plugin(Ockam.TypedCBOR.Plugin) + field(:alias, String.t(), minicbor: [key: 1]) + field(:tags, %{String.t() => String.t()}, minicbor: [key: 2]) + end + end + + defmodule Relay do + @moduledoc false + + use TypedStruct + + alias Ockam.Services.Relay.Types.CBORUnixTimestamp + + typedstruct do + plugin(Ockam.TypedCBOR.Plugin) + field(:addr, String.t(), minicbor: [key: 1]) + field(:tags, %{String.t() => String.t()}, minicbor: [key: 2]) + field(:target_identifier, binary(), minicbor: [key: 3, schema: Ockam.Identity.Identifier]) + field(:created_at, integer(), minicbor: [key: 4, schema: CBORUnixTimestamp]) + field(:updated_at, integer(), minicbor: [key: 5, schema: CBORUnixTimestamp]) + end + + def from_registry_attributes({addr, attrs}) do + struct(Relay, Map.put(attrs, :addr, addr)) + end + end + + defmodule CBORUnixTimestamp do + @moduledoc false + def from_cbor_term(val), do: DateTime.from_unix(val) + + def to_cbor_term(datetime), do: {:ok, DateTime.to_unix(datetime, :second)} + end +end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/relay/worker.ex b/implementations/elixir/ockam/ockam_services/lib/services/relay/worker.ex new file mode 100644 index 00000000000..4419296e578 --- /dev/null +++ b/implementations/elixir/ockam/ockam_services/lib/services/relay/worker.ex @@ -0,0 +1,95 @@ +defmodule Ockam.Services.Relay.Worker do + @moduledoc """ + Forwards all messages to the subscribed route + """ + use Ockam.Worker + + alias Ockam.Message + + require Logger + + def update_route(worker, route, target_identifier, tags, notify) do + Ockam.Worker.call(worker, {:update_route, route, target_identifier, tags, notify}) + end + + @impl true + def setup(options, state) do + relay_options = Keyword.get(options, :relay_options, []) + alias_str = Keyword.get(relay_options, :alias) + user_defined_tags = Keyword.get(relay_options, :tags, %{}) + target_identifier = Keyword.get(relay_options, :target_identifier) + notify = Keyword.get(relay_options, :notify, false) + route = Keyword.get(relay_options, :route) + {:ok, ts} = DateTime.now("Etc/UTC") + + regitry_metadata = %{ + service: :relay, + tags: user_defined_tags, + target_identifier: target_identifier, + created_at: ts, + updated_at: ts + } + + maybe_notify_target(notify, route, alias_str, state.address) + + {:ok, regitry_metadata, + Map.merge(state, %{alias: alias_str, route: route, target_identifier: target_identifier})} + end + + @impl true + def handle_call( + {:update_route, _route, target_identifier, _tags, _notify}, + _from, + %{target_identifier: new_target_identifier} = state + ) + when target_identifier != new_target_identifier do + # A relay can only be updated by the same identity that created it on the first place. + # Note that if the relay was created without a secure channel, the identifier is nil and + # updated only allowed if they are also not comming from a secure channel, this is intentional + # (in future we can disallow non-secure channels relays entirely as there is little use of them) + {:reply, {:error, :not_authorized}, state} + end + + def handle_call( + {:update_route, route, _target_identifier, user_defined_tags, notify}, + _from, + %{alias: alias_str} = state + ) do + state = Map.put(state, :route, route) + {:ok, ts} = DateTime.now("Etc/UTC") + # Update metadata attributes + :ok = + Ockam.Node.update_address_metadata( + state.address, + fn some -> + %{attributes: attrs} = some + %{some | attributes: %{attrs | updated_at: ts, tags: user_defined_tags}} + end + ) + + :ok = maybe_notify_target(notify, route, alias_str, state.address) + {:reply, :ok, state} + end + + defp maybe_notify_target(true, route, alias_str, address) do + Ockam.Router.route(%{ + onward_route: route, + return_route: [address], + payload: :bare.encode("#{alias_str}", :string) + }) + end + + defp maybe_notify_target(false, _route, _alias_str, _address), do: :ok + + @impl true + def handle_message(message, %{route: [_ | _] = route} = state) do + [_me | onward_route] = Message.onward_route(message) + Ockam.Router.route(Message.set_onward_route(message, route ++ onward_route)) + {:ok, state} + end + + def handle_message(msg, state) do + Logger.warning("message #{inspect(msg)} received without target route setup, discarded") + {:ok, state} + end +end diff --git a/implementations/elixir/ockam/ockam_services/lib/services/static_forwarding.ex b/implementations/elixir/ockam/ockam_services/lib/services/static_forwarding.ex deleted file mode 100644 index f6738eab2a5..00000000000 --- a/implementations/elixir/ockam/ockam_services/lib/services/static_forwarding.ex +++ /dev/null @@ -1,137 +0,0 @@ -defmodule Ockam.Services.StaticForwarding do - @moduledoc """ - Static forwarding service - - Subscribes workers (by return route) to a string forwarding alias - - Forwarding alias is parsed from the payload as a BARE `string` type - - New subscriptions update the forwarding route in the same forwarding alias - - Forwarder address is created from prefix and alias as _ - e.g. if prefix is `forward_to_` and alias is `my_alias`, forwarder address will be: `forward_to_my_alias` - - Messages sent to the forwarder address will be forwarded to the forwarding route - - Options: - - `prefix` - address prefix - """ - use Ockam.Worker - - alias __MODULE__.Forwarder - alias Ockam.Message - - require Logger - - @impl true - def setup(options, state) do - prefix = Keyword.get(options, :prefix, state.address) - - forwarder_options = Keyword.get(options, :forwarder_options, []) - - {:ok, - Map.merge(state, %{ - prefix: prefix, - forwarder_options: forwarder_options - })} - end - - @impl true - def handle_message(message, state) do - payload = Message.payload(message) - - case :bare.decode(payload, :string) do - {:ok, alias_str, ""} -> - return_route = Message.return_route(message) - subscribe(alias_str, return_route, state) - - err -> - Logger.error("Invalid message format: #{inspect(payload)}, reason #{inspect(err)}") - end - end - - def subscribe(alias_str, route, state) do - with {:ok, worker} <- ensure_alias_worker(alias_str, state) do - ## NOTE: Non-ockam message routing here - Forwarder.update_route(worker, route) - {:ok, state} - end - end - - def ensure_alias_worker(alias_str, state) do - forwarder_address = forwarder_address(alias_str, state) - forwarder_options = Map.fetch!(state, :forwarder_options) - - case Ockam.Node.whereis(forwarder_address) do - nil -> - Forwarder.create( - Keyword.merge(forwarder_options, - alias: alias_str, - address: forwarder_address - ) - ) - - _pid -> - {:ok, forwarder_address} - end - end - - def forwarder_address(alias_str, state) do - Map.get(state, :prefix, "") <> "_" <> alias_str - end -end - -defmodule Ockam.Services.StaticForwarding.Forwarder do - @moduledoc """ - Topic subscription for pub_sub service - - Forwards all messages to all subscribed routes - - Subscribe API is internal, it adds a route to the subscribers set - """ - use Ockam.Worker - - alias Ockam.Message - - def update_route(worker, route, options \\ []) do - ## TODO: reply to the subscriber? - Ockam.Worker.call(worker, {:update_route, route, options}) - end - - @impl true - def setup(options, state) do - alias_str = Keyword.get(options, :alias) - {:ok, Map.merge(state, %{alias: alias_str, route: []})} - end - - @impl true - def handle_call({:update_route, route, options}, _from, %{alias: alias_str} = state) do - state = Map.put(state, :route, route) - - case Keyword.get(options, :notify, true) do - true -> - Ockam.Router.route(%{ - onward_route: route, - return_route: [state.address], - payload: :bare.encode("#{alias_str}", :string) - }) - - false -> - :ok - end - - {:reply, :ok, state} - end - - @impl true - def handle_message(message, state) do - [_me | onward_route] = Message.onward_route(message) - - route = Map.get(state, :route, []) - - Ockam.Router.route(Message.set_onward_route(message, route ++ onward_route)) - - {:ok, state} - end -end diff --git a/implementations/elixir/ockam/ockam_services/mix.exs b/implementations/elixir/ockam/ockam_services/mix.exs index 86a16474be6..d5d5c2084a9 100644 --- a/implementations/elixir/ockam/ockam_services/mix.exs +++ b/implementations/elixir/ockam/ockam_services/mix.exs @@ -46,6 +46,7 @@ defmodule Ockam.Services.MixProject do defp deps do [ + {:assert_eventually, "~>1.0.0", only: [:test], runtime: false}, {:credo, "~> 1.6", only: [:dev, :test], runtime: false}, {:dialyxir, "~> 1.1", only: [:dev], runtime: false}, {:ex_doc, "~> 0.25", only: :dev, runtime: false}, diff --git a/implementations/elixir/ockam/ockam_services/mix.lock b/implementations/elixir/ockam/ockam_services/mix.lock index a55881f5ba0..ea56d433a1f 100644 --- a/implementations/elixir/ockam/ockam_services/mix.lock +++ b/implementations/elixir/ockam/ockam_services/mix.lock @@ -1,4 +1,5 @@ %{ + "assert_eventually": {:hex, :assert_eventually, "1.0.0", "f1539f28ba3ffa99a712433c77723c7103986932aa341d05eee94c333a920d15", [:mix], [{:ex_doc, ">= 0.0.0", [hex: :ex_doc, repo: "hexpm", optional: true]}], "hexpm", "c658ac4103c8bd82d0cf72a2fdb77477ba3fbc6b15228c5c801003d239625c69"}, "bare": {:hex, :bare, "0.1.1", "d9dc757cdf6a4f236055d7d098b523fcc70ff7e805a981e3e82786c7a99725b9", [:rebar3], [], "hexpm", "07797a39167e37108c0827fe1a686e69d5f27d7b658fb64f14496a6db72a2415"}, "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, "cbor": {:hex, :cbor, "1.0.1", "39511158e8ea5a57c1fcb9639aaa7efde67129678fee49ebbda780f6f24959b0", [:mix], [], "hexpm", "5431acbe7a7908f17f6a9cd43311002836a34a8ab01876918d8cfb709cd8b6a2"}, diff --git a/implementations/elixir/ockam/ockam_services/test/authorization_test.exs b/implementations/elixir/ockam/ockam_services/test/authorization_test.exs index de75075fe23..83ab85848b1 100644 --- a/implementations/elixir/ockam/ockam_services/test/authorization_test.exs +++ b/implementations/elixir/ockam/ockam_services/test/authorization_test.exs @@ -201,7 +201,9 @@ defmodule Ockam.Services.Authorization.Tests do test "static forwarder authorization" do {:ok, service} = - Ockam.Services.StaticForwarding.create(forwarder_options: [authorization: [:is_local]]) + Ockam.Services.Relay.StaticForwarding.create( + forwarder_options: [authorization: [:is_local]] + ) {:ok, test_address} = Ockam.Node.register_random_address() @@ -221,6 +223,12 @@ defmodule Ockam.Services.Authorization.Tests do 5_000 ) + on_exit(fn -> + Ockam.Node.stop(service) + Ockam.Node.stop(forwarder_address) + Ockam.Node.unregister_address(test_address) + end) + local_message = %Message{ onward_route: [forwarder_address, "smth"], payload: "hello", diff --git a/implementations/elixir/ockam/ockam_services/test/services/forwarding_test.exs b/implementations/elixir/ockam/ockam_services/test/services/forwarding_test.exs index 013c802598a..f69a3a27b25 100644 --- a/implementations/elixir/ockam/ockam_services/test/services/forwarding_test.exs +++ b/implementations/elixir/ockam/ockam_services/test/services/forwarding_test.exs @@ -11,14 +11,6 @@ defmodule Test.Services.ForwardingTest do {:ok, _service_pid, service_address} = ForwardingService.start_link(address: "forwarding_address") - on_exit(fn -> - ## TODO: implement Worker.stop - case Node.whereis(service_address) do - nil -> :ok - pid -> GenServer.stop(pid) - end - end) - {:ok, me} = Node.register_random_address() register_message = %Ockam.Message{ @@ -34,10 +26,9 @@ defmodule Test.Services.ForwardingTest do forwarder_address = List.last(forwarder_route) on_exit(fn -> - case Node.whereis(forwarder_address) do - nil -> :ok - pid -> GenServer.stop(pid) - end + Node.stop(service_address) + Node.stop(forwarder_address) + Node.unregister_address(me) end) msg = %{onward_route: [forwarder_address], return_route: [me], payload: "HI!"} @@ -60,12 +51,8 @@ defmodule Test.Services.ForwardingTest do forwarder_address = RemoteForwarder.forwarder_address(forwarder) on_exit(fn -> - case Node.whereis(service_address) do - nil -> :ok - pid -> GenServer.stop(pid) - end - - Node.stop(forwarder) + Node.stop(service_address) + Node.stop(forwarder_address) Node.unregister_address(me) end) diff --git a/implementations/elixir/ockam/ockam_services/test/services/static_forwarding_api_test.exs b/implementations/elixir/ockam/ockam_services/test/services/static_forwarding_api_test.exs new file mode 100644 index 00000000000..67f57fe6d7e --- /dev/null +++ b/implementations/elixir/ockam/ockam_services/test/services/static_forwarding_api_test.exs @@ -0,0 +1,160 @@ +defmodule Test.Services.StaticForwardingApiTest do + use ExUnit.Case + + # Fail after 200ms of retrying with time between attempts 10ms + use AssertEventually, timeout: 200, interval: 10 + + alias Ockam.API.Client + alias Ockam.Identity + alias Ockam.SecureChannel + + alias Ockam.Services.Relay.StaticForwardingAPI + alias Ockam.Services.Relay.Types.CreateRelayRequest + alias Ockam.Services.Relay.Types.Relay + + test "crud" do + {:ok, listener_identity} = Identity.create() + {:ok, listener_keypair} = SecureChannel.Crypto.generate_dh_keypair() + {:ok, attestation} = Identity.attest_purpose_key(listener_identity, listener_keypair) + + {:ok, listener} = + SecureChannel.create_listener( + identity: listener_identity, + encryption_options: [ + static_keypair: listener_keypair, + static_key_attestation: attestation + ] + ) + + {:ok, bob} = Identity.create() + {:ok, alice} = Identity.create() + bob_id = Identity.get_identifier(bob) + {:ok, bob_keypair} = SecureChannel.Crypto.generate_dh_keypair() + {:ok, bob_attestation} = Identity.attest_purpose_key(bob, bob_keypair) + {:ok, alice_keypair} = SecureChannel.Crypto.generate_dh_keypair() + {:ok, alice_attestation} = Identity.attest_purpose_key(alice, alice_keypair) + + {:ok, bob_channel} = + SecureChannel.create_channel( + identity: bob, + encryption_options: [static_keypair: bob_keypair, static_key_attestation: bob_attestation], + route: [listener] + ) + + {:ok, alice_channel} = + SecureChannel.create_channel( + identity: alice, + encryption_options: [ + static_keypair: alice_keypair, + static_key_attestation: alice_attestation + ], + route: [listener] + ) + + {:ok, service_address} = + StaticForwardingAPI.create(prefix: "forward_to", check_owner_on_delete: true) + + {:ok, service_address_no_enforcement} = + StaticForwardingAPI.create(prefix: "forward_to", check_owner_on_delete: false) + + alias_str_1 = "test_static_forwarding_alias_1" + alias_str_2 = "test_static_forwarding_alias_2" + + forwarder_address_1 = "forward_to_" <> alias_str_1 + forwarder_address_2 = "forward_to_" <> alias_str_2 + + {:ok, resp} = Client.sync_request(:get, "/", nil, [bob_channel, service_address]) + assert %{status: 200, body: body} = resp + assert {:ok, []} = Relay.decode_list_strict(body) + + req = %CreateRelayRequest{alias: alias_str_1, tags: %{"name" => "test_relay1"}} + + {:ok, resp} = + Client.sync_request(:post, "/", CreateRelayRequest.encode!(req), [ + bob_channel, + service_address + ]) + + assert %{status: 200, body: body} = resp + + assert {:ok, %Relay{target_identifier: ^bob_id, tags: %{"name" => "test_relay1"}}} = + Relay.decode_strict(body) + + req = %CreateRelayRequest{alias: alias_str_2, tags: %{"name" => "test_relay2"}} + + {:ok, resp} = + Client.sync_request(:post, "/", CreateRelayRequest.encode!(req), [ + bob_channel, + service_address + ]) + + assert %{status: 200, body: body} = resp + + assert {:ok, %Relay{target_identifier: ^bob_id, tags: %{"name" => "test_relay2"}}} = + Relay.decode_strict(body) + + {:ok, resp} = Client.sync_request(:get, "/", nil, [bob_channel, service_address]) + assert %{status: 200, body: body} = resp + assert {:ok, [_, _]} = Relay.decode_list_strict(body) + + # Alice not allowed to overtake bob' relay + req = %CreateRelayRequest{alias: alias_str_2, tags: %{"name" => "test_relay2"}} + + {:ok, resp} = + Client.sync_request(:post, "/", CreateRelayRequest.encode!(req), [ + alice_channel, + service_address + ]) + + assert %{status: 401} = resp + + assert {:ok, [%Relay{target_identifier: ^bob_id}, %Relay{target_identifier: ^bob_id}]} = + Relay.decode_list_strict(body) + + # Alice not allowed to remove bob' relay + {:ok, resp} = + Client.sync_request(:delete, "/#{forwarder_address_1}", nil, [ + alice_channel, + service_address + ]) + + assert %{status: 401} = resp + + # Bob can change its relay + req = %CreateRelayRequest{alias: alias_str_2, tags: %{"name" => "changed!"}} + + {:ok, resp} = + Client.sync_request(:post, "/", CreateRelayRequest.encode!(req), [ + bob_channel, + service_address + ]) + + assert %{status: 200, body: body} = resp + + assert {:ok, %Relay{target_identifier: ^bob_id, tags: %{"name" => "changed!"}}} = + Relay.decode_strict(body) + + # Bob can remove its own relay + {:ok, resp} = + Client.sync_request(:delete, "/#{forwarder_address_1}", nil, [bob_channel, service_address]) + + assert %{status: 200} = resp + + {:ok, resp} = Client.sync_request(:get, "/", nil, [bob_channel, service_address]) + assert %{status: 200, body: body} = resp + assert {:ok, [%Relay{addr: ^forwarder_address_2}]} = Relay.decode_list_strict(body) + + # Alice (and anyone) allowed to remove anyone relay if so configured.. + {:ok, resp} = + Client.sync_request(:delete, "/#{forwarder_address_2}", nil, [ + alice_channel, + service_address_no_enforcement + ]) + + assert %{status: 200} = resp + + {:ok, resp} = Client.sync_request(:get, "/", nil, [bob_channel, service_address]) + assert %{status: 200, body: body} = resp + assert {:ok, []} = Relay.decode_list_strict(body) + end +end diff --git a/implementations/elixir/ockam/ockam_services/test/services/static_forwarding_test.exs b/implementations/elixir/ockam/ockam_services/test/services/static_forwarding_test.exs index f2134673423..8d434567ae9 100644 --- a/implementations/elixir/ockam/ockam_services/test/services/static_forwarding_test.exs +++ b/implementations/elixir/ockam/ockam_services/test/services/static_forwarding_test.exs @@ -1,7 +1,15 @@ defmodule Test.Services.StaticForwardingTest do use ExUnit.Case - alias Ockam.Services.StaticForwarding, as: StaticForwardingService + # Fail after 200ms of retrying with time between attempts 10ms + use AssertEventually, timeout: 200, interval: 10 + + alias Ockam.Identity + alias Ockam.SecureChannel + + alias Ockam.Services.Relay.StaticForwarding, as: StaticForwardingService + alias Ockam.Services.Relay.Types.CreateRelayRequest + alias Ockam.Services.Relay.Types.Relay alias Ockam.Message alias Ockam.Node @@ -50,9 +58,110 @@ defmodule Test.Services.StaticForwardingTest do Router.route(forwarded_message) assert_receive(%Message{payload: "hello", onward_route: [^test_address, "smth"]}, 5_000) + + assert_eventually( + [%Relay{addr: ^forwarder_address, target_identifier: nil}] = + StaticForwardingService.list_running_relays() + ) + + Ockam.Node.stop(forwarder_address) + assert_eventually([] = StaticForwardingService.list_running_relays()) + end + + test "static forwarding cbor msg with tags" do + {:ok, service_address} = StaticForwardingService.create(prefix: "forward_to") + + {:ok, test_address} = Node.register_random_address() + + alias_str = "test_static_forwarding_alias" + + forwarder_address = "forward_to_" <> alias_str + + on_exit(fn -> + Node.stop(service_address) + Node.stop(forwarder_address) + Node.unregister_address(test_address) + end) + + req = %CreateRelayRequest{alias: alias_str, tags: %{"name" => "test"}} + + register_message = %Message{ + onward_route: [service_address], + payload: CreateRelayRequest.encode!(req), + return_route: [test_address] + } + + Router.route(register_message) + + assert_receive( + %Message{ + payload: encoded_payload, + onward_route: [^test_address], + return_route: [^forwarder_address] + }, + 5_000 + ) + + assert {:ok, alias_str, ""} == :bare.decode(encoded_payload, :string) + + forwarded_message = %Message{ + onward_route: [forwarder_address, "smth"], + payload: "hello", + return_route: [test_address] + } + + Router.route(forwarded_message) + + assert_receive(%Message{payload: "hello", onward_route: [^test_address, "smth"]}, 5_000) + + assert_eventually( + [%Relay{addr: ^forwarder_address, target_identifier: nil, tags: %{"name" => "test"}}] = + StaticForwardingService.list_running_relays() + ) + + Ockam.Node.stop(forwarder_address) + assert_eventually([] = StaticForwardingService.list_running_relays()) end test "forwarding route override" do + {:ok, listener_identity} = Identity.create() + {:ok, listener_keypair} = SecureChannel.Crypto.generate_dh_keypair() + {:ok, attestation} = Identity.attest_purpose_key(listener_identity, listener_keypair) + + {:ok, listener} = + SecureChannel.create_listener( + identity: listener_identity, + encryption_options: [ + static_keypair: listener_keypair, + static_key_attestation: attestation + ] + ) + + {:ok, bob} = Identity.create() + {:ok, alice} = Identity.create() + bob_id = Identity.get_identifier(bob) + {:ok, bob_keypair} = SecureChannel.Crypto.generate_dh_keypair() + {:ok, bob_attestation} = Identity.attest_purpose_key(bob, bob_keypair) + {:ok, alice_keypair} = SecureChannel.Crypto.generate_dh_keypair() + {:ok, alice_attestation} = Identity.attest_purpose_key(alice, alice_keypair) + + {:ok, bob_channel} = + SecureChannel.create_channel( + identity: bob, + encryption_options: [static_keypair: bob_keypair, static_key_attestation: bob_attestation], + route: [listener] + ) + + {:ok, alice_channel} = + SecureChannel.create_channel( + identity: alice, + encryption_options: [ + static_keypair: alice_keypair, + static_key_attestation: alice_attestation + ], + route: [listener] + ) + {:ok, service_address} = StaticForwardingService.create(prefix: "forward_to") {:ok, test_address} = Node.register_random_address() @@ -69,8 +178,9 @@ defmodule Test.Services.StaticForwardingTest do Node.unregister_address(test_address) end) + # Bob creates the relay register_message = %Message{ - onward_route: [service_address], + onward_route: [bob_channel, service_address], payload: encoded_alias_str, return_route: [test_address] } @@ -81,30 +191,53 @@ defmodule Test.Services.StaticForwardingTest do %Message{ payload: ^encoded_alias_str, onward_route: [^test_address], - return_route: [^forwarder_address] + return_route: [^bob_channel, ^forwarder_address] }, 5_000 ) + assert_eventually( + [ + %Relay{ + addr: ^forwarder_address, + created_at: t1, + updated_at: t2, + target_identifier: ^bob_id + } + ] = StaticForwardingService.list_running_relays() + ) + + assert t1 == t2 + {:ok, test_address2} = Node.register_random_address() register_message2 = %Message{ - onward_route: [service_address], + onward_route: [bob_channel, service_address], payload: encoded_alias_str, return_route: [test_address2] } + # Bob make a modification to the relay, it's allowed Router.route(register_message2) assert_receive( %Message{ payload: ^encoded_alias_str, onward_route: [^test_address2], - return_route: [^forwarder_address] + return_route: [^bob_channel, ^forwarder_address] }, 5_000 ) + assert_eventually( + ( + [%Relay{addr: ^forwarder_address, created_at: ^t1, updated_at: t3}] = + StaticForwardingService.list_running_relays() + + :lt == DateTime.compare(t1, t3) + ) + ) + forwarded_message = %Message{ onward_route: [forwarder_address, "smth"], payload: "hello", @@ -116,5 +249,21 @@ defmodule Test.Services.StaticForwardingTest do assert_receive(%Message{payload: "hello", onward_route: [^test_address2, "smth"]}, 5_000) refute_receive(%Message{payload: "hello", onward_route: [^test_address, "smth"]}, 100) + + {:ok, test_address3} = Node.register_random_address() + + register_message2 = %Message{ + onward_route: [alice_channel, service_address], + payload: encoded_alias_str, + return_route: [test_address3] + } + + # Alice try to make a modification to the relay, it isn't allowed + Router.route(register_message2) + refute_receive(%Message{onward_route: [^test_address3]}, 100) + + # Relay is unchanged + Router.route(forwarded_message) + assert_receive(%Message{payload: "hello", onward_route: [^test_address2, "smth"]}, 5_000) end end diff --git a/implementations/elixir/ockam/ockam_services/test/services/token_lease_manager_test.exs b/implementations/elixir/ockam/ockam_services/test/services/token_lease_manager_test.exs index bd19dbd473d..2a1332bf2f7 100644 --- a/implementations/elixir/ockam/ockam_services/test/services/token_lease_manager_test.exs +++ b/implementations/elixir/ockam/ockam_services/test/services/token_lease_manager_test.exs @@ -129,7 +129,7 @@ defmodule Ockam.Services.TokenLeaseManager.Test do {:ok, resp} = Client.sync_request(:post, "/", nil, [bob_channel, lm]) assert %{status: 200, body: body} = resp - assert {:ok, %Lease{issued_for: bob_id} = bob_lease1} = Lease.decode_strict(body) + assert {:ok, %Lease{issued_for: ^bob_id} = bob_lease1} = Lease.decode_strict(body) {:ok, resp} = Client.sync_request(:post, "/", nil, [bob_channel, lm]) assert %{status: 200, body: body} = resp diff --git a/implementations/elixir/ockam/ockam_typed_cbor/lib/typed_cbor.ex b/implementations/elixir/ockam/ockam_typed_cbor/lib/typed_cbor.ex index 7a17230f9ef..0ad4ccbd4c3 100644 --- a/implementations/elixir/ockam/ockam_typed_cbor/lib/typed_cbor.ex +++ b/implementations/elixir/ockam/ockam_typed_cbor/lib/typed_cbor.ex @@ -154,6 +154,7 @@ defmodule Ockam.TypedCBOR do def from_cbor_term(schema, data) do with true <- is_atom(schema), + {:module, ^schema} <- Code.ensure_loaded(schema), true <- function_exported?(schema, :from_cbor_term, 1), {:ok, val} <- schema.from_cbor_term(data) do val @@ -249,12 +250,12 @@ defmodule Ockam.TypedCBOR do def to_cbor_term(schema, val) do with true <- is_atom(schema), + {:module, ^schema} <- Code.ensure_loaded(schema), true <- function_exported?(schema, :to_cbor_term, 1), {:ok, cbor} <- schema.to_cbor_term(val) do cbor else _ -> - Logger.error("type mismatch, expected schema #{inspect(schema)}, value: #{inspect(val)}") raise(Exception, "type mismatch, expected schema #{inspect(schema)}") end end