Skip to content

Commit

Permalink
feat(elixir): basic api and access control for relays
Browse files Browse the repository at this point in the history
allow custom attributes when registering local addresses,
for now only used for attaching metadata to relays. Use that
metadata to store the identifier that creatd the relay, to
enforce access control over it (who can take it over / delete it)
  • Loading branch information
polvorin committed Oct 13, 2023
1 parent eec9a09 commit dbfd4b0
Show file tree
Hide file tree
Showing 20 changed files with 795 additions and 240 deletions.
12 changes: 7 additions & 5 deletions implementations/elixir/ockam/ockam/lib/ockam/node.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,19 +42,21 @@ defmodule Ockam.Node do
def register_address(address, module \\ nil) do
self = self()

case Registry.register(address, module) do
case Registry.register(address, %{module: module, attributes: %{}}) do
:ok -> :ok
{:error, {:already_registered, ^self}} -> :ok
error -> error
end
end

@spec set_address_module(any(), module()) :: :ok | :error
@spec update_address_metadata(any(), (map() -> %{module: module(), attributes: map()})) ::
:ok | :error
@doc """
Sets module name for already registered process
Sets module name and attributes for already registered process.
This can only be called for the process registered at that address itself
"""
def set_address_module(address, module) do
Registry.set_module(address, module)
def update_address_metadata(address, callback) do
Registry.update_metadata(address, callback)
end

@spec get_address_module(any()) :: {:ok, module()} | :error
Expand Down
36 changes: 26 additions & 10 deletions implementations/elixir/ockam/ockam/lib/ockam/node/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,48 @@ defmodule Ockam.Node.Registry do
# This function is used in custom process registration
#
# Module should be the worker implementation module
def register(address, module) do
case Registry.register(__MODULE__, address, module) do
def register(address, meta) when is_map(meta) or is_nil(meta) do
case Registry.register(__MODULE__, address, meta) do
{:ok, _owner} -> :ok
{:error, reason} -> {:error, reason}
end
end

@spec set_module(any(), module()) :: :ok | :error
@spec update_metadata(address :: any(), callback :: (map() -> map())) :: :ok | :error
@doc false
# Set worker module for the current process
# Set worker metadata for the current process
#
# This function is called from the worker behaviour
# Module is not set when registering with register_name from `:via` option
# so this function needs to be called to set it after the process is created
def set_module(address, module) do
case Registry.update_value(__MODULE__, address, fn _old -> module end) do
# Metadata is not set when registering with register_name from `:via` option
# so this function needs to be called to set it after the process is created,
# and whenever we want to update it
def update_metadata(address, callback) do
case Registry.update_value(__MODULE__, address, callback) do
:error -> :error
{_new, _old} -> :ok
end
end

@spec lookup(address :: any()) :: {:ok, pid, module} | :error
@spec lookup(address :: any()) :: {:ok, pid(), module() | nil} | :error
def lookup(address) do
case Registry.lookup(__MODULE__, address) do
[{pid, module}] -> {:ok, pid, module}
[{pid, nil}] -> {:ok, pid, nil}
[{pid, meta}] when is_map(meta) -> {:ok, pid, Map.get(meta, :module)}
[] -> :error
end
end

@spec lookup_meta(address :: any()) :: {:ok, map()} | :error
def lookup_meta(address) do
case Registry.lookup(__MODULE__, address) do
[{_pid, nil}] -> {:ok, %{}}
[{_pid, meta}] when is_map(meta) -> {:ok, meta}
[] -> :error
end
end

def select_by_attribute(name, value) do
Registry.select(__MODULE__, [{{:_, :_, %{attributes: %{name => value}}}, [], [{{:"$_"}}]}])
|> Enum.map(fn {{addr, {_pid, %{attributes: attributes}}}} -> {addr, attributes} end)
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,9 @@ defmodule Ockam.Session.Pluggable do
all_addresses = Map.get(state, :all_addresses, [])

Enum.each(all_addresses, fn address ->
Ockam.Node.set_address_module(address, module)
Ockam.Node.update_address_metadata(address, fn _prev ->
%{module: module, attributes: %{}}
end)
end)
end

Expand Down
29 changes: 25 additions & 4 deletions implementations/elixir/ockam/ockam/lib/ockam/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ defmodule Ockam.Worker do
require Logger

@callback setup(options :: Keyword.t(), initial_state :: map()) ::
{:ok, state :: map()} | {:error, reason :: any()}
{:ok, state :: map()}
| {:ok, registry_attributes :: map(), state :: map()}
| {:error, reason :: any()}

@callback handle_message(message :: Ockam.Message.t(), state :: map()) ::
{:ok, state :: map()}
Expand Down Expand Up @@ -213,8 +215,6 @@ defmodule Ockam.Worker do
end

def handle_post_init(module, options) do
Node.set_address_module(Keyword.fetch!(options, :address), module)

return_value =
with_init_metric(module, options, fn ->
with {:ok, address} <- Keyword.fetch(options, :address),
Expand Down Expand Up @@ -242,7 +242,7 @@ defmodule Ockam.Worker do
Keyword.get(options, :extra_addresses, []),
base_state
) do
module.setup(options, state)
complete_setup(module, options, state)
end
else
:error ->
Expand All @@ -265,6 +265,27 @@ defmodule Ockam.Worker do
end
end

