diff --git a/neurow/config/runtime.exs b/neurow/config/runtime.exs index af8454e..7b8c150 100644 --- a/neurow/config/runtime.exs +++ b/neurow/config/runtime.exs @@ -6,10 +6,19 @@ config :logger, :console, config :neurow, public_api_port: String.to_integer(System.get_env("PUBLIC_API_PORT") || "4000") +config :neurow, + public_api_jwt_max_lifetime: + String.to_integer(System.get_env("PUBLIC_API_JWT_MAX_LIFETIME") || "120") + config :neurow, internal_api_port: String.to_integer(System.get_env("INTERNAL_API_PORT") || "3000") +config :neurow, + internal_api_jwt_max_lifetime: + String.to_integer(System.get_env("INTERNAL_API_JWT_MAX_LIFETIME") || "120") + config :neurow, sse_timeout: String.to_integer(System.get_env("SSE_TIMEOUT") || "900000") +config :neurow, sse_keepalive: String.to_integer(System.get_env("SSE_KEEPALIVE") || "600000") config :neurow, ssl_keyfile: System.get_env("SSL_KEYFILE") config :neurow, ssl_certfile: System.get_env("SSL_CERTFILE") diff --git a/neurow/lib/neurow.ex b/neurow/lib/neurow.ex deleted file mode 100644 index 1de9663..0000000 --- a/neurow/lib/neurow.ex +++ /dev/null @@ -1,18 +0,0 @@ -defmodule Neurow do - @moduledoc """ - Documentation for `Neurow`. - """ - - @doc """ - Hello world. - - ## Examples - - iex> Neurow.hello() - :world - - """ - def hello do - :world - end -end diff --git a/neurow/lib/neurow/application.ex b/neurow/lib/neurow/application.ex index d2aa305..9ba76ad 100644 --- a/neurow/lib/neurow/application.ex +++ b/neurow/lib/neurow/application.ex @@ -48,7 +48,9 @@ defmodule Neurow.Application do name: Neurow.PubSub, options: [adapter: Phoenix.PubSub.PG2, pool_size: 10]}, {Plug.Cowboy, scheme: :http, plug: Neurow.InternalApi, options: [port: internal_api_port]}, {Plug.Cowboy, - scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config} + scheme: sse_http_scheme, plug: Neurow.PublicApi, options: public_api_http_config}, + {Plug.Cowboy.Drainer, refs: [Neurow.PublicApi.HTTP], shutdown: 20_000}, + {StopListener, []} ] MetricsPlugExporter.setup() diff --git a/neurow/lib/neurow/configuration.ex b/neurow/lib/neurow/configuration.ex index 0f05d0e..4bc958c 100644 --- a/neurow/lib/neurow/configuration.ex +++ b/neurow/lib/neurow/configuration.ex @@ -10,11 +10,11 @@ defmodule Neurow.Configuration do end def public_api_audience do - Application.fetch_env!(:neurow, :public_api_authentication)[:audience] + GenServer.call(__MODULE__, {:static_param, :public_api_audience}) end def public_api_verbose_authentication_errors do - Application.fetch_env!(:neurow, :public_api_authentication)[:verbose_authentication_errors] + GenServer.call(__MODULE__, {:static_param, :public_api_verbose_authentication_errors}) end def internal_api_issuer_jwks(issuer_name) do @@ -22,13 +22,27 @@ defmodule Neurow.Configuration do end def internal_api_audience do - Application.fetch_env!(:neurow, :internal_api_authentication)[:audience] + GenServer.call(__MODULE__, {:static_param, :internal_api_audience}) end def internal_api_verbose_authentication_errors do - Application.fetch_env!(:neurow, :internal_api_authentication)[ - :verbose_authentication_errors - ] + GenServer.call(__MODULE__, {:static_param, :internal_api_verbose_authentication_errors}) + end + + def internal_api_jwt_max_lifetime do + GenServer.call(__MODULE__, {:static_param, :internal_api_jwt_max_lifetime}) + end + + def public_api_jwt_max_lifetime do + GenServer.call(__MODULE__, {:static_param, :public_api_jwt_max_lifetime}) + end + + def sse_timeout do + GenServer.call(__MODULE__, {:static_param, :sse_timeout}) + end + + def sse_keepalive do + GenServer.call(__MODULE__, {:static_param, :sse_keepalive}) end @impl true @@ -40,7 +54,23 @@ defmodule Neurow.Configuration do }, internal_api: %{ issuer_jwks: build_issuer_jwks(:internal_api_authentication) - } + }, + sse_keepalive: Application.fetch_env!(:neurow, :sse_keepalive), + sse_timeout: Application.fetch_env!(:neurow, :sse_timeout), + internal_api_jwt_max_lifetime: + Application.fetch_env!(:neurow, :internal_api_jwt_max_lifetime), + public_api_jwt_max_lifetime: Application.fetch_env!(:neurow, :public_api_jwt_max_lifetime), + internal_api_verbose_authentication_errors: + Application.fetch_env!(:neurow, :internal_api_authentication)[ + :verbose_authentication_errors + ], + public_api_verbose_authentication_errors: + Application.fetch_env!(:neurow, :public_api_authentication)[ + :verbose_authentication_errors + ], + internal_api_audience: + Application.fetch_env!(:neurow, :internal_api_authentication)[:audience], + public_api_audience: Application.fetch_env!(:neurow, :public_api_authentication)[:audience] }} end @@ -54,6 +84,11 @@ defmodule Neurow.Configuration do {:reply, state[:internal_api][:issuer_jwks][issuer_name], state} end + @impl true + def handle_call({:static_param, key}, _from, state) do + {:reply, state[key], state} + end + defp build_issuer_jwks(api_authentication_scope) do Application.fetch_env!(:neurow, api_authentication_scope)[:issuers] |> Enum.map(fn {issuer_name, shared_secrets} -> diff --git a/neurow/lib/neurow/internal_api.ex b/neurow/lib/neurow/internal_api.ex index ce6a098..36d04ee 100644 --- a/neurow/lib/neurow/internal_api.ex +++ b/neurow/lib/neurow/internal_api.ex @@ -10,6 +10,8 @@ defmodule Neurow.InternalApi do audience: &Neurow.Configuration.internal_api_audience/0, verbose_authentication_errors: &Neurow.Configuration.internal_api_verbose_authentication_errors/0, + max_lifetime: &Neurow.Configuration.internal_api_jwt_max_lifetime/0, + count_error: &Stats.inc_jwt_errors_internal/0, exclude_path_prefixes: ["/ping", "/nodes", "/cluster_size_above"] ) @@ -49,27 +51,56 @@ defmodule Neurow.InternalApi do |> send_resp((cluster_size >= size && 200) || 404, "Cluster size: #{cluster_size}\n") end - post "v1/publish" do - issuer = conn.assigns[:jwt_payload]["iss"] + post "/v1/publish" do + case extract_params(conn) do + {:ok, message, topic} -> + message_id = to_string(:os.system_time(:millisecond)) - topic = "#{issuer}-#{conn.body_params["topic"]}" - message = conn.body_params["message"] + :ok = + Phoenix.PubSub.broadcast!(Neurow.PubSub, topic, {:pubsub_message, message_id, message}) - {:ok, body, _conn} = Plug.Conn.read_body(conn) - message_id = to_string(:os.system_time(:millisecond)) + Logger.debug("Message published on topic: #{topic}") + Stats.inc_msg_received() - :ok = - Phoenix.PubSub.broadcast!(Neurow.PubSub, topic, {:pubsub_message, message_id, message}) + conn + |> put_resp_header("content-type", "text/html") + |> send_resp(200, "Published #{message} to #{topic}\n") - Logger.debug("Message published on topic: #{topic}") - Stats.inc_msg_received() - - conn - |> put_resp_header("content-type", "text/html") - |> send_resp(200, "Published #{body} to #{topic}\n") + {:error, reason} -> + conn |> resp(:bad_request, reason) + end end match _ do send_resp(conn, 404, "") end + + defp extract_params(conn) do + with( + {:ok, issuer} <- extract_issuer(conn), + {:ok, message} <- extract_param(conn, "message"), + {:ok, topic} <- extract_param(conn, "topic") + ) do + full_topic = "#{issuer}-#{topic}" + {:ok, message, full_topic} + else + error -> error + end + end + + defp extract_issuer(conn) do + case conn.assigns[:jwt_payload]["iss"] do + nil -> {:error, "JWT iss is nil"} + "" -> {:error, "JWT iss is empty"} + issuer -> {:ok, issuer} + end + end + + defp extract_param(conn, key) do + case conn.body_params[key] do + nil -> {:error, "#{key} is nil"} + "" -> {:error, "#{key} is empty"} + output -> {:ok, output} + end + end end diff --git a/neurow/lib/neurow/jwt_auth_plug.ex b/neurow/lib/neurow/jwt_auth_plug.ex index 0e90537..447112d 100644 --- a/neurow/lib/neurow/jwt_auth_plug.ex +++ b/neurow/lib/neurow/jwt_auth_plug.ex @@ -7,8 +7,9 @@ defmodule Neurow.JwtAuthPlug do defstruct [ :jwk_provider, :audience, + :max_lifetime, + :count_error, allowed_algorithm: "HS256", - max_lifetime: 60 * 2, verbose_authentication_errors: false, exclude_path_prefixes: [] ] @@ -56,9 +57,11 @@ defmodule Neurow.JwtAuthPlug do conn |> assign(:jwt_payload, payload.fields) else {:error, code, message} -> + options.count_error.() conn |> forbidden(code, message, options) _ -> + options.count_error.() conn |> forbidden(:authentication_error, "Authentication error", options) end diff --git a/neurow/lib/neurow/public_api.ex b/neurow/lib/neurow/public_api.ex index 2210285..f251cb7 100644 --- a/neurow/lib/neurow/public_api.ex +++ b/neurow/lib/neurow/public_api.ex @@ -9,17 +9,31 @@ defmodule Neurow.PublicApi do jwk_provider: &Neurow.Configuration.public_api_issuer_jwks/1, audience: &Neurow.Configuration.public_api_audience/0, verbose_authentication_errors: - &Neurow.Configuration.public_api_verbose_authentication_errors/0 + &Neurow.Configuration.public_api_verbose_authentication_errors/0, + max_lifetime: &Neurow.Configuration.public_api_jwt_max_lifetime/0, + count_error: &Stats.inc_jwt_errors_public/0 ) plug(:match) plug(:dispatch) - get "v1/subscribe" do + get "/v1/subscribe" do case conn.assigns[:jwt_payload] do %{"iss" => issuer, "sub" => sub} -> topic = "#{issuer}-#{sub}" + timeout = + case conn.req_headers |> List.keyfind("x-sse-timeout", 0) do + nil -> Neurow.Configuration.sse_timeout() + {"x-sse-timeout", timeout} -> String.to_integer(timeout) + end + + keep_alive = + case conn.req_headers |> List.keyfind("x-sse-keepalive", 0) do + nil -> Neurow.Configuration.sse_keepalive() + {"x-sse-keepalive", keepalive} -> String.to_integer(keepalive) + end + conn = conn |> put_resp_header("content-type", "text/event-stream") @@ -27,6 +41,8 @@ defmodule Neurow.PublicApi do |> put_resp_header("connection", "close") |> put_resp_header("access-control-allow-origin", "*") |> put_resp_header("x-sse-server", to_string(node())) + |> put_resp_header("x-sse-timeout", to_string(timeout)) + |> put_resp_header("x-sse-keepalive", to_string(keep_alive)) :ok = Phoenix.PubSub.subscribe(Neurow.PubSub, topic) @@ -34,23 +50,47 @@ defmodule Neurow.PublicApi do Logger.debug("Client subscribed to #{topic}") - conn |> loop(Application.fetch_env!(:neurow, :sse_timeout)) + last_message = :os.system_time(:millisecond) + conn |> loop(timeout, keep_alive, last_message, last_message) Logger.debug("Client disconnected from #{topic}") conn _ -> - conn |> resp(:bad_request, "expected JWT claims are missing") + conn |> resp(:bad_request, "Expected JWT claims are missing") end end - defp loop(conn, sse_timeout) do + defp loop(conn, sse_timeout, keep_alive, last_message, last_ping) do receive do {:pubsub_message, msg_id, msg} -> {:ok, conn} = chunk(conn, "id: #{msg_id}\ndata: #{msg}\n\n") Stats.inc_msg_published() - loop(conn, sse_timeout) + new_last_message = :os.system_time(:millisecond) + loop(conn, sse_timeout, keep_alive, new_last_message, new_last_message) after - sse_timeout -> :timeout + 1000 -> + now = :os.system_time(:millisecond) + + cond do + # SSE Timeout + now - last_message > sse_timeout -> + Logger.debug("Client disconnected due to inactivity") + :timeout + + # SSE Keep alive, send a ping + now - last_ping > keep_alive -> + chunk(conn, "event: ping\n\n") + loop(conn, sse_timeout, keep_alive, last_message, now) + + # We need to stop + StopListener.close_connections?() -> + chunk(conn, "event: reconnect\n\n") + :close + + # Nothing + true -> + loop(conn, sse_timeout, keep_alive, last_message, last_ping) + end end end diff --git a/neurow/lib/stats.ex b/neurow/lib/stats.ex index fda81cb..847c348 100644 --- a/neurow/lib/stats.ex +++ b/neurow/lib/stats.ex @@ -18,8 +18,16 @@ defmodule Stats do help: "SSE Messages" ) + Gauge.declare( + name: :jwt_errors, + labels: [:kind], + help: "JWT Errors" + ) + Gauge.set([name: :current_connections], 0) Gauge.set([name: :connections], 0) + Gauge.set([name: :jwt_errors, labels: [:public]], 0) + Gauge.set([name: :jwt_errors, labels: [:internal]], 0) Gauge.set([name: :messages, labels: [:received]], 0) Gauge.set([name: :messages, labels: [:published]], 0) end @@ -40,4 +48,12 @@ defmodule Stats do def inc_msg_published() do Gauge.inc(name: :messages, labels: [:published]) end + + def inc_jwt_errors_public() do + Gauge.inc(name: :jwt_errors, labels: [:public]) + end + + def inc_jwt_errors_internal() do + Gauge.inc(name: :jwt_errors, labels: [:internal]) + end end diff --git a/neurow/lib/stop_listener.ex b/neurow/lib/stop_listener.ex new file mode 100644 index 0000000..cff73b3 --- /dev/null +++ b/neurow/lib/stop_listener.ex @@ -0,0 +1,30 @@ +defmodule StopListener do + use GenServer + require Logger + + def start_link(opts) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @impl true + def init(_) do + :ets.new(__MODULE__, [:set, :named_table, read_concurrency: true]) + Process.flag(:trap_exit, true) + {:ok, %{shutdown_in_progress: false}} + end + + def close_connections?() do + try do + :ets.lookup(__MODULE__, :close_connections?) + false + rescue + ArgumentError -> true + end + end + + @impl GenServer + def terminate(_reason, _state) do + Logger.info("Graceful Shutdown occurring") + :ok + end +end diff --git a/neurow/test/jwt_helper.exs b/neurow/test/jwt_helper.exs new file mode 100644 index 0000000..4719079 --- /dev/null +++ b/neurow/test/jwt_helper.exs @@ -0,0 +1,51 @@ +defmodule JwtHelper do + use Plug.Test + + def signed_jwt_token(jwt, jwk) do + jws = %{ + "alg" => "HS256" + } + + signed = JOSE.JWT.sign(jwk, jws, jwt) + {%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed) + compact_signed + end + + def put_jwt_token_in_req_header(conn, jwt, jwk) do + jwt_token = signed_jwt_token(jwt, jwk) + conn |> put_req_header("authorization", "Bearer #{jwt_token}") + end + + def put_jwt_token_in_req_header_internal_api(conn, issuer \\ "test_issuer1") do + key = JOSE.JWK.from_oct("nLjJdNLlpdv3W4Xk7MyVCAZKD-hvza6FQ4yhUUFnjmg") + iat = :os.system_time(:second) + exp = iat + (2 * 60 - 1) + + jwt_payload = %{ + "iss" => issuer, + "exp" => exp, + "iat" => iat, + "aud" => "internal_api" + } + + conn + |> put_jwt_token_in_req_header(jwt_payload, key) + |> put_req_header("content-type", "application/json") + end + + def compute_jwt_token_in_req_header_public_api(topic, issuer \\ "test_issuer1") do + key = JOSE.JWK.from_oct("966KljJz--KyzyBnMOrFXfAkq9XMqWwPgdBV3cKTxsc") + iat = :os.system_time(:second) + exp = iat + (2 * 60 - 1) + + jwt_payload = %{ + "iss" => issuer, + "exp" => exp, + "iat" => iat, + "aud" => "public_api", + "sub" => topic + } + + signed_jwt_token(jwt_payload, key) + end +end diff --git a/neurow/test/neurow/internal_api_integration_test.exs b/neurow/test/neurow/internal_api_integration_test.exs new file mode 100644 index 0000000..7dda585 --- /dev/null +++ b/neurow/test/neurow/internal_api_integration_test.exs @@ -0,0 +1,30 @@ +defmodule Neurow.InternalApiIntegrationTest do + use ExUnit.Case + use Plug.Test + import JwtHelper + + test "POST /v1/publish 200" do + :ok = Phoenix.PubSub.subscribe(Neurow.PubSub, "test_issuer1-bar") + + {:ok, body} = Jason.encode(%{message: "foo56", topic: "bar"}) + + conn = + conn(:post, "/v1/publish", body) + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 200 + + {:ok, body} = Jason.encode(%{message: "foo57", topic: "bar"}) + + conn = + conn(:post, "/v1/publish", body) + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 200 + + assert_received {:pubsub_message, _, "foo56"} + assert_received {:pubsub_message, _, "foo57"} + end +end diff --git a/neurow/test/neurow/internal_api_unit_test.exs b/neurow/test/neurow/internal_api_unit_test.exs new file mode 100644 index 0000000..6df3397 --- /dev/null +++ b/neurow/test/neurow/internal_api_unit_test.exs @@ -0,0 +1,105 @@ +defmodule Neurow.InternalApiUnitTest do + use ExUnit.Case + use Plug.Test + import JwtHelper + + test "GET /ping" do + conn = conn(:get, "/ping") + call = Neurow.InternalApi.call(conn, []) + assert call.status == 200 + end + + test "GET /nodes" do + conn = conn(:get, "/nodes") + call = Neurow.InternalApi.call(conn, []) + assert call.status == 200 + end + + test "GET /foo 403" do + conn = conn(:get, "/foo") + call = Neurow.InternalApi.call(conn, []) + assert call.status == 403 + end + + test "GET /foo 404" do + conn = + conn(:get, "/foo") + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 404 + end + + test "GET /v1/publish 403" do + conn = conn(:post, "/v1/publish") + call = Neurow.InternalApi.call(conn, []) + assert call.status == 403 + end + + test "POST /v1/publish 400 nil message" do + conn = + conn(:post, "/v1/publish") + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 400 + assert call.resp_body == "message is nil" + end + + test "POST /v1/publish 400 empty message" do + {:ok, body} = Jason.encode(%{message: ""}) + + conn = + conn(:post, "/v1/publish", body) + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 400 + assert call.resp_body == "message is empty" + end + + test "POST /v1/publish 400 nil topic" do + {:ok, body} = Jason.encode(%{message: "foo"}) + + conn = + conn(:post, "/v1/publish", body) + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 400 + assert call.resp_body == "topic is nil" + end + + test "POST /v1/publish 400 empty topic" do + {:ok, body} = Jason.encode(%{message: "foo", topic: ""}) + + conn = + conn(:post, "/v1/publish", body) + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 400 + assert call.resp_body == "topic is empty" + end + + test "POST /v1/publish 200" do + {:ok, body} = Jason.encode(%{message: "foo", topic: "bar"}) + + conn = + conn(:post, "/v1/publish", body) + |> put_jwt_token_in_req_header_internal_api() + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 200 + assert call.resp_body == "Published foo to test_issuer1-bar\n" + end + + test "POST /v1/publish 403" do + conn = + conn(:post, "/v1/publish") + |> put_jwt_token_in_req_header_internal_api("test_issuer_2") + + call = Neurow.InternalApi.call(conn, []) + assert call.status == 403 + end +end diff --git a/neurow/test/jwt_auth_plug_test.exs b/neurow/test/neurow/jwt_auth_plug_test.exs similarity index 96% rename from neurow/test/jwt_auth_plug_test.exs rename to neurow/test/neurow/jwt_auth_plug_test.exs index 8471570..05e8c7e 100644 --- a/neurow/test/jwt_auth_plug_test.exs +++ b/neurow/test/neurow/jwt_auth_plug_test.exs @@ -1,7 +1,7 @@ defmodule Neurow.JwtAuthPlugTest do - alias Neurow.JwtAuthPlug use ExUnit.Case use Plug.Test + import JwtHelper # Can be generated with `JOSE.JWS.generate_key(%{"alg" => "HS256"}) |> JOSE.JWK.to_map |> elem(1)` @issuer_1_jwk_1 JOSE.JWK.from_oct("r0daWG1tSxMTSzD4MuxwMe46h19_cEhMmrn5mKLncKk") @@ -17,6 +17,7 @@ defmodule Neurow.JwtAuthPlugTest do Neurow.JwtAuthPlug.init(%{ audience: @test_audience, verbose_authentication_errors: true, + max_lifetime: 60 * 2, jwk_provider: fn issuer -> case issuer do "issuer_1" -> [@issuer_1_jwk_1, @issuer_1_jwk_2] @@ -24,6 +25,7 @@ defmodule Neurow.JwtAuthPlugTest do _ -> nil end end, + count_error: fn -> :ok end, exclude_path_prefixes: ["/excluded_path"] })} end @@ -80,7 +82,7 @@ defmodule Neurow.JwtAuthPlugTest do response = Neurow.JwtAuthPlug.call( conn(:get, "/test") |> put_req_header("authorization", "Basic dXNlcjpwYXNzd29yZA=="), - %JwtAuthPlug.Options{opts | verbose_authentication_errors: false} + %Neurow.JwtAuthPlug.Options{opts | verbose_authentication_errors: false} ) assert response.halted @@ -463,11 +465,6 @@ defmodule Neurow.JwtAuthPlugTest do end end - defp error_code(response) do - {:ok, json_body} = Jason.decode(response.resp_body) - json_body["errors"] |> Enum.at(0) |> Map.get("error_code") - end - defp valid_issuer_1_jwt_payload() do iat = :os.system_time(:second) exp = iat + (2 * 60 - 1) @@ -481,18 +478,8 @@ defmodule Neurow.JwtAuthPlugTest do } end - defp signed_jwt_token(jwt, jwk) do - jws = %{ - "alg" => "HS256" - } - - signed = JOSE.JWT.sign(jwk, jws, jwt) - {%{alg: :jose_jws_alg_hmac}, compact_signed} = JOSE.JWS.compact(signed) - compact_signed - end - - defp put_jwt_token_in_req_header(conn, jwt, jwk) do - jwt_token = signed_jwt_token(jwt, jwk) - conn |> put_req_header("authorization", "Bearer #{jwt_token}") + defp error_code(response) do + {:ok, json_body} = Jason.decode(response.resp_body) + json_body["errors"] |> Enum.at(0) |> Map.get("error_code") end end diff --git a/neurow/test/neurow/public_api_integration_test.exs b/neurow/test/neurow/public_api_integration_test.exs new file mode 100644 index 0000000..43268f5 --- /dev/null +++ b/neurow/test/neurow/public_api_integration_test.exs @@ -0,0 +1,162 @@ +defmodule Neurow.PublicApiIntegrationTest do + use ExUnit.Case + use Plug.Test + import JwtHelper + + test "GET /v1/subscribe 403" do + conn = + conn(:get, "/v1/subscribe") + + call = Neurow.PublicApi.call(conn, []) + assert call.status == 403 + end + + defp publish(topic, id, message) do + :ok = Phoenix.PubSub.broadcast!(Neurow.PubSub, topic, {:pubsub_message, id, message}) + end + + def next_message(timeout \\ 100) do + receive do + {:http, {_, {:error, msg}}} -> + raise("Http error: #{inspect(msg)}") + + {:http, {_, :stream, msg}} -> + {:stream, msg} + + {:http, {_, :stream_start, headers}} -> + {:start, headers} + + {:http, {_, :stream_end, _}} -> + {:end} + + msg -> + raise("Unexpected message: #{inspect(msg)}") + after + timeout -> + raise("Timeout waiting for message") + end + end + + defp assert_headers(headers, {key, value}) do + assert {to_charlist(key), to_charlist(value)} in headers + end + + test "GET /v1/subscribe 200 no message" do + url = "http://localhost:4000/v1/subscribe" + + headers = [ + {["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo56")}"} + ] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}]) + + {:start, headers} = next_message() + assert_headers(headers, {"content-type", "text/event-stream"}) + assert_headers(headers, {"cache-control", "no-cache"}) + assert_headers(headers, {"connection", "close"}) + + :ok = :httpc.cancel_request(request_id) + end + + test "GET /v1/subscribe 200 timeout" do + url = "http://localhost:4000/v1/subscribe" + + headers = [ + {["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo56")}"} + ] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}]) + + {:start, headers} = next_message() + assert_headers(headers, {"content-type", "text/event-stream"}) + assert_headers(headers, {"cache-control", "no-cache"}) + assert_headers(headers, {"connection", "close"}) + + assert_raise RuntimeError, ~r/^Timeout waiting for message$/, fn -> + next_message() + end + + :ok = :httpc.cancel_request(request_id) + end + + test "GET /v1/subscribe 200 two messages" do + url = "http://localhost:4000/v1/subscribe" + + headers = [ + {["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo57")}"} + ] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}]) + + {:start, headers} = next_message() + assert_headers(headers, {"content-type", "text/event-stream"}) + assert_headers(headers, {"cache-control", "no-cache"}) + assert_headers(headers, {"connection", "close"}) + + publish("test_issuer1-foo57", "42", "hello") + Process.sleep(10) + publish("test_issuer1-foo57", "43", "hello2") + + {:stream, msg} = next_message() + assert msg == "id: 42\ndata: hello\n\n" + {:stream, msg} = next_message() + assert msg == "id: 43\ndata: hello2\n\n" + :ok = :httpc.cancel_request(request_id) + end + + test "GET /v1/subscribe 200 sse keepalive" do + url = "http://localhost:4000/v1/subscribe" + + headers = [ + {["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo57")}"}, + {["x-sse-keepalive"], "100"} + ] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}]) + + {:start, headers} = next_message() + assert_headers(headers, {"content-type", "text/event-stream"}) + assert_headers(headers, {"cache-control", "no-cache"}) + assert_headers(headers, {"connection", "close"}) + + publish("test_issuer1-foo57", "42", "hello") + Process.sleep(1100) + + {:stream, msg} = next_message() + assert msg == "id: 42\ndata: hello\n\n" + {:stream, msg} = next_message() + assert msg == "event: ping\n\n" + Process.sleep(1100) + {:stream, msg} = next_message() + assert msg == "event: ping\n\n" + :ok = :httpc.cancel_request(request_id) + end + + test "GET /v1/subscribe 200 sse timeout" do + url = "http://localhost:4000/v1/subscribe" + + headers = [ + {["Authorization"], "Bearer #{compute_jwt_token_in_req_header_public_api("foo57")}"}, + {["x-sse-timeout"], "100"} + ] + + {:ok, request_id} = + :httpc.request(:get, {url, headers}, [], [{:sync, false}, {:stream, :self}]) + + {:start, headers} = next_message() + assert_headers(headers, {"content-type", "text/event-stream"}) + assert_headers(headers, {"cache-control", "no-cache"}) + assert_headers(headers, {"connection", "close"}) + + Process.sleep(1100) + + {:stream, msg} = next_message() + assert msg == "" + {:end} = next_message() + :ok = :httpc.cancel_request(request_id) + end +end diff --git a/neurow/test/neurow_test.exs b/neurow/test/neurow_test.exs deleted file mode 100644 index 9003242..0000000 --- a/neurow/test/neurow_test.exs +++ /dev/null @@ -1,8 +0,0 @@ -defmodule NeurowTest do - use ExUnit.Case - doctest Neurow - - test "greets the world" do - assert Neurow.hello() == :world - end -end diff --git a/neurow/test/test_helper.exs b/neurow/test/test_helper.exs index 869559e..e989bad 100644 --- a/neurow/test/test_helper.exs +++ b/neurow/test/test_helper.exs @@ -1 +1,2 @@ +Code.require_file("test/jwt_helper.exs") ExUnit.start()