From 126da833144d5ef19fc5f3c675a007c8b56417ea Mon Sep 17 00:00:00 2001 From: Marius Melzer Date: Sat, 20 Jun 2020 19:48:44 +0200 Subject: [PATCH 1/4] Add TURN support If SIGNALTOWER_TURN_SECRET is set, the signaltower add to each joined_room message a turn_user and turn_password field that allows the client to use the TURN server for 3 hours. In the following 3h, the client will always get the same login/token with the same end time back when it joins another room. --- README.md | 15 +++++ lib/signal_tower/room.ex | 79 ++++++++++++++++++--------- lib/signal_tower/session.ex | 34 ++++++------ lib/signal_tower/websocket_handler.ex | 20 +++---- test/room_test.exs | 60 +++++++++++++++++--- test/session_test.exs | 22 ++++---- 6 files changed, 158 insertions(+), 72 deletions(-) diff --git a/README.md b/README.md index ee791e7..baa4a54 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,19 @@ mix test.watch mix release production ``` +**Options:** + +To use a Turn Server, generate a secret key string, e.g. via `openssl rand -base64 30` and set: +``` +export SIGNALTOWER_TURN_SECRET= +``` +The same secret key must be configured in the turn server. +For example for coturn, use the following configuration in turnserver.conf: +``` +use-auth-secret +static-auth-secret= +``` + By default, the websocket port 4233 is used, you can change it via: ``` export SIGNALTOWER_PORT=1234 @@ -47,6 +60,8 @@ By default, the websocket is bound to all interfaces (0.0.0.0), you can also bin export SIGNALTOWER_LOCALHOST ``` +## References + [palava protocol]: https://github.com/palavatv/palava-client/wiki/Protocol [palava client]: https://github.com/palavatv/palava-client/ [palava project]: https://github.com/palavatv/palava diff --git a/lib/signal_tower/room.ex b/lib/signal_tower/room.ex index 0386f74..90435b5 100644 --- a/lib/signal_tower/room.ex +++ b/lib/signal_tower/room.ex @@ -9,7 +9,7 @@ defmodule SignalTower.Room do def start_link(room_id) do name = "room_#{room_id}" |> String.to_atom() - GenServer.start_link(__MODULE__, room_id, name: name) + GenServer.start_link(__MODULE__, :ok, name: name) end def create(room_id) do @@ -19,32 +19,36 @@ defmodule SignalTower.Room do end end - def join_and_monitor(room_id, status) do + def join_and_monitor(room_id, status, last_turn_timestamp) do room_pid = create(room_id) Process.monitor(room_pid) - own_id = GenServer.call(room_pid, {:join, self(), status}) - %Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status} + + {own_id, new_turn_timestamp} = + GenServer.call(room_pid, {:join, self(), status, last_turn_timestamp}) + + membership = %Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status} + {membership, new_turn_timestamp} end ## Callbacks ## @impl GenServer - def init(room_id) do + def init(_) do GenServer.cast(Stats, {:room_created, self()}) - {:ok, {room_id, %{}}} + {:ok, %{}} end @impl GenServer - def handle_call({:join, pid, status}, _, {room_id, members}) do + def handle_call({:join, pid, status, last_turn_timestamp}, _, members) do GenServer.cast(Stats, {:peer_joined, self(), map_size(members) + 1}) Process.monitor(pid) peer_id = UUID.uuid1() - send_joined_room(pid, peer_id, members) + new_turn_timestamp = send_joined_room(pid, peer_id, members, last_turn_timestamp) send_new_peer(members, peer_id, status) new_member = %Member{peer_id: peer_id, pid: pid, status: status} - {:reply, peer_id, {room_id, Map.put(members, peer_id, new_member)}} + {:reply, {peer_id, new_turn_timestamp}, Map.put(members, peer_id, new_member)} end @impl GenServer @@ -62,16 +66,16 @@ defmodule SignalTower.Room do end @impl GenServer - def handle_cast({:send_to_peer, peer_id, msg, sender_id}, state = {_, members}) do + def handle_cast({:send_to_peer, peer_id, msg, sender_id}, members) do if members[sender_id] && members[peer_id] do send(members[peer_id].pid, {:to_user, Map.put(msg, :sender_id, sender_id)}) end - {:noreply, state} + {:noreply, members} end @impl GenServer - def handle_cast({:update_status, sender_id, status}, state = {_, members}) do + def handle_cast({:update_status, sender_id, status}, members) do if members[sender_id] do update_status = %{ event: "peer_updated_status", @@ -83,41 +87,41 @@ defmodule SignalTower.Room do |> send_to_all(update_status) end - {:noreply, state} + {:noreply, members} end # invoked when a user session exits @impl GenServer - def handle_info({:DOWN, _ref, _, pid, _}, state = {_, members}) do + def handle_info({:DOWN, _ref, _, pid, _}, members) do members |> Enum.find(fn {_, member} -> pid == member.pid end) |> case do {id, _} -> - case leave(id, state) do + case leave(id, members) do {:ok, state} -> {:noreply, state} {:error, state} -> {:noreply, state} {:stop, state} -> {:stop, :normal, state} end _ -> - {:noreply, state} + {:noreply, members} end end - defp leave(peer_id, state = {room_id, members}) do + defp leave(peer_id, members) do if members[peer_id] do GenServer.cast(Stats, {:peer_left, self()}) next_members = Map.delete(members, peer_id) if map_size(next_members) > 0 do send_peer_left(next_members, peer_id) - {:ok, {room_id, next_members}} + {:ok, next_members} else GenServer.cast(Stats, {:room_closed, self()}) - {:stop, {room_id, next_members}} + {:stop, next_members} end else - {:error, state} + {:error, members} end end @@ -128,14 +132,35 @@ defmodule SignalTower.Room do end) end - defp send_joined_room(pid, peer_id, members) do - response_for_joined_peer = %{ - event: "joined_room", - own_id: peer_id, - peers: members |> Map.values() - } + defp send_joined_room(pid, own_id, members, last_turn_timestamp) do + now = System.os_time(:second) + + {turn_response, next_turn_timestamp} = + if System.get_env("SIGNALTOWER_TURN_SECRET") && last_turn_timestamp < now do + next_timestamp = now + 3 * 60 * 60 + user = to_string(next_timestamp) <> ":" <> own_id + secret = System.get_env("SIGNALTOWER_TURN_SECRET") + + response = %{ + turn_user: user, + turn_password: + :crypto.mac(:hmac, :sha, to_charlist(secret), to_charlist(user)) |> Base.encode64() + } + + {response, next_timestamp} + else + {%{}, last_turn_timestamp} + end + + joined_response = + Map.merge(turn_response, %{ + event: "joined_room", + own_id: own_id, + peers: members |> Map.values() + }) - send(pid, {:to_user, response_for_joined_peer}) + send(pid, {:to_user, joined_response}) + next_turn_timestamp end defp send_new_peer(members, peer_id, status) do diff --git a/lib/signal_tower/session.ex b/lib/signal_tower/session.ex index 4d2e7c3..ce9fbc7 100644 --- a/lib/signal_tower/session.ex +++ b/lib/signal_tower/session.ex @@ -11,10 +11,10 @@ defmodule SignalTower.Session do |> (&Process.register(self(), &1)).() end - def handle_message(msg, room) do + def handle_message(msg, {room, ltt}) do case MsgIntegrity.check(msg, room) do {:ok, msg} -> - incoming_message(msg, room) + incoming_message(msg, {room, ltt}) {:error, error} -> send_error(error, msg) @@ -22,37 +22,37 @@ defmodule SignalTower.Session do end end - defp incoming_message(msg = %{"event" => "join_room"}, _) do - Room.join_and_monitor(msg["room_id"], msg["status"]) + defp incoming_message(msg = %{"event" => "join_room"}, {_, last_turn_timestamp}) do + Room.join_and_monitor(msg["room_id"], msg["status"], last_turn_timestamp) end - defp incoming_message(msg = %{"event" => "leave_room"}, room) do + defp incoming_message(msg = %{"event" => "leave_room"}, {room, ltt}) do if room do case GenServer.call(room.pid, {:leave, room.own_id}) do :ok -> - nil + {nil, ltt} :error -> send_error("You are not currently in a room, so you can not leave it", msg) - room + {room, ltt} end else send_error("You are not currently in a room, so you can not leave it", msg) - room + {room, ltt} end end - defp incoming_message(msg = %{"event" => "send_to_peer"}, room) do + defp incoming_message(msg = %{"event" => "send_to_peer"}, {room, ltt}) do GenServer.cast(room.pid, {:send_to_peer, msg["peer_id"], msg["data"], room.own_id}) - room + {room, ltt} end - defp incoming_message(msg = %{"event" => "update_status"}, room) do + defp incoming_message(msg = %{"event" => "update_status"}, {room, ltt}) do GenServer.cast(room.pid, {:update_status, room.own_id, msg["status"]}) - room + {room, ltt} end - defp incoming_message(%{"event" => "ping"}, room) do + defp incoming_message(%{"event" => "ping"}, state) do send( self(), {:to_user, @@ -61,16 +61,16 @@ defmodule SignalTower.Session do }} ) - room + state end # invoked when a room exits - def handle_exit_message(pid, room, status) do + def handle_exit_message(pid, room, status, ltt) do if room && pid == room.pid && status != :normal do # current room died => automatic rejoin - Room.join_and_monitor(room.id, room.own_status) + Room.join_and_monitor(room.id, room.own_status, ltt) else - nil + {nil, ltt} end end diff --git a/lib/signal_tower/websocket_handler.ex b/lib/signal_tower/websocket_handler.ex index 171a452..37e8698 100644 --- a/lib/signal_tower/websocket_handler.ex +++ b/lib/signal_tower/websocket_handler.ex @@ -6,25 +6,25 @@ defmodule SignalTower.WebsocketHandler do @impl :cowboy_websocket def init(req, _state) do - {:cowboy_websocket, req, nil, %{idle_timeout: :timer.seconds(30)}} + {:cowboy_websocket, req, {nil, 0}, %{idle_timeout: :timer.seconds(30)}} end @impl :cowboy_websocket - def websocket_init(state) do + def websocket_init({room, last_turn_timestamp}) do Session.init() :timer.send_interval(:timer.seconds(5), :send_ping) - {:ok, state} + {:ok, {room, last_turn_timestamp}} end @impl :cowboy_websocket - def websocket_handle({:text, msg}, room) do + def websocket_handle({:text, msg}, state) do case Poison.decode(msg) do {:ok, parsed_msg} -> - {:ok, Session.handle_message(parsed_msg, room)} + {:ok, Session.handle_message(parsed_msg, state)} _ -> answer = Poison.encode!(%{event: "error", description: "invalid json", received_msg: msg}) - {:reply, {:text, answer}, room} + {:reply, {:text, answer}, state} end end @@ -53,13 +53,13 @@ defmodule SignalTower.WebsocketHandler do end @impl :cowboy_websocket - def websocket_info({:DOWN, _, _, pid, status}, room) do - {:ok, Session.handle_exit_message(pid, room, status)} + def websocket_info({:DOWN, _, _, pid, status}, {room, ltt}) do + {:ok, Session.handle_exit_message(pid, room, status, ltt)} end @impl :cowboy_websocket - def websocket_info(:send_ping, status) do - {:reply, {:ping, "server ping"}, status} + def websocket_info(:send_ping, state) do + {:reply, {:ping, "server ping"}, state} end @impl :cowboy_websocket diff --git a/test/room_test.exs b/test/room_test.exs index 435905c..f003ac7 100644 --- a/test/room_test.exs +++ b/test/room_test.exs @@ -23,7 +23,7 @@ defmodule RoomTest do _user2 = spawn_user_no_join(fn -> - GenServer.call(room_pid, {:join, self(), %{user: "1"}}) + GenServer.call(room_pid, {:join, self(), %{user: "1"}, 0}) assert_receive {:to_user, %{ @@ -39,9 +39,55 @@ defmodule RoomTest do wait_for_breaks(2) end - test "send to peer" do + test "join room with turn" do + System.put_env("SIGNALTOWER_TURN_SECRET", "verysecretpassphrase1234") room_pid = create_room("r-room3") + go = fn timestamp_before -> + spawn_user_no_join(fn -> + GenServer.call(room_pid, {:join, self(), %{user: "1"}, 0}) + + assert_receive( + {:to_user, + %{ + event: "joined_room", + own_id: own_id, + turn_user: user, + turn_password: pw + }}, + 1000 + ) + + [timestamp_str, id] = String.split(user, ":") + {timestamp, ""} = Integer.parse(timestamp_str) + assert own_id == id + assert System.os_time(:second) < timestamp + assert timestamp < System.os_time(:second) + 3 * 60 * 60 + 10 + assert timestamp_before <= timestamp + + assert pw == + :crypto.mac( + :hmac, + :sha, + to_charlist("verysecretpassphrase1234"), + to_charlist(user) + ) + |> Base.encode64() + end) + end + + go.(0) + # is depleted + go.(System.os_time(:second) - 200) + # is still valid + go.(System.os_time(:second) + 200) + + wait_for_breaks(2) + end + + test "send to peer" do + room_pid = create_room("r-room4") + user1 = spawn_user(room_pid, fn own_id -> assert_receive {:to_user, %{event: "new_peer"}}, 1000 @@ -67,7 +113,7 @@ defmodule RoomTest do end test "update status" do - room_pid = create_room("r-room4") + room_pid = create_room("r-room5") join_room(self(), room_pid) spawn_user(room_pid, fn own_id -> @@ -87,7 +133,7 @@ defmodule RoomTest do end test "leave room" do - room_pid = create_room("r-room5") + room_pid = create_room("r-room6") join_room(self(), room_pid) spawn_user(room_pid, fn own_id -> @@ -108,7 +154,7 @@ defmodule RoomTest do # session process dies test "user leaves room when his session dies" do - room_pid = create_room("r-room6") + room_pid = create_room("r-room7") join_room(self(), room_pid) spawn_link(fn -> @@ -128,7 +174,7 @@ defmodule RoomTest do end test "room exits when last active user is gone" do - room_pid = create_room("r-room7") + room_pid = create_room("r-room8") Process.monitor(room_pid) own_id = join_room(self(), room_pid) GenServer.call(room_pid, {:leave, own_id}) @@ -140,7 +186,7 @@ defmodule RoomTest do end defp join_room(pid, room_pid) do - GenServer.call(room_pid, {:join, pid, %{standard: "status"}}) + GenServer.call(room_pid, {:join, pid, %{standard: "status"}, 0}) assert_receive {:to_user, %{event: "joined_room", own_id: own_id}}, 1000 own_id end diff --git a/test/session_test.exs b/test/session_test.exs index b2b7cc9..5ddc915 100644 --- a/test/session_test.exs +++ b/test/session_test.exs @@ -15,7 +15,7 @@ defmodule SessionTest do "room_id" => "s-room1", "status" => %{user: "0"} }, - nil + {nil, 0} ) assert_receive {:to_user, @@ -54,7 +54,7 @@ defmodule SessionTest do "room_id" => "s-room1", "status" => %{user: "1"} }, - nil + {nil, 0} ) assert_receive {:to_user, @@ -77,7 +77,7 @@ defmodule SessionTest do "event" => "leave_room", "room_id" => "s-room13" }, - room + {room, 0} ) end) @@ -100,7 +100,7 @@ defmodule SessionTest do "peer_id" => peer_id, "data" => %{some: "data"} }, - room + {room, 0} ) end end) @@ -128,7 +128,7 @@ defmodule SessionTest do "event" => "update_status", "status" => %{some: "status"} }, - room + {room, 0} ) end) @@ -155,7 +155,7 @@ defmodule SessionTest do "event" => "update_status", "status" => %{new: "status"} }, - nil + {nil, 0} ) assert_receive {:to_user, m = %{event: "error"}} @@ -167,7 +167,7 @@ defmodule SessionTest do "peer_id" => "some_peer", "data" => %{some: "data"} }, - room + {room, 0} ) assert_receive {:to_user, m = %{event: "error"}} @@ -182,7 +182,7 @@ defmodule SessionTest do %{ "event" => "ping" }, - nil + {nil, 0} ) assert_receive {:to_user, %{event: "pong"}} @@ -193,7 +193,7 @@ defmodule SessionTest do %{ "event" => "unknown" }, - nil + {nil, 0} ) assert_receive {:to_user, %{event: "error"}} @@ -224,14 +224,14 @@ defmodule SessionTest do end defp join_room(room_id, host) do - room = + {room, _} = Session.handle_message( %{ "event" => "join_room", "room_id" => room_id, "status" => %{local: "status"} }, - nil + {nil, 0} ) if host, do: send(host, :start) From 5c570f0c101547ebfc5818311b2c8b3a747b4793 Mon Sep 17 00:00:00 2001 From: Marius Melzer Date: Thu, 25 Jun 2020 16:31:37 +0200 Subject: [PATCH 2/4] Improve readibility of session state Make the session state a map and document in the websocket init the members. --- lib/signal_tower/room.ex | 29 +++++++++++------------ lib/signal_tower/session.ex | 34 +++++++++++++-------------- lib/signal_tower/websocket_handler.ex | 17 ++++++++++---- test/session_test.exs | 26 ++++++++++---------- 4 files changed, 57 insertions(+), 49 deletions(-) diff --git a/lib/signal_tower/room.ex b/lib/signal_tower/room.ex index 90435b5..969833e 100644 --- a/lib/signal_tower/room.ex +++ b/lib/signal_tower/room.ex @@ -19,15 +19,14 @@ defmodule SignalTower.Room do end end - def join_and_monitor(room_id, status, last_turn_timestamp) do + def join_and_monitor(room_id, status, turn_timeout) do room_pid = create(room_id) Process.monitor(room_pid) - {own_id, new_turn_timestamp} = - GenServer.call(room_pid, {:join, self(), status, last_turn_timestamp}) + {own_id, new_turn_timeout} = GenServer.call(room_pid, {:join, self(), status, turn_timeout}) membership = %Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status} - {membership, new_turn_timestamp} + %{room: membership, turn_timeout: new_turn_timeout} end ## Callbacks ## @@ -39,16 +38,16 @@ defmodule SignalTower.Room do end @impl GenServer - def handle_call({:join, pid, status, last_turn_timestamp}, _, members) do + def handle_call({:join, pid, status, turn_timeout}, _, members) do GenServer.cast(Stats, {:peer_joined, self(), map_size(members) + 1}) Process.monitor(pid) peer_id = UUID.uuid1() - new_turn_timestamp = send_joined_room(pid, peer_id, members, last_turn_timestamp) + new_turn_timeout = send_joined_room(pid, peer_id, members, turn_timeout) send_new_peer(members, peer_id, status) new_member = %Member{peer_id: peer_id, pid: pid, status: status} - {:reply, {peer_id, new_turn_timestamp}, Map.put(members, peer_id, new_member)} + {:reply, {peer_id, new_turn_timeout}, Map.put(members, peer_id, new_member)} end @impl GenServer @@ -132,13 +131,13 @@ defmodule SignalTower.Room do end) end - defp send_joined_room(pid, own_id, members, last_turn_timestamp) do + defp send_joined_room(pid, own_id, members, turn_timeout) do now = System.os_time(:second) - {turn_response, next_turn_timestamp} = - if System.get_env("SIGNALTOWER_TURN_SECRET") && last_turn_timestamp < now do - next_timestamp = now + 3 * 60 * 60 - user = to_string(next_timestamp) <> ":" <> own_id + {turn_response, next_turn_timeout} = + if System.get_env("SIGNALTOWER_TURN_SECRET") && turn_timeout < now do + next_timeout = now + 3 * 60 * 60 + user = to_string(next_timeout) <> ":" <> own_id secret = System.get_env("SIGNALTOWER_TURN_SECRET") response = %{ @@ -147,9 +146,9 @@ defmodule SignalTower.Room do :crypto.mac(:hmac, :sha, to_charlist(secret), to_charlist(user)) |> Base.encode64() } - {response, next_timestamp} + {response, next_timeout} else - {%{}, last_turn_timestamp} + {%{}, turn_timeout} end joined_response = @@ -160,7 +159,7 @@ defmodule SignalTower.Room do }) send(pid, {:to_user, joined_response}) - next_turn_timestamp + next_turn_timeout end defp send_new_peer(members, peer_id, status) do diff --git a/lib/signal_tower/session.ex b/lib/signal_tower/session.ex index ce9fbc7..2673e83 100644 --- a/lib/signal_tower/session.ex +++ b/lib/signal_tower/session.ex @@ -11,45 +11,45 @@ defmodule SignalTower.Session do |> (&Process.register(self(), &1)).() end - def handle_message(msg, {room, ltt}) do - case MsgIntegrity.check(msg, room) do + def handle_message(msg, state) do + case MsgIntegrity.check(msg, state.room) do {:ok, msg} -> - incoming_message(msg, {room, ltt}) + incoming_message(msg, state) {:error, error} -> send_error(error, msg) - room + state end end - defp incoming_message(msg = %{"event" => "join_room"}, {_, last_turn_timestamp}) do - Room.join_and_monitor(msg["room_id"], msg["status"], last_turn_timestamp) + defp incoming_message(msg = %{"event" => "join_room"}, state) do + Room.join_and_monitor(msg["room_id"], msg["status"], state.turn_timeout) end - defp incoming_message(msg = %{"event" => "leave_room"}, {room, ltt}) do + defp incoming_message(msg = %{"event" => "leave_room"}, state = %{room: room}) do if room do case GenServer.call(room.pid, {:leave, room.own_id}) do :ok -> - {nil, ltt} + %{state | room: nil} :error -> send_error("You are not currently in a room, so you can not leave it", msg) - {room, ltt} + state end else send_error("You are not currently in a room, so you can not leave it", msg) - {room, ltt} + state end end - defp incoming_message(msg = %{"event" => "send_to_peer"}, {room, ltt}) do + defp incoming_message(msg = %{"event" => "send_to_peer"}, state = %{room: room}) do GenServer.cast(room.pid, {:send_to_peer, msg["peer_id"], msg["data"], room.own_id}) - {room, ltt} + state end - defp incoming_message(msg = %{"event" => "update_status"}, {room, ltt}) do + defp incoming_message(msg = %{"event" => "update_status"}, state = %{room: room}) do GenServer.cast(room.pid, {:update_status, room.own_id, msg["status"]}) - {room, ltt} + state end defp incoming_message(%{"event" => "ping"}, state) do @@ -65,12 +65,12 @@ defmodule SignalTower.Session do end # invoked when a room exits - def handle_exit_message(pid, room, status, ltt) do + def handle_exit_message(pid, status, state = %{room: room, turn_timeout: turn_timeout}) do if room && pid == room.pid && status != :normal do # current room died => automatic rejoin - Room.join_and_monitor(room.id, room.own_status, ltt) + Room.join_and_monitor(room.id, room.own_status, turn_timeout) else - {nil, ltt} + state end end diff --git a/lib/signal_tower/websocket_handler.ex b/lib/signal_tower/websocket_handler.ex index 37e8698..deedbde 100644 --- a/lib/signal_tower/websocket_handler.ex +++ b/lib/signal_tower/websocket_handler.ex @@ -6,14 +6,21 @@ defmodule SignalTower.WebsocketHandler do @impl :cowboy_websocket def init(req, _state) do - {:cowboy_websocket, req, {nil, 0}, %{idle_timeout: :timer.seconds(30)}} + initial_state = %{ + # room membership + room: nil, + # initialize turn timeout with 0, it will be properly initialized on first room join + turn_timeout: 0 + } + + {:cowboy_websocket, req, initial_state, %{idle_timeout: :timer.seconds(30)}} end @impl :cowboy_websocket - def websocket_init({room, last_turn_timestamp}) do + def websocket_init(initial_state) do Session.init() :timer.send_interval(:timer.seconds(5), :send_ping) - {:ok, {room, last_turn_timestamp}} + {:ok, initial_state} end @impl :cowboy_websocket @@ -53,8 +60,8 @@ defmodule SignalTower.WebsocketHandler do end @impl :cowboy_websocket - def websocket_info({:DOWN, _, _, pid, status}, {room, ltt}) do - {:ok, Session.handle_exit_message(pid, room, status, ltt)} + def websocket_info({:DOWN, _, _, pid, status}, state) do + {:ok, Session.handle_exit_message(pid, status, state)} end @impl :cowboy_websocket diff --git a/test/session_test.exs b/test/session_test.exs index 5ddc915..0998299 100644 --- a/test/session_test.exs +++ b/test/session_test.exs @@ -4,6 +4,8 @@ defmodule SessionTest do alias SignalTower.Session + @initial_state %{room: nil, turn_timeout: 0} + test "join and leave with registered users" do host_pid = self() @@ -15,7 +17,7 @@ defmodule SessionTest do "room_id" => "s-room1", "status" => %{user: "0"} }, - {nil, 0} + @initial_state ) assert_receive {:to_user, @@ -54,7 +56,7 @@ defmodule SessionTest do "room_id" => "s-room1", "status" => %{user: "1"} }, - {nil, 0} + @initial_state ) assert_receive {:to_user, @@ -77,7 +79,7 @@ defmodule SessionTest do "event" => "leave_room", "room_id" => "s-room13" }, - {room, 0} + %{room: room, turn_timeout: 0} ) end) @@ -100,7 +102,7 @@ defmodule SessionTest do "peer_id" => peer_id, "data" => %{some: "data"} }, - {room, 0} + %{room: room, turn_timeout: 0} ) end end) @@ -128,7 +130,7 @@ defmodule SessionTest do "event" => "update_status", "status" => %{some: "status"} }, - {room, 0} + %{room: room, turn_timeout: 0} ) end) @@ -149,13 +151,13 @@ defmodule SessionTest do test "not possible to use certain events when not in a room" do _client1 = create_client(fn _, _ -> - room = + %{room: room} = Session.handle_message( %{ "event" => "update_status", "status" => %{new: "status"} }, - {nil, 0} + @initial_state ) assert_receive {:to_user, m = %{event: "error"}} @@ -167,7 +169,7 @@ defmodule SessionTest do "peer_id" => "some_peer", "data" => %{some: "data"} }, - {room, 0} + %{room: room, turn_timeout: 0} ) assert_receive {:to_user, m = %{event: "error"}} @@ -182,7 +184,7 @@ defmodule SessionTest do %{ "event" => "ping" }, - {nil, 0} + @initial_state() ) assert_receive {:to_user, %{event: "pong"}} @@ -193,7 +195,7 @@ defmodule SessionTest do %{ "event" => "unknown" }, - {nil, 0} + @initial_state ) assert_receive {:to_user, %{event: "error"}} @@ -224,14 +226,14 @@ defmodule SessionTest do end defp join_room(room_id, host) do - {room, _} = + %{room: room} = Session.handle_message( %{ "event" => "join_room", "room_id" => room_id, "status" => %{local: "status"} }, - {nil, 0} + @initial_state ) if host, do: send(host, :start) From a83d762556d3b9c530d67be3c16bdffeac06f591 Mon Sep 17 00:00:00 2001 From: Marius Melzer Date: Sat, 11 Jul 2020 13:47:56 +0200 Subject: [PATCH 3/4] Rename turn_timout to *expiry, make its validity_period a constant --- lib/signal_tower/room.ex | 32 +++++++++++++++------------ lib/signal_tower/session.ex | 10 ++++++--- lib/signal_tower/websocket_handler.ex | 4 ++-- test/session_test.exs | 10 ++++----- 4 files changed, 32 insertions(+), 24 deletions(-) diff --git a/lib/signal_tower/room.ex b/lib/signal_tower/room.ex index 969833e..924aad3 100644 --- a/lib/signal_tower/room.ex +++ b/lib/signal_tower/room.ex @@ -5,6 +5,9 @@ defmodule SignalTower.Room do alias SignalTower.Room.{Member, Membership, Supervisor} alias SignalTower.Stats + # a turn token is valid for three hours + @turn_validity_period 3 * 60 * 60 + ## API ## def start_link(room_id) do @@ -19,14 +22,15 @@ defmodule SignalTower.Room do end end - def join_and_monitor(room_id, status, turn_timeout) do + def join_and_monitor(room_id, status, turn_token_expiry) do room_pid = create(room_id) Process.monitor(room_pid) - {own_id, new_turn_timeout} = GenServer.call(room_pid, {:join, self(), status, turn_timeout}) + {own_id, new_turn_token_expiry} = + GenServer.call(room_pid, {:join, self(), status, turn_token_expiry}) membership = %Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status} - %{room: membership, turn_timeout: new_turn_timeout} + %{room: membership, turn_token_expiry: new_turn_token_expiry} end ## Callbacks ## @@ -38,16 +42,16 @@ defmodule SignalTower.Room do end @impl GenServer - def handle_call({:join, pid, status, turn_timeout}, _, members) do + def handle_call({:join, pid, status, turn_token_expiry}, _, members) do GenServer.cast(Stats, {:peer_joined, self(), map_size(members) + 1}) Process.monitor(pid) peer_id = UUID.uuid1() - new_turn_timeout = send_joined_room(pid, peer_id, members, turn_timeout) + new_turn_token_expiry = send_joined_room(pid, peer_id, members, turn_token_expiry) send_new_peer(members, peer_id, status) new_member = %Member{peer_id: peer_id, pid: pid, status: status} - {:reply, {peer_id, new_turn_timeout}, Map.put(members, peer_id, new_member)} + {:reply, {peer_id, new_turn_token_expiry}, Map.put(members, peer_id, new_member)} end @impl GenServer @@ -131,13 +135,13 @@ defmodule SignalTower.Room do end) end - defp send_joined_room(pid, own_id, members, turn_timeout) do + defp send_joined_room(pid, own_id, members, turn_token_expiry) do now = System.os_time(:second) - {turn_response, next_turn_timeout} = - if System.get_env("SIGNALTOWER_TURN_SECRET") && turn_timeout < now do - next_timeout = now + 3 * 60 * 60 - user = to_string(next_timeout) <> ":" <> own_id + {turn_response, next_turn_token_expiry} = + if System.get_env("SIGNALTOWER_TURN_SECRET") && turn_token_expiry < now do + next_expiry = now + @turn_validity_period + user = to_string(next_expiry) <> ":" <> own_id secret = System.get_env("SIGNALTOWER_TURN_SECRET") response = %{ @@ -146,9 +150,9 @@ defmodule SignalTower.Room do :crypto.mac(:hmac, :sha, to_charlist(secret), to_charlist(user)) |> Base.encode64() } - {response, next_timeout} + {response, next_expiry} else - {%{}, turn_timeout} + {%{}, turn_token_expiry} end joined_response = @@ -159,7 +163,7 @@ defmodule SignalTower.Room do }) send(pid, {:to_user, joined_response}) - next_turn_timeout + next_turn_token_expiry end defp send_new_peer(members, peer_id, status) do diff --git a/lib/signal_tower/session.ex b/lib/signal_tower/session.ex index 2673e83..98d51c7 100644 --- a/lib/signal_tower/session.ex +++ b/lib/signal_tower/session.ex @@ -23,7 +23,7 @@ defmodule SignalTower.Session do end defp incoming_message(msg = %{"event" => "join_room"}, state) do - Room.join_and_monitor(msg["room_id"], msg["status"], state.turn_timeout) + Room.join_and_monitor(msg["room_id"], msg["status"], state.turn_token_expiry) end defp incoming_message(msg = %{"event" => "leave_room"}, state = %{room: room}) do @@ -65,10 +65,14 @@ defmodule SignalTower.Session do end # invoked when a room exits - def handle_exit_message(pid, status, state = %{room: room, turn_timeout: turn_timeout}) do + def handle_exit_message( + pid, + status, + state = %{room: room, turn_token_expiry: turn_token_expiry} + ) do if room && pid == room.pid && status != :normal do # current room died => automatic rejoin - Room.join_and_monitor(room.id, room.own_status, turn_timeout) + Room.join_and_monitor(room.id, room.own_status, turn_token_expiry) else state end diff --git a/lib/signal_tower/websocket_handler.ex b/lib/signal_tower/websocket_handler.ex index deedbde..a6c7775 100644 --- a/lib/signal_tower/websocket_handler.ex +++ b/lib/signal_tower/websocket_handler.ex @@ -9,8 +9,8 @@ defmodule SignalTower.WebsocketHandler do initial_state = %{ # room membership room: nil, - # initialize turn timeout with 0, it will be properly initialized on first room join - turn_timeout: 0 + # initialize turn token expiry with 0, it will be properly initialized on first room join + turn_token_expiry: 0 } {:cowboy_websocket, req, initial_state, %{idle_timeout: :timer.seconds(30)}} diff --git a/test/session_test.exs b/test/session_test.exs index 0998299..bbbec92 100644 --- a/test/session_test.exs +++ b/test/session_test.exs @@ -4,7 +4,7 @@ defmodule SessionTest do alias SignalTower.Session - @initial_state %{room: nil, turn_timeout: 0} + @initial_state %{room: nil, turn_token_expiry: 0} test "join and leave with registered users" do host_pid = self() @@ -79,7 +79,7 @@ defmodule SessionTest do "event" => "leave_room", "room_id" => "s-room13" }, - %{room: room, turn_timeout: 0} + %{room: room, turn_token_expiry: 0} ) end) @@ -102,7 +102,7 @@ defmodule SessionTest do "peer_id" => peer_id, "data" => %{some: "data"} }, - %{room: room, turn_timeout: 0} + %{room: room, turn_token_expiry: 0} ) end end) @@ -130,7 +130,7 @@ defmodule SessionTest do "event" => "update_status", "status" => %{some: "status"} }, - %{room: room, turn_timeout: 0} + %{room: room, turn_token_expiry: 0} ) end) @@ -169,7 +169,7 @@ defmodule SessionTest do "peer_id" => "some_peer", "data" => %{some: "data"} }, - %{room: room, turn_timeout: 0} + %{room: room, turn_token_expiry: 0} ) assert_receive {:to_user, m = %{event: "error"}} From 45ad2d58fe6dd45612223fa63d017df53418551c Mon Sep 17 00:00:00 2001 From: Marius Melzer Date: Sun, 12 Jul 2020 16:22:20 +0200 Subject: [PATCH 4/4] Refactor turn tests, add session turn test --- test/room_test.exs | 53 ++++++++++++++----------------------------- test/session_test.exs | 42 ++++++++++++++++++++++++++++++++++ test/test_helper.exs | 29 +++++++++++++++++++++++ 3 files changed, 88 insertions(+), 36 deletions(-) diff --git a/test/room_test.exs b/test/room_test.exs index f003ac7..b269939 100644 --- a/test/room_test.exs +++ b/test/room_test.exs @@ -43,44 +43,25 @@ defmodule RoomTest do System.put_env("SIGNALTOWER_TURN_SECRET", "verysecretpassphrase1234") room_pid = create_room("r-room3") - go = fn timestamp_before -> - spawn_user_no_join(fn -> - GenServer.call(room_pid, {:join, self(), %{user: "1"}, 0}) + # no token produced yet + spawn_user_no_join(fn -> + GenServer.call(room_pid, {:join, self(), %{user: "1"}, 0}) + receive_and_check_turn_credentials(0) + end) - assert_receive( - {:to_user, - %{ - event: "joined_room", - own_id: own_id, - turn_user: user, - turn_password: pw - }}, - 1000 - ) - - [timestamp_str, id] = String.split(user, ":") - {timestamp, ""} = Integer.parse(timestamp_str) - assert own_id == id - assert System.os_time(:second) < timestamp - assert timestamp < System.os_time(:second) + 3 * 60 * 60 + 10 - assert timestamp_before <= timestamp - - assert pw == - :crypto.mac( - :hmac, - :sha, - to_charlist("verysecretpassphrase1234"), - to_charlist(user) - ) - |> Base.encode64() - end) - end + # previous token is depleted + spawn_user_no_join(fn -> + previous_expiry = System.os_time(:second) - 2000 + GenServer.call(room_pid, {:join, self(), %{user: "1"}, previous_expiry}) + receive_and_check_turn_credentials(previous_expiry) + end) - go.(0) - # is depleted - go.(System.os_time(:second) - 200) - # is still valid - go.(System.os_time(:second) + 200) + # previous token is still valid + spawn_user_no_join(fn -> + previous_expiry = System.os_time(:second) + 2000 + GenServer.call(room_pid, {:join, self(), %{user: "1"}, previous_expiry}) + receive_and_check_turn_credentials(previous_expiry) + end) wait_for_breaks(2) end diff --git a/test/session_test.exs b/test/session_test.exs index bbbec92..eb7fdc3 100644 --- a/test/session_test.exs +++ b/test/session_test.exs @@ -71,6 +71,48 @@ defmodule SessionTest do wait_for_breaks(2) end + test "join and check turn expiry" do + System.put_env("SIGNALTOWER_TURN_SECRET", "verysecretpassphrase1234") + + # no token produced yet + Session.handle_message( + %{ + "event" => "join_room", + "room_id" => "s-room1", + "status" => %{user: "0"} + }, + @initial_state + ) + + receive_and_check_turn_credentials(0) + + # previous token is depleted + previous_expiry = System.os_time(:second) - 2000 + + Session.handle_message( + %{ + "event" => "join_room", + "room_id" => "s-room1", + "status" => %{user: "0"} + }, + %{room: nil, turn_token_expiry: previous_expiry} + ) + + receive_and_check_turn_credentials(previous_expiry) + + # previous token is still valid + Session.handle_message( + %{ + "event" => "join_room", + "room_id" => "s-room1", + "status" => %{user: "0"} + }, + %{room: nil, turn_token_expiry: previous_expiry} + ) + + receive_and_check_turn_credentials(previous_expiry) + end + test "leave explicitly" do _client1 = create_client("s-room13", fn room, _ -> diff --git a/test/test_helper.exs b/test/test_helper.exs index b8f47a2..30cfe1f 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -6,4 +6,33 @@ defmodule TestHelper do def wait_for_breaks(n) when n > 0 do 1..n |> Enum.each(fn _ -> assert_receive :break, 10_000 end) end + + def receive_and_check_turn_credentials(expiry_before) do + assert_receive( + {:to_user, + %{ + event: "joined_room", + own_id: own_id, + turn_user: user, + turn_password: pw + }}, + 1000 + ) + + [expiry_str, id] = String.split(user, ":") + expiry = String.to_integer(expiry_str) + assert own_id == id + assert System.os_time(:second) < expiry + assert expiry < System.os_time(:second) + 3 * 60 * 60 + 10 + assert expiry_before <= expiry + + assert pw == + :crypto.mac( + :hmac, + :sha, + to_charlist("verysecretpassphrase1234"), + to_charlist(user) + ) + |> Base.encode64() + end end