defp complete_setup(module, options, state) do
case module.setup(options, state) do
{:ok, state} ->
Node.update_address_metadata(Keyword.fetch!(options, :address), fn _prev ->
%{module: module, attributes: %{}}
end)

{:ok, state}

{:ok, attrs, state} ->
Node.update_address_metadata(Keyword.fetch!(options, :address), fn _prev ->
%{module: module, attributes: attrs}
end)

{:ok, state}

{:error, reason} ->
{:error, reason}
end
end

def schedule_idle_timeout(state) do
case Map.get(state, :idle_timeout, :infinity) do
:infinity ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ defmodule Ockam.Services.API.Endpoint do
end

@type routes() :: [
{auth_type :: any(), method :: atom(), path :: String.t(), handler :: atom()}
{auth_type :: any(), method :: atom(), path :: String.t(), handler :: fun()}
]

@callback authorize(auth_type :: any(), req :: %Ockam.API.Request{}, bindings :: map()) ::
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ defmodule Ockam.Services.Provider.Routing do

@behaviour Ockam.Services.Provider

alias Ockam.Services.API.StaticForwarding, as: StaticForwardingAPI
alias Ockam.Services.Echo, as: EchoService
alias Ockam.Services.Forwarding, as: ForwardingService
alias Ockam.Services.PubSub, as: PubSubService
alias Ockam.Services.StaticForwarding, as: StaticForwardingService
alias Ockam.Services.Relay.StaticForwarding
alias Ockam.Services.Relay.StaticForwardingAPI
alias Ockam.Services.Tracing, as: TracingService

## TODO: API to start all services in a provider?
Expand Down Expand Up @@ -39,7 +39,7 @@ defmodule Ockam.Services.Provider.Routing do
end

def child_spec(:static_forwarding, args) do
{StaticForwardingService,
{StaticForwarding,
Keyword.merge(
[
address: "static_forwarding",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
defmodule Ockam.Services.Relay.StaticForwarding do
@moduledoc """
Static forwarding service
Subscribes workers (by return route) to a string forwarding alias
Forwarding alias is parsed from the payload as a BARE `string` type
New subscriptions update the forwarding route in the same forwarding alias
Forwarder address is created from prefix and alias as <prefix>_<alias>
e.g. if prefix is `forward_to_` and alias is `my_alias`, forwarder address will be: `forward_to_my_alias`
Messages sent to the forwarder address will be forwarded to the forwarding route
Options:
`prefix` - address prefix
"""
use Ockam.Worker

alias Ockam.Services.Relay.Types.CreateRelayRequest
alias Ockam.Services.Relay.Types.Relay
alias Ockam.Services.Relay.Worker, as: Forwarder

alias Ockam.Message

require Logger

@spec list_running_relays() :: [{Ockam.Address.t(), map()}]
def list_running_relays() do
Ockam.Node.Registry.select_by_attribute(:service, :relay)
|> Enum.map(&Relay.from_registry_attributes/1)
end

@spec relay_info(addr :: Ockam.Address.t()) :: {:ok, Relay.t()} | :error
def relay_info(addr) do
with {:ok, meta} <- Ockam.Node.Registry.lookup_meta(addr) do
{:ok, Relay.from_registry_attributes({addr, meta.attributes})}
end
end

@impl true
def setup(options, state) do
prefix = Keyword.get(options, :prefix, state.address)

forwarder_options = Keyword.get(options, :forwarder_options, [])

{:ok,
Map.merge(state, %{
prefix: prefix,
forwarder_options: forwarder_options
})}
end

@impl true
def handle_message(message, state) do
payload = Message.payload(message)

case parse_create_relay_req(payload) do
{:ok, req} ->
return_route = Message.return_route(message)
target_identifier = Message.local_metadata_value(message, :identity_id)
_ignored = subscribe(req.alias, req.tags, return_route, target_identifier, true, state)
{:ok, state}

{:error, reason} ->
Logger.error("Invalid relay create msg: #{inspect(payload)}, reason #{inspect(reason)}")
{:ok, state}
end
end

def parse_create_relay_req(data) do
case :bare.decode(data, :string) do
{:ok, alias_str, ""} ->
{:ok, %CreateRelayRequest{alias: alias_str, tags: %{}}}

_err ->
CreateRelayRequest.decode_strict(data)
end
end

def subscribe(alias_str, tags, route, target_identifier, notify, state) do
forwarder_address = forwarder_address(alias_str, state)
forwarder_options = Map.fetch!(state, :forwarder_options)

case Ockam.Node.whereis(forwarder_address) do
nil ->
Forwarder.create(
Keyword.merge(forwarder_options,
address: forwarder_address,
relay_options: [
alias: alias_str,
route: route,
tags: tags,
notify: notify,
target_identifier: target_identifier
]
)
)

_pid ->
with :ok <-
Forwarder.update_route(forwarder_address, route, target_identifier, tags, notify) do
{:ok, forwarder_address}
end
end
end

def forwarder_address(alias_str, state) do
Map.get(state, :prefix, "") <> "_" <> alias_str
end
end
Loading

0 comments on commit dbfd4b0

Please sign in to comment.