diff --git a/README.md b/README.md index ca48c5c..b998f65 100644 --- a/README.md +++ b/README.md @@ -30,11 +30,12 @@ Available environment variables are: | `LOG_LEVEL` | info | Log level | | `PUBLIC_API_PORT` | 4000 | TCP port of the public API | | `PUBLIC_API_JWT_MAX_LIFETIME` | 120 | Max lifetime in seconds allowed for JWT tokens issued on the public API | +| `PUBLIC_API_CONTEXT_PATH` | "" | URL prefix for resources of the public API - Useful to mount Neurow on a existing website| | `PREFLIGHT_MAX_AGE` | 86400 | Value of the `access-control-max-age` headers on CROS preflight responses on the public API | | `SSE_TIMEOUT` | 900000 | SSE deconnection delay in ms, after the last received message | `SSE_KEEPALIVE` | 600000 | Neurow periodically send `ping` events on SSE connections to prevent connections from being closed by network devices. This variable defines the delay between two ping events | | `INTERNAL_API_PORT` | 3000 | TCP port fo the internal API | -| `INTERNAL_API_JWT_MAX_LIFETIME` | 1500 | Max lifetime in seconds allowed for JWT tokens issued on the internal | API | +| `INTERNAL_API_JWT_MAX_LIFETIME` | 1500 | Max lifetime in seconds allowed for JWT tokens issued on the internal API | | `HISTORY_MIN_DURATION` | 30 | Messages are persisted in the Neurow cluster, so clients can re-fetch recent messages after a short term disconnection by using the `Last-Event-Id` on SSE connections. Messages are only persisted for a limited time. `HISTORY_MIN_DURATION` defines the minimum retention guaranteed by the Neurow server. diff --git a/neurow/config/runtime.exs b/neurow/config/runtime.exs index b4780b5..2b17d8c 100644 --- a/neurow/config/runtime.exs +++ b/neurow/config/runtime.exs @@ -11,6 +11,7 @@ config :neurow, String.to_integer(System.get_env("PUBLIC_API_JWT_MAX_LIFETIME") || "120"), public_api_allowed_origins: [~r/^https:\/\/.*\.doctolib\.(fr|it|de)(:3000)?$/], public_api_preflight_max_age: String.to_integer(System.get_env("PREFLIGHT_MAX_AGE") || "86400"), + public_api_context_path: System.get_env("PUBLIC_API_CONTEXT_PATH") || "", sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000"), sse_keepalive: String.to_integer(System.get_env("SSE_KEEPALIVE") || "600000") diff --git a/neurow/integration_test/test_cluster.exs b/neurow/integration_test/test_cluster.exs index a036f7e..5e61f3f 100644 --- a/neurow/integration_test/test_cluster.exs +++ b/neurow/integration_test/test_cluster.exs @@ -44,7 +44,7 @@ defmodule Neurow.IntegrationTest.TestCluster do A call to this method is expected in the setup block of integration tests """ def flush_history do - GenServer.call(__MODULE__, :flush__history) + GenServer.call(__MODULE__, :flush_history) end # -- GenServer callbacks, should not be used directly -- @@ -87,11 +87,11 @@ defmodule Neurow.IntegrationTest.TestCluster do end end - def handle_call(:flush__history, _from, state) do + def handle_call(:flush_history, _from, state) do state.nodes |> Enum.map(fn {node, _public_api_port, _internal_api_port} -> Task.async(fn -> - :ok = :rpc.call(node, Neurow.ReceiverShardManager, :flush_history, []) + :ok = :rpc.call(node, Neurow.Broker.ReceiverShardManager, :flush_history, []) end) end) |> Enum.map(fn task -> Task.await(task, 2_000) end) diff --git a/neurow/lib/neurow/application.ex b/neurow/lib/neurow/application.ex index 91ff7fa..91c1423 100644 --- a/neurow/lib/neurow/application.ex +++ b/neurow/lib/neurow/application.ex @@ -74,9 +74,9 @@ defmodule Neurow.Application do scheme: sse_http_scheme, plug: Neurow.PublicApi.Endpoint, options: public_api_http_config}, {Plug.Cowboy.Drainer, refs: [Neurow.PublicApi.Endpoint], shutdown: 20_000}, {StopListener, []}, - {Neurow.ReceiverShardManager, [history_min_duration]} + {Neurow.Broker.ReceiverShardManager, [history_min_duration]} ] ++ - Neurow.ReceiverShardManager.create_receivers() ++ + Neurow.Broker.ReceiverShardManager.create_receivers() ++ if cluster_topologies do [{Cluster.Supervisor, [cluster_topologies, [name: Neurow.ClusterSupervisor]]}] else diff --git a/neurow/lib/neurow/internal_api/message.ex b/neurow/lib/neurow/broker/message.ex similarity index 62% rename from neurow/lib/neurow/internal_api/message.ex rename to neurow/lib/neurow/broker/message.ex index 35627b7..c94e951 100644 --- a/neurow/lib/neurow/internal_api/message.ex +++ b/neurow/lib/neurow/broker/message.ex @@ -1,20 +1,6 @@ -defmodule Neurow.InternalApi.Message do +defmodule Neurow.Broker.Message do defstruct [:event, :timestamp, :payload] - def from_json(payload) when is_binary(payload) do - {:ok, from_json(:jiffy.decode(payload, [:return_maps]))} - rescue - exception -> {:error, exception} - end - - def from_json(payload) when is_map(payload) do - %Neurow.InternalApi.Message{ - event: payload["event"], - timestamp: payload["timestamp"], - payload: payload["payload"] - } - end - def validate(message) do cond do message.event == nil -> diff --git a/neurow/lib/neurow/receiver_shard.ex b/neurow/lib/neurow/broker/receiver_shard.ex similarity index 90% rename from neurow/lib/neurow/receiver_shard.ex rename to neurow/lib/neurow/broker/receiver_shard.ex index 5a85774..463cd46 100644 --- a/neurow/lib/neurow/receiver_shard.ex +++ b/neurow/lib/neurow/broker/receiver_shard.ex @@ -1,4 +1,4 @@ -defmodule Neurow.ReceiverShard do +defmodule Neurow.Broker.ReceiverShard do use GenServer def start_link(shard) do @@ -23,7 +23,12 @@ defmodule Neurow.ReceiverShard do @impl true def init(shard) do - :ok = Phoenix.PubSub.subscribe(Neurow.PubSub, Neurow.ReceiverShardManager.build_topic(shard)) + :ok = + Phoenix.PubSub.subscribe( + Neurow.PubSub, + Neurow.Broker.ReceiverShardManager.build_topic(shard) + ) + table_0 = table_name(shard, 0) table_1 = table_name(shard, 1) create_table(table_0) diff --git a/neurow/lib/neurow/receiver_shard_manager.ex b/neurow/lib/neurow/broker/receiver_shard_manager.ex similarity index 84% rename from neurow/lib/neurow/receiver_shard_manager.ex rename to neurow/lib/neurow/broker/receiver_shard_manager.ex index d394257..1ff7aad 100644 --- a/neurow/lib/neurow/receiver_shard_manager.ex +++ b/neurow/lib/neurow/broker/receiver_shard_manager.ex @@ -1,4 +1,4 @@ -defmodule Neurow.ReceiverShardManager do +defmodule Neurow.Broker.ReceiverShardManager do require Logger use GenServer @@ -37,7 +37,7 @@ defmodule Neurow.ReceiverShardManager do def all_pids(fun) do Enum.map(0..(@shards - 1), fn shard -> - fun.({shard, Neurow.ReceiverShard.build_name(shard)}) + fun.({shard, Neurow.Broker.ReceiverShard.build_name(shard)}) end) end @@ -58,7 +58,7 @@ defmodule Neurow.ReceiverShardManager do @impl true def handle_call({:flush_history}, _from, state) do all_pids(fn {_, pid} -> - pid |> Neurow.ReceiverShard.flush_history() + pid |> Neurow.Broker.ReceiverShard.flush_history() end) {:reply, :ok, state} @@ -70,12 +70,12 @@ defmodule Neurow.ReceiverShardManager do # Read from the current process, not from GenServer process def get_history(topic) do - Neurow.ReceiverShard.get_history(shard_from_topic(topic), topic) + Neurow.Broker.ReceiverShard.get_history(shard_from_topic(topic), topic) end def create_receivers() do all_pids(fn {shard, pid} -> - Supervisor.child_spec({Neurow.ReceiverShard, shard}, id: pid) + Supervisor.child_spec({Neurow.Broker.ReceiverShard, shard}, id: pid) end) end diff --git a/neurow/lib/neurow/configuration.ex b/neurow/lib/neurow/configuration.ex index 17fa056..dea5220 100644 --- a/neurow/lib/neurow/configuration.ex +++ b/neurow/lib/neurow/configuration.ex @@ -49,6 +49,10 @@ defmodule Neurow.Configuration do Application.fetch_env!(:neurow, :sse_keepalive) end + def public_api_context_path do + Application.get_env(:neurow, :public_api_context_path, "") + end + @impl true def init(_opts) do {:ok, diff --git a/neurow/lib/neurow/internal_api/endpoint.ex b/neurow/lib/neurow/internal_api/endpoint.ex index 6123b3a..355ff92 100644 --- a/neurow/lib/neurow/internal_api/endpoint.ex +++ b/neurow/lib/neurow/internal_api/endpoint.ex @@ -3,7 +3,7 @@ defmodule Neurow.InternalApi.Endpoint do require Node import Plug.Conn alias Neurow.InternalApi.PublishRequest - alias Neurow.InternalApi.Message + alias Neurow.Broker.Message use Plug.Router plug(MetricsPlugExporter) @@ -64,7 +64,7 @@ defmodule Neurow.InternalApi.Endpoint do end get "/history/:topic" do - history = Neurow.ReceiverShardManager.get_history(topic) + history = Neurow.Broker.ReceiverShardManager.get_history(topic) history = Enum.map(history, fn {_, message} -> @@ -87,7 +87,7 @@ defmodule Neurow.InternalApi.Endpoint do Enum.each(topics, fn topic -> Enum.each(messages, fn message -> :ok = - Neurow.ReceiverShardManager.broadcast(topic, %Message{ + Neurow.Broker.ReceiverShardManager.broadcast(topic, %Message{ message | timestamp: message.timestamp || publish_timestamp }) diff --git a/neurow/lib/neurow/internal_api/publish_request.ex b/neurow/lib/neurow/internal_api/publish_request.ex index 8c36858..3ef5db4 100644 --- a/neurow/lib/neurow/internal_api/publish_request.ex +++ b/neurow/lib/neurow/internal_api/publish_request.ex @@ -1,5 +1,5 @@ defmodule Neurow.InternalApi.PublishRequest do - alias Neurow.InternalApi.Message + alias Neurow.Broker.Message defstruct [:topics, :topic, :message, :messages] @@ -16,7 +16,7 @@ defmodule Neurow.InternalApi.PublishRequest do message: case payload["message"] do message when is_map(message) -> - Neurow.InternalApi.Message.from_json(payload["message"]) + message_from_json(payload["message"]) _ -> nil @@ -26,7 +26,7 @@ defmodule Neurow.InternalApi.PublishRequest do messages when is_list(messages) -> Enum.map(messages, fn message -> case message do - message when is_map(message) -> Neurow.InternalApi.Message.from_json(message) + message when is_map(message) -> message_from_json(message) _ -> nil end end) @@ -116,4 +116,12 @@ defmodule Neurow.InternalApi.PublishRequest do {nil, messages} -> messages end end + + defp message_from_json(payload) when is_map(payload) do + %Neurow.Broker.Message{ + event: payload["event"], + timestamp: payload["timestamp"], + payload: payload["payload"] + } + end end diff --git a/neurow/lib/neurow/public_api/endpoint.ex b/neurow/lib/neurow/public_api/endpoint.ex index 31ff70c..1ea5621 100644 --- a/neurow/lib/neurow/public_api/endpoint.ex +++ b/neurow/lib/neurow/public_api/endpoint.ex @@ -20,7 +20,19 @@ defmodule Neurow.PublicApi.Endpoint do plug(:match) plug(:dispatch) - get "/v1/subscribe" do + match _ do + context_path = Neurow.Configuration.public_api_context_path() + + case {conn.method, conn.request_path} do + {"GET", ^context_path <> "/v1/subscribe"} -> + subscribe(conn) + + _ -> + conn |> send_resp(404, "") + end + end + + defp subscribe(conn) do case conn.assigns[:jwt_payload] do %{"iss" => issuer, "sub" => sub} -> topic = "#{issuer}-#{sub}" @@ -81,35 +93,7 @@ defmodule Neurow.PublicApi.Endpoint do send_http_error(conn, :forbidden, error_code, error_message) end - def send_http_error(conn, http_status, error_code, error_message) do - origin = - case conn |> get_req_header("origin") do - [origin] -> origin - _ -> "*" - end - - response = - :jiffy.encode(%{ - errors: [ - %{error_code: error_code, error_message: error_message} - ] - }) - - now = :os.system_time(:seconds) - - {:ok, conn} = - conn - |> put_resp_header("content-type", "text/event-stream") - |> put_resp_header("access-control-allow-origin", origin) - |> put_resp_header("cache-control", "no-cache") - |> put_resp_header("connection", "close") - |> send_chunked(http_status) - |> chunk("id:#{now}\nevent: neurow_error_#{http_status}\ndata: #{response}\n\n") - - conn - end - - def preflight_request(conn, _options) do + def preflight_request(conn, _opts) do case conn.method do "OPTIONS" -> with( @@ -140,8 +124,32 @@ defmodule Neurow.PublicApi.Endpoint do end end - match _ do - send_resp(conn, 404, "") + defp send_http_error(conn, http_status, error_code, error_message) do + origin = + case conn |> get_req_header("origin") do + [origin] -> origin + _ -> "*" + end + + response = + :jiffy.encode(%{ + errors: [ + %{error_code: error_code, error_message: error_message} + ] + }) + + now = :os.system_time(:seconds) + + {:ok, conn} = + conn + |> put_resp_header("content-type", "text/event-stream") + |> put_resp_header("access-control-allow-origin", origin) + |> put_resp_header("cache-control", "no-cache") + |> put_resp_header("connection", "close") + |> send_chunked(http_status) + |> chunk("id:#{now}\nevent: neurow_error_#{http_status}\ndata: #{response}\n\n") + + conn end defp extract_last_event_id(conn) do @@ -162,7 +170,7 @@ defmodule Neurow.PublicApi.Endpoint do end defp import_history(conn, topic, last_event_id) do - history = Neurow.ReceiverShardManager.get_history(topic) + history = Neurow.Broker.ReceiverShardManager.get_history(topic) {conn, sent} = process_history(conn, last_event_id, 0, history) diff --git a/neurow/test/neurow/internal_api/message_test.exs b/neurow/test/neurow/broker/message_test.exs similarity index 50% rename from neurow/test/neurow/internal_api/message_test.exs rename to neurow/test/neurow/broker/message_test.exs index 5a557ed..a663709 100644 --- a/neurow/test/neurow/internal_api/message_test.exs +++ b/neurow/test/neurow/broker/message_test.exs @@ -1,89 +1,81 @@ -defmodule Neurow.InternalApi.MessageTest do +defmodule Neurow.Broker.MessageTest do use ExUnit.Case - alias Neurow.InternalApi.Message + alias Neurow.Broker.Message describe "#validate" do test "returns :ok if the message without a timestamp is valid" do - message = - Message.from_json(%{ - "event" => "test-event", - "payload" => "Hello !" - }) + message = %Message{ + event: "test-event", + payload: "Hello !" + } assert Message.validate(message) == :ok end test "returns :ok if the message with a timestamp is valid" do - message = - Message.from_json(%{ - "event" => "test-event", - "payload" => "Hello !", - "timestamp" => 1234 - }) + message = %Message{ + event: "test-event", + payload: "Hello !", + timestamp: 1234 + } assert Message.validate(message) == :ok end test "returns an error if 'event' is missing" do - message = - Message.from_json(%{ - "payload" => "Hello !", - "timestamp" => 1234 - }) + message = %Message{ + payload: "Hello !", + timestamp: 1234 + } assert Message.validate(message) == {:error, "'event' is expected"} end test "returns an error if 'event' is not a string" do - message = - Message.from_json(%{ - "event" => 123, - "payload" => "Hello !", - "timestamp" => 1234 - }) + message = %Message{ + event: 123, + payload: "Hello !", + timestamp: 1234 + } assert Message.validate(message) == {:error, "'event' must be a non-empty string"} end test "returns an error if the payload is missing" do - message = - Message.from_json(%{ - "event" => "test_event", - "timestamp" => 1234 - }) + message = %Message{ + event: "test_event", + timestamp: 1234 + } assert Message.validate(message) == {:error, "'payload' is expected"} end test "returns an error if the payload is not a string" do - message = - Message.from_json(%{ - "event" => "test_event", - "payload" => 1234, - "timestamp" => 1234 - }) + message = %Message{ + event: "test_event", + payload: 1234, + timestamp: 1234 + } assert Message.validate(message) == {:error, "'payload' must be a non-empty string"} end test "returns an error if the timestamp is not an integer" do - message = - Message.from_json(%{ - "event" => "test_event", - "payload" => "test payload", - "timestamp" => "foo" - }) + message = %Message{ + event: "test_event", + payload: "test payload", + timestamp: "foo" + } assert Message.validate(message) == {:error, "'timestamp' must be a positive integer"} end test "returns an error if the timestamp is a negative integer " do - message = - Message.from_json(%{ - "event" => "test_event", - "payload" => "test payload", - "timestamp" => -1 - }) + message = %Message{ + event: "test_event", + payload: "test payload", + timestamp: -1 + } assert Message.validate(message) == {:error, "'timestamp' must be a positive integer"} end diff --git a/neurow/test/neurow/internal_api/endpoint_test.exs b/neurow/test/neurow/internal_api/endpoint_test.exs index f1ee531..dbe2933 100644 --- a/neurow/test/neurow/internal_api/endpoint_test.exs +++ b/neurow/test/neurow/internal_api/endpoint_test.exs @@ -2,7 +2,7 @@ defmodule Neurow.InternalApi.EndpointTest do use ExUnit.Case use Plug.Test - alias Neurow.InternalApi.Message + alias Neurow.Broker.Message import JwtHelper diff --git a/neurow/test/neurow/public_api/endpoint_test.exs b/neurow/test/neurow/public_api/endpoint_test.exs index 82ce3b9..5ca510d 100644 --- a/neurow/test/neurow/public_api/endpoint_test.exs +++ b/neurow/test/neurow/public_api/endpoint_test.exs @@ -103,8 +103,8 @@ defmodule Neurow.PublicApi.EndpointTest do describe "history" do setup do - GenServer.call(Neurow.ReceiverShardManager, {:rotate}) - GenServer.call(Neurow.ReceiverShardManager, {:rotate}) + GenServer.call(Neurow.Broker.ReceiverShardManager, {:rotate}) + GenServer.call(Neurow.Broker.ReceiverShardManager, {:rotate}) Process.sleep(20) :ok end @@ -434,9 +434,53 @@ defmodule Neurow.PublicApi.EndpointTest do end end + describe "context path" do + setup do + Application.put_env(:neurow, :public_api_context_path, "/context_path") + on_exit(fn -> Application.put_env(:neurow, :public_api_context_path, "") end) + :ok + end + + test "the authentication logic is applyed to urls prefixed by the context path" do + conn = + conn(:get, "/v1/subscribe") + + call(Neurow.PublicApi.Endpoint, conn, fn -> + assert_receive {:send_chunked, 403} + assert_receive {:chunk, body} + + sse_event = parse_sse_event(body) + + assert sse_event.event == "neurow_error_forbidden" + end) + end + + test "The subscribe url is prefixed with the context path" do + conn = + conn(:get, "/context_path/v1/subscribe") + |> put_req_header( + "authorization", + "Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1")}" + ) + + call(Neurow.PublicApi.Endpoint, conn, fn -> + assert_receive {:send_chunked, 200} + publish_message("test_issuer1-test_topic1", 1234, "A message") + + assert_receive {:chunk, first_event} + + assert parse_sse_event(first_event) == %{ + id: "1234", + event: "test-event", + data: "A message" + } + end) + end + end + defp publish_message(topic, id, message) do :ok = - Neurow.ReceiverShardManager.broadcast(topic, %Neurow.InternalApi.Message{ + Neurow.Broker.ReceiverShardManager.broadcast(topic, %Neurow.Broker.Message{ event: "test-event", payload: message, timestamp: id