Skip to content

Commit

Permalink
Support a configurable context path on the public api (#20)
Browse files Browse the repository at this point in the history
The context path of the public API is now configurable by a env
variable: `PUBLIC_API_CONTEXT_PATH="/push" mix run --no-halt`

**Note**: Not related to the main purpose of the PR, but I also move the
message brokering logic in a dedicated folder to make more explicit the
3 layers inside Neurow:
- Public api,
- Internal api,
- Broker.
  • Loading branch information
achouippe authored Sep 25, 2024
1 parent ef5db1b commit 1efc737
Show file tree
Hide file tree
Showing 14 changed files with 167 additions and 118 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ Available environment variables are:
| `LOG_LEVEL` | info | Log level |
| `PUBLIC_API_PORT` | 4000 | TCP port of the public API |
| `PUBLIC_API_JWT_MAX_LIFETIME` | 120 | Max lifetime in seconds allowed for JWT tokens issued on the public API |
| `PUBLIC_API_CONTEXT_PATH` | "" | URL prefix for resources of the public API - Useful to mount Neurow on a existing website|
| `PREFLIGHT_MAX_AGE` | 86400 | Value of the `access-control-max-age` headers on CROS preflight responses on the public API |
| `SSE_TIMEOUT` | 900000 | SSE deconnection delay in ms, after the last received message
| `SSE_KEEPALIVE` | 600000 | Neurow periodically send `ping` events on SSE connections to prevent connections from being closed by network devices. This variable defines the delay between two ping events |
| `INTERNAL_API_PORT` | 3000 | TCP port fo the internal API |
| `INTERNAL_API_JWT_MAX_LIFETIME` | 1500 | Max lifetime in seconds allowed for JWT tokens issued on the internal | API |
| `INTERNAL_API_JWT_MAX_LIFETIME` | 1500 | Max lifetime in seconds allowed for JWT tokens issued on the internal API |
| `HISTORY_MIN_DURATION` | 30 | Messages are persisted in the Neurow cluster, so clients can re-fetch recent messages after a short term disconnection by using the `Last-Event-Id` on SSE connections. Messages are only persisted for a limited time. `HISTORY_MIN_DURATION` defines the minimum retention guaranteed by the Neurow server.


Expand Down
1 change: 1 addition & 0 deletions neurow/config/runtime.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ config :neurow,
String.to_integer(System.get_env("PUBLIC_API_JWT_MAX_LIFETIME") || "120"),
public_api_allowed_origins: [~r/^https:\/\/.*\.doctolib\.(fr|it|de)(:3000)?$/],
public_api_preflight_max_age: String.to_integer(System.get_env("PREFLIGHT_MAX_AGE") || "86400"),
public_api_context_path: System.get_env("PUBLIC_API_CONTEXT_PATH") || "",
sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000"),
sse_keepalive: String.to_integer(System.get_env("SSE_KEEPALIVE") || "600000")

Expand Down
6 changes: 3 additions & 3 deletions neurow/integration_test/test_cluster.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule Neurow.IntegrationTest.TestCluster do
A call to this method is expected in the setup block of integration tests
"""
def flush_history do
GenServer.call(__MODULE__, :flush__history)
GenServer.call(__MODULE__, :flush_history)
end

# -- GenServer callbacks, should not be used directly --
Expand Down Expand Up @@ -87,11 +87,11 @@ defmodule Neurow.IntegrationTest.TestCluster do
end
end

def handle_call(:flush__history, _from, state) do
def handle_call(:flush_history, _from, state) do
state.nodes
|> Enum.map(fn {node, _public_api_port, _internal_api_port} ->
Task.async(fn ->
:ok = :rpc.call(node, Neurow.ReceiverShardManager, :flush_history, [])
:ok = :rpc.call(node, Neurow.Broker.ReceiverShardManager, :flush_history, [])
end)
end)
|> Enum.map(fn task -> Task.await(task, 2_000) end)
Expand Down
4 changes: 2 additions & 2 deletions neurow/lib/neurow/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ defmodule Neurow.Application do
scheme: sse_http_scheme, plug: Neurow.PublicApi.Endpoint, options: public_api_http_config},
{Plug.Cowboy.Drainer, refs: [Neurow.PublicApi.Endpoint], shutdown: 20_000},
{StopListener, []},
{Neurow.ReceiverShardManager, [history_min_duration]}
{Neurow.Broker.ReceiverShardManager, [history_min_duration]}
] ++
Neurow.ReceiverShardManager.create_receivers() ++
Neurow.Broker.ReceiverShardManager.create_receivers() ++
if cluster_topologies do
[{Cluster.Supervisor, [cluster_topologies, [name: Neurow.ClusterSupervisor]]}]
else
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,6 @@
defmodule Neurow.InternalApi.Message do
defmodule Neurow.Broker.Message do
defstruct [:event, :timestamp, :payload]

def from_json(payload) when is_binary(payload) do
{:ok, from_json(:jiffy.decode(payload, [:return_maps]))}
rescue
exception -> {:error, exception}
end

def from_json(payload) when is_map(payload) do
%Neurow.InternalApi.Message{
event: payload["event"],
timestamp: payload["timestamp"],
payload: payload["payload"]
}
end

def validate(message) do
cond do
message.event == nil ->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Neurow.ReceiverShard do
defmodule Neurow.Broker.ReceiverShard do
use GenServer

def start_link(shard) do
Expand All @@ -23,7 +23,12 @@ defmodule Neurow.ReceiverShard do

@impl true
def init(shard) do
:ok = Phoenix.PubSub.subscribe(Neurow.PubSub, Neurow.ReceiverShardManager.build_topic(shard))
:ok =
Phoenix.PubSub.subscribe(
Neurow.PubSub,
Neurow.Broker.ReceiverShardManager.build_topic(shard)
)

table_0 = table_name(shard, 0)
table_1 = table_name(shard, 1)
create_table(table_0)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Neurow.ReceiverShardManager do
defmodule Neurow.Broker.ReceiverShardManager do
require Logger
use GenServer

Expand Down Expand Up @@ -37,7 +37,7 @@ defmodule Neurow.ReceiverShardManager do

def all_pids(fun) do
Enum.map(0..(@shards - 1), fn shard ->
fun.({shard, Neurow.ReceiverShard.build_name(shard)})
fun.({shard, Neurow.Broker.ReceiverShard.build_name(shard)})
end)
end

Expand All @@ -58,7 +58,7 @@ defmodule Neurow.ReceiverShardManager do
@impl true
def handle_call({:flush_history}, _from, state) do
all_pids(fn {_, pid} ->
pid |> Neurow.ReceiverShard.flush_history()
pid |> Neurow.Broker.ReceiverShard.flush_history()
end)

{:reply, :ok, state}
Expand All @@ -70,12 +70,12 @@ defmodule Neurow.ReceiverShardManager do

# Read from the current process, not from GenServer process
def get_history(topic) do
Neurow.ReceiverShard.get_history(shard_from_topic(topic), topic)
Neurow.Broker.ReceiverShard.get_history(shard_from_topic(topic), topic)
end

def create_receivers() do
all_pids(fn {shard, pid} ->
Supervisor.child_spec({Neurow.ReceiverShard, shard}, id: pid)
Supervisor.child_spec({Neurow.Broker.ReceiverShard, shard}, id: pid)
end)
end

Expand Down
4 changes: 4 additions & 0 deletions neurow/lib/neurow/configuration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ defmodule Neurow.Configuration do
Application.fetch_env!(:neurow, :sse_keepalive)
end

def public_api_context_path do
Application.get_env(:neurow, :public_api_context_path, "")
end

@impl true
def init(_opts) do
{:ok,
Expand Down
6 changes: 3 additions & 3 deletions neurow/lib/neurow/internal_api/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ defmodule Neurow.InternalApi.Endpoint do
require Node
import Plug.Conn
alias Neurow.InternalApi.PublishRequest
alias Neurow.InternalApi.Message
alias Neurow.Broker.Message

use Plug.Router
plug(MetricsPlugExporter)
Expand Down Expand Up @@ -64,7 +64,7 @@ defmodule Neurow.InternalApi.Endpoint do
end

get "/history/:topic" do
history = Neurow.ReceiverShardManager.get_history(topic)
history = Neurow.Broker.ReceiverShardManager.get_history(topic)

history =
Enum.map(history, fn {_, message} ->
Expand All @@ -87,7 +87,7 @@ defmodule Neurow.InternalApi.Endpoint do
Enum.each(topics, fn topic ->
Enum.each(messages, fn message ->
:ok =
Neurow.ReceiverShardManager.broadcast(topic, %Message{
Neurow.Broker.ReceiverShardManager.broadcast(topic, %Message{
message
| timestamp: message.timestamp || publish_timestamp
})
Expand Down
14 changes: 11 additions & 3 deletions neurow/lib/neurow/internal_api/publish_request.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule Neurow.InternalApi.PublishRequest do
alias Neurow.InternalApi.Message
alias Neurow.Broker.Message

defstruct [:topics, :topic, :message, :messages]

Expand All @@ -16,7 +16,7 @@ defmodule Neurow.InternalApi.PublishRequest do
message:
case payload["message"] do
message when is_map(message) ->
Neurow.InternalApi.Message.from_json(payload["message"])
message_from_json(payload["message"])

_ ->
nil
Expand All @@ -26,7 +26,7 @@ defmodule Neurow.InternalApi.PublishRequest do
messages when is_list(messages) ->
Enum.map(messages, fn message ->
case message do
message when is_map(message) -> Neurow.InternalApi.Message.from_json(message)
message when is_map(message) -> message_from_json(message)
_ -> nil
end
end)
Expand Down Expand Up @@ -116,4 +116,12 @@ defmodule Neurow.InternalApi.PublishRequest do
{nil, messages} -> messages
end
end

defp message_from_json(payload) when is_map(payload) do
%Neurow.Broker.Message{
event: payload["event"],
timestamp: payload["timestamp"],
payload: payload["payload"]
}
end
end
74 changes: 41 additions & 33 deletions neurow/lib/neurow/public_api/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@ defmodule Neurow.PublicApi.Endpoint do
plug(:match)
plug(:dispatch)

get "/v1/subscribe" do
match _ do
context_path = Neurow.Configuration.public_api_context_path()

case {conn.method, conn.request_path} do
{"GET", ^context_path <> "/v1/subscribe"} ->
subscribe(conn)

_ ->
conn |> send_resp(404, "")
end
end

defp subscribe(conn) do
case conn.assigns[:jwt_payload] do
%{"iss" => issuer, "sub" => sub} ->
topic = "#{issuer}-#{sub}"
Expand Down Expand Up @@ -81,35 +93,7 @@ defmodule Neurow.PublicApi.Endpoint do
send_http_error(conn, :forbidden, error_code, error_message)
end

def send_http_error(conn, http_status, error_code, error_message) do
origin =
case conn |> get_req_header("origin") do
[origin] -> origin
_ -> "*"
end

response =
:jiffy.encode(%{
errors: [
%{error_code: error_code, error_message: error_message}
]
})

now = :os.system_time(:seconds)

{:ok, conn} =
conn
|> put_resp_header("content-type", "text/event-stream")
|> put_resp_header("access-control-allow-origin", origin)
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "close")
|> send_chunked(http_status)
|> chunk("id:#{now}\nevent: neurow_error_#{http_status}\ndata: #{response}\n\n")

conn
end

def preflight_request(conn, _options) do
def preflight_request(conn, _opts) do
case conn.method do
"OPTIONS" ->
with(
Expand Down Expand Up @@ -140,8 +124,32 @@ defmodule Neurow.PublicApi.Endpoint do
end
end

match _ do
send_resp(conn, 404, "")
defp send_http_error(conn, http_status, error_code, error_message) do
origin =
case conn |> get_req_header("origin") do
[origin] -> origin
_ -> "*"
end

response =
:jiffy.encode(%{
errors: [
%{error_code: error_code, error_message: error_message}
]
})

now = :os.system_time(:seconds)

{:ok, conn} =
conn
|> put_resp_header("content-type", "text/event-stream")
|> put_resp_header("access-control-allow-origin", origin)
|> put_resp_header("cache-control", "no-cache")
|> put_resp_header("connection", "close")
|> send_chunked(http_status)
|> chunk("id:#{now}\nevent: neurow_error_#{http_status}\ndata: #{response}\n\n")

conn
end

defp extract_last_event_id(conn) do
Expand All @@ -162,7 +170,7 @@ defmodule Neurow.PublicApi.Endpoint do
end

defp import_history(conn, topic, last_event_id) do
history = Neurow.ReceiverShardManager.get_history(topic)
history = Neurow.Broker.ReceiverShardManager.get_history(topic)

{conn, sent} = process_history(conn, last_event_id, 0, history)

Expand Down
Loading

0 comments on commit 1efc737

Please sign in to comment.