Skip to content

Commit

Permalink
Stop SSE connections when the JWT token expires (#24)
Browse files Browse the repository at this point in the history
With this PR, the SSE connection is closed when the JWT token used to
authenticate actually expires. Previously, the JWT token expiration was
only checked when the SSE connection is established.

Benefits:
- Improve overall security by preventing over-usage of JWT tokens,
- Allow backends that issue tokens to have their own expiration policy
(which must still be lower than the global max lifetime configured in
Neurow). For exemple, it allows the backend to use an `exp` that matches
the duration when users are actually considered as active on the
frontend.
  • Loading branch information
achouippe authored Sep 28, 2024
1 parent 3355772 commit cadbb68
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
2 changes: 1 addition & 1 deletion neurow/integration_test/sse_livecycle_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ defmodule Neurow.IntegrationTest.SseLifecycleTest do
{"transfer-encoding", "chunked"}
])

assert_receive %HTTPoison.AsyncChunk{chunk: timeout_sse_event}, 4_000
assert_receive %HTTPoison.AsyncChunk{chunk: timeout_sse_event}, 4_200

assert "timeout" == parse_sse_event(timeout_sse_event).event

Expand Down
28 changes: 14 additions & 14 deletions neurow/lib/neurow/public_api/endpoint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule Neurow.PublicApi.Endpoint do

defp subscribe(conn) do
case conn.assigns[:jwt_payload] do
%{"iss" => issuer, "sub" => sub} ->
%{"iss" => issuer, "sub" => sub, "exp" => exp} ->
topic = "#{issuer}-#{sub}"

timeout =
Expand Down Expand Up @@ -79,7 +79,7 @@ defmodule Neurow.PublicApi.Endpoint do
Logger.debug("Client subscribed to #{topic}")

last_message = :os.system_time(:millisecond)
conn |> loop(timeout, keep_alive, last_message, last_message)
conn |> loop(timeout, keep_alive, last_message, last_message, exp)
Logger.debug("Client disconnected from #{topic}")
conn
end
Expand Down Expand Up @@ -185,11 +185,6 @@ defmodule Neurow.PublicApi.Endpoint do
{_, message} = first

if message.timestamp > last_event_id do
if sent == 0 do
# Workaround: avoid to loose messages in tests
Process.sleep(1)
end

conn = write_chunk(conn, message)
process_history(conn, last_event_id, sent + 1, rest)
else
Expand All @@ -201,28 +196,33 @@ defmodule Neurow.PublicApi.Endpoint do
{conn, sent}
end

defp loop(conn, sse_timeout, keep_alive, last_message, last_ping) do
defp loop(conn, sse_timeout, keep_alive, last_message, last_ping, jwt_exp) do
receive do
{:pubsub_message, message} ->
conn = write_chunk(conn, message)
Stats.inc_msg_published()
new_last_message = :os.system_time(:millisecond)
loop(conn, sse_timeout, keep_alive, new_last_message, new_last_message)
loop(conn, sse_timeout, keep_alive, new_last_message, new_last_message, jwt_exp)
after
1000 ->
now = :os.system_time(:millisecond)
now_ms = :os.system_time(:millisecond)

cond do
# SSE Timeout
now - last_message > sse_timeout ->
now_ms - last_message > sse_timeout ->
Logger.debug("Client disconnected due to inactivity")
chunk(conn, "event: timeout\n\n")
:timeout

# SSE Keep alive, send a ping
now - last_ping > keep_alive ->
now_ms - last_ping > keep_alive ->
chunk(conn, "event: ping\n\n")
loop(conn, sse_timeout, keep_alive, last_message, now)
loop(conn, sse_timeout, keep_alive, last_message, now_ms, jwt_exp)

# JWT token expired
jwt_exp * 1000 < now_ms ->
chunk(conn, "event: credentials_expired\n\n")
:close

# We need to stop
StopListener.close_connections?() ->
Expand All @@ -231,7 +231,7 @@ defmodule Neurow.PublicApi.Endpoint do

# Nothing
true ->
loop(conn, sse_timeout, keep_alive, last_message, last_ping)
loop(conn, sse_timeout, keep_alive, last_message, last_ping, jwt_exp)
end
end
end
Expand Down
7 changes: 5 additions & 2 deletions neurow/test/jwt_helper.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ defmodule JwtHelper do
signed_jwt_token(jwt_payload, key)
end

def compute_jwt_token_in_req_header_public_api(topic, issuer \\ "test_issuer1") do
def compute_jwt_token_in_req_header_public_api(topic, options \\ []) do
issuer = Keyword.get(options, :issuer, "test_issuer1")
duration_s = Keyword.get(options, :duration_s, 2 * 60 - 1)

key = JOSE.JWK.from_oct("966KljJz--KyzyBnMOrFXfAkq9XMqWwPgdBV3cKTxsc")
iat = :os.system_time(:second)
exp = iat + (2 * 60 - 1)
exp = iat + duration_s

jwt_payload = %{
"iss" => issuer,
Expand Down
18 changes: 18 additions & 0 deletions neurow/test/neurow/public_api/endpoint_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,24 @@ defmodule Neurow.PublicApi.EndpointTest do
assert event_2.event == "ping"
end)
end

test "the client is disconnected when the JWT token expires" do
conn =
conn(:get, "/v1/subscribe")
|> put_req_header(
"authorization",
"Bearer #{compute_jwt_token_in_req_header_public_api("test_topic1", duration_s: 3)}"
)

call(Neurow.PublicApi.Endpoint, conn, fn ->
assert_receive {:send_chunked, 200}

assert_receive {:chunk, body}, 5_000
assert parse_sse_event(body).event == "credentials_expired"

assert_receive {:DOWN, _reference, :process, _pid, :normal}
end)
end
end

describe "preflight requests" do
Expand Down

0 comments on commit cadbb68

Please sign in to comment.