Skip to content

Commit

Permalink
Merge pull request #30 from palavatv/turn
Browse files Browse the repository at this point in the history
Add TURN support
  • Loading branch information
farao authored Jul 23, 2020
2 parents bcc9832 + 45ad2d5 commit 3230190
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 75 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,19 @@ mix test.watch
mix release production
```

**Options:**

To use a Turn Server, generate a secret key string, e.g. via `openssl rand -base64 30` and set:
```
export SIGNALTOWER_TURN_SECRET=<generated_secret_key>
```
The same secret key must be configured in the turn server.
For example for coturn, use the following configuration in turnserver.conf:
```
use-auth-secret
static-auth-secret=<generated_secret_key>
```

By default, the websocket port 4233 is used, you can change it via:
```
export SIGNALTOWER_PORT=1234
Expand All @@ -47,6 +60,8 @@ By default, the websocket is bound to all interfaces (0.0.0.0), you can also bin
export SIGNALTOWER_LOCALHOST
```

## References

[palava protocol]: https://github.com/palavatv/palava-client/wiki/Protocol
[palava client]: https://github.com/palavatv/palava-client/
[palava project]: https://github.com/palavatv/palava
82 changes: 55 additions & 27 deletions lib/signal_tower/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@ defmodule SignalTower.Room do
alias SignalTower.Room.{Member, Membership, Supervisor}
alias SignalTower.Stats

# a turn token is valid for three hours
@turn_validity_period 3 * 60 * 60

## API ##

def start_link(room_id) do
name = "room_#{room_id}" |> String.to_atom()
GenServer.start_link(__MODULE__, room_id, name: name)
GenServer.start_link(__MODULE__, :ok, name: name)
end

def create(room_id) do
Expand All @@ -19,32 +22,36 @@ defmodule SignalTower.Room do
end
end

def join_and_monitor(room_id, status) do
def join_and_monitor(room_id, status, turn_token_expiry) do
room_pid = create(room_id)
Process.monitor(room_pid)
own_id = GenServer.call(room_pid, {:join, self(), status})
%Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status}

{own_id, new_turn_token_expiry} =
GenServer.call(room_pid, {:join, self(), status, turn_token_expiry})

membership = %Membership{id: room_id, pid: room_pid, own_id: own_id, own_status: status}
%{room: membership, turn_token_expiry: new_turn_token_expiry}
end

## Callbacks ##

@impl GenServer
def init(room_id) do
def init(_) do
GenServer.cast(Stats, {:room_created, self()})
{:ok, {room_id, %{}}}
{:ok, %{}}
end

@impl GenServer
def handle_call({:join, pid, status}, _, {room_id, members}) do
def handle_call({:join, pid, status, turn_token_expiry}, _, members) do
GenServer.cast(Stats, {:peer_joined, self(), map_size(members) + 1})

Process.monitor(pid)
peer_id = UUID.uuid1()
send_joined_room(pid, peer_id, members)
new_turn_token_expiry = send_joined_room(pid, peer_id, members, turn_token_expiry)
send_new_peer(members, peer_id, status)

new_member = %Member{peer_id: peer_id, pid: pid, status: status}
{:reply, peer_id, {room_id, Map.put(members, peer_id, new_member)}}
{:reply, {peer_id, new_turn_token_expiry}, Map.put(members, peer_id, new_member)}
end

@impl GenServer
Expand All @@ -62,16 +69,16 @@ defmodule SignalTower.Room do
end

@impl GenServer
def handle_cast({:send_to_peer, peer_id, msg, sender_id}, state = {_, members}) do
def handle_cast({:send_to_peer, peer_id, msg, sender_id}, members) do
if members[sender_id] && members[peer_id] do
send(members[peer_id].pid, {:to_user, Map.put(msg, :sender_id, sender_id)})
end

{:noreply, state}
{:noreply, members}
end

@impl GenServer
def handle_cast({:update_status, sender_id, status}, state = {_, members}) do
def handle_cast({:update_status, sender_id, status}, members) do
if members[sender_id] do
update_status = %{
event: "peer_updated_status",
Expand All @@ -83,41 +90,41 @@ defmodule SignalTower.Room do
|> send_to_all(update_status)
end

{:noreply, state}
{:noreply, members}
end

# invoked when a user session exits
@impl GenServer
def handle_info({:DOWN, _ref, _, pid, _}, state = {_, members}) do
def handle_info({:DOWN, _ref, _, pid, _}, members) do
members
|> Enum.find(fn {_, member} -> pid == member.pid end)
|> case do
{id, _} ->
case leave(id, state) do
case leave(id, members) do
{:ok, state} -> {:noreply, state}
{:error, state} -> {:noreply, state}
{:stop, state} -> {:stop, :normal, state}
end

_ ->
{:noreply, state}
{:noreply, members}
end
end

defp leave(peer_id, state = {room_id, members}) do
defp leave(peer_id, members) do
if members[peer_id] do
GenServer.cast(Stats, {:peer_left, self()})
next_members = Map.delete(members, peer_id)

if map_size(next_members) > 0 do
send_peer_left(next_members, peer_id)
{:ok, {room_id, next_members}}
{:ok, next_members}
else
GenServer.cast(Stats, {:room_closed, self()})
{:stop, {room_id, next_members}}
{:stop, next_members}
end
else
{:error, state}
{:error, members}
end
end

Expand All @@ -128,14 +135,35 @@ defmodule SignalTower.Room do
end)
end

defp send_joined_room(pid, peer_id, members) do
response_for_joined_peer = %{
event: "joined_room",
own_id: peer_id,
peers: members |> Map.values()
}
defp send_joined_room(pid, own_id, members, turn_token_expiry) do
now = System.os_time(:second)

{turn_response, next_turn_token_expiry} =
if System.get_env("SIGNALTOWER_TURN_SECRET") && turn_token_expiry < now do
next_expiry = now + @turn_validity_period
user = to_string(next_expiry) <> ":" <> own_id
secret = System.get_env("SIGNALTOWER_TURN_SECRET")

response = %{
turn_user: user,
turn_password:
:crypto.mac(:hmac, :sha, to_charlist(secret), to_charlist(user)) |> Base.encode64()
}

{response, next_expiry}
else
{%{}, turn_token_expiry}
end

joined_response =
Map.merge(turn_response, %{
event: "joined_room",
own_id: own_id,
peers: members |> Map.values()
})

send(pid, {:to_user, response_for_joined_peer})
send(pid, {:to_user, joined_response})
next_turn_token_expiry
end

defp send_new_peer(members, peer_id, status) do
Expand Down
42 changes: 23 additions & 19 deletions lib/signal_tower/session.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,48 @@ defmodule SignalTower.Session do
|> (&Process.register(self(), &1)).()
end

def handle_message(msg, room) do
case MsgIntegrity.check(msg, room) do
def handle_message(msg, state) do
case MsgIntegrity.check(msg, state.room) do
{:ok, msg} ->
incoming_message(msg, room)
incoming_message(msg, state)

{:error, error} ->
send_error(error, msg)
room
state
end
end

defp incoming_message(msg = %{"event" => "join_room"}, _) do
Room.join_and_monitor(msg["room_id"], msg["status"])
defp incoming_message(msg = %{"event" => "join_room"}, state) do
Room.join_and_monitor(msg["room_id"], msg["status"], state.turn_token_expiry)
end

defp incoming_message(msg = %{"event" => "leave_room"}, room) do
defp incoming_message(msg = %{"event" => "leave_room"}, state = %{room: room}) do
if room do
case GenServer.call(room.pid, {:leave, room.own_id}) do
:ok ->
nil
%{state | room: nil}

:error ->
send_error("You are not currently in a room, so you can not leave it", msg)
room
state
end
else
send_error("You are not currently in a room, so you can not leave it", msg)
room
state
end
end

defp incoming_message(msg = %{"event" => "send_to_peer"}, room) do
defp incoming_message(msg = %{"event" => "send_to_peer"}, state = %{room: room}) do
GenServer.cast(room.pid, {:send_to_peer, msg["peer_id"], msg["data"], room.own_id})
room
state
end

defp incoming_message(msg = %{"event" => "update_status"}, room) do
defp incoming_message(msg = %{"event" => "update_status"}, state = %{room: room}) do
GenServer.cast(room.pid, {:update_status, room.own_id, msg["status"]})
room
state
end

defp incoming_message(%{"event" => "ping"}, room) do
defp incoming_message(%{"event" => "ping"}, state) do
send(
self(),
{:to_user,
Expand All @@ -61,16 +61,20 @@ defmodule SignalTower.Session do
}}
)

room
state
end

# invoked when a room exits
def handle_exit_message(pid, room, status) do
def handle_exit_message(
pid,
status,
state = %{room: room, turn_token_expiry: turn_token_expiry}
) do
if room && pid == room.pid && status != :normal do
# current room died => automatic rejoin
Room.join_and_monitor(room.id, room.own_status)
Room.join_and_monitor(room.id, room.own_status, turn_token_expiry)
else
nil
state
end
end

Expand Down
27 changes: 17 additions & 10 deletions lib/signal_tower/websocket_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,32 @@ defmodule SignalTower.WebsocketHandler do

@impl :cowboy_websocket
def init(req, _state) do
{:cowboy_websocket, req, nil, %{idle_timeout: :timer.seconds(30)}}
initial_state = %{
# room membership
room: nil,
# initialize turn token expiry with 0, it will be properly initialized on first room join
turn_token_expiry: 0
}

{:cowboy_websocket, req, initial_state, %{idle_timeout: :timer.seconds(30)}}
end

@impl :cowboy_websocket
def websocket_init(state) do
def websocket_init(initial_state) do
Session.init()
:timer.send_interval(:timer.seconds(5), :send_ping)
{:ok, state}
{:ok, initial_state}
end

@impl :cowboy_websocket
def websocket_handle({:text, msg}, room) do
def websocket_handle({:text, msg}, state) do
case Poison.decode(msg) do
{:ok, parsed_msg} ->
{:ok, Session.handle_message(parsed_msg, room)}
{:ok, Session.handle_message(parsed_msg, state)}

_ ->
answer = Poison.encode!(%{event: "error", description: "invalid json", received_msg: msg})
{:reply, {:text, answer}, room}
{:reply, {:text, answer}, state}
end
end

Expand Down Expand Up @@ -53,13 +60,13 @@ defmodule SignalTower.WebsocketHandler do
end

@impl :cowboy_websocket
def websocket_info({:DOWN, _, _, pid, status}, room) do
{:ok, Session.handle_exit_message(pid, room, status)}
def websocket_info({:DOWN, _, _, pid, status}, state) do
{:ok, Session.handle_exit_message(pid, status, state)}
end

@impl :cowboy_websocket
def websocket_info(:send_ping, status) do
{:reply, {:ping, "server ping"}, status}
def websocket_info(:send_ping, state) do
{:reply, {:ping, "server ping"}, state}
end

@impl :cowboy_websocket
Expand Down
Loading

0 comments on commit 3230190

Please sign in to comment.