diff --git a/.github/workflows/build_container.yaml b/.github/workflows/build_container.yaml new file mode 100644 index 0000000..8646d51 --- /dev/null +++ b/.github/workflows/build_container.yaml @@ -0,0 +1,31 @@ +name: ci-build-container + +on: + pull_request: + paths: + - "example/backend/**" + - ".github/workflows/build_container.yaml" + +jobs: + build: + timeout-minutes: 20 + runs-on: ubuntu-latest + + steps: + - name: Check out the repo + uses: actions/checkout@v4 + + - name: Log in to Docker Hub + uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a + with: + username: ${{ secrets.DOCKER_USERNAME }} + password: ${{ secrets.DOCKER_PASSWORD }} + + - name: Build and push Docker image + uses: docker/build-push-action@3b5e8027fcad23fda98b2e3ac259d8d67585f671 + with: + context: ./example/backend/ + file: ./example/backend/Dockerfile + push: true + tags: braverhq/phoenix-dart-server:latest + diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index cdff780..d4f6e52 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -5,7 +5,7 @@ on: paths: - "lib/**" - "test/**" - - ".github/workflows/**" + - ".github/workflows/test.yaml" jobs: test: @@ -17,6 +17,7 @@ jobs: image: braverhq/phoenix-dart-server ports: - 4001:4001 + - 4002:4002 steps: diff --git a/example/backend/config/config.exs b/example/backend/config/config.exs index 3c6a2b7..ede81e2 100644 --- a/example/backend/config/config.exs +++ b/example/backend/config/config.exs @@ -12,7 +12,12 @@ config :backend, BackendWeb.Endpoint, url: [host: "localhost"], secret_key_base: "LkOAHmBjZB9uu1CTg3Z28ZnvysCl8LhqRGBxwq32eIR7P10XuMSmLIft/QgG1b8D", render_errors: [view: BackendWeb.ErrorView, accepts: ~w(html json)], - pubsub: [name: Backend.PubSub, adapter: Phoenix.PubSub.PG2] + pubsub_server: Backend.PubSub + +config :backend, BackendWeb.ControlEndpoint, + url: [host: "localhost"], + secret_key_base: "LkOAHmBjZB9uu1CTg3Z28ZnvysCl8LhqRGBxwq32eIR7P10XuMSmLIft/QgG1b8D", + render_errors: [view: BackendWeb.ErrorView, accepts: ~w(html json)] # Configures Elixir's Logger config :logger, :console, diff --git a/example/backend/config/dev.exs b/example/backend/config/dev.exs index 3db566b..3b5cfc2 100644 --- a/example/backend/config/dev.exs +++ b/example/backend/config/dev.exs @@ -13,6 +13,13 @@ config :backend, BackendWeb.Endpoint, check_origin: false, watchers: [] +config :backend, BackendWeb.ControlEndpoint, + http: [port: 4002], + debug_errors: true, + code_reloader: true, + check_origin: false, + watchers: [] + # ## SSL Support # # In order to use HTTPS in development, a self-signed diff --git a/example/backend/lib/backend/application.ex b/example/backend/lib/backend/application.ex index 9955f89..85d1a21 100644 --- a/example/backend/lib/backend/application.ex +++ b/example/backend/lib/backend/application.ex @@ -6,17 +6,11 @@ defmodule Backend.Application do use Application def start(_type, _args) do - # List all child processes to be supervised children = [ - # Start the endpoint when the application starts - BackendWeb.Endpoint, - BackendWeb.Presence - # Starts a worker by calling: Backend.Worker.start_link(arg) - # {Backend.Worker, arg}, + BackendWeb.Supervisor, + BackendWeb.ControlEndpoint ] - # See https://hexdocs.pm/elixir/Supervisor.html - # for other strategies and supported options opts = [strategy: :one_for_one, name: Backend.Supervisor] Supervisor.start_link(children, opts) end diff --git a/example/backend/lib/backend_web/channels/channel1.ex b/example/backend/lib/backend_web/channels/channel1.ex index 54ca13c..3fff1f5 100644 --- a/example/backend/lib/backend_web/channels/channel1.ex +++ b/example/backend/lib/backend_web/channels/channel1.ex @@ -10,7 +10,7 @@ defmodule BackendWeb.Channel1 do {:ok, socket} end - def join("channel1:" <> _name, %{"password" => _}, socket) do + def join("channel1:" <> _name, %{"password" => _}, _socket) do {:error, "wrong password"} end diff --git a/example/backend/lib/backend_web/channels/presence_channel.ex b/example/backend/lib/backend_web/channels/presence_channel.ex index 7980d1c..2c13d18 100644 --- a/example/backend/lib/backend_web/channels/presence_channel.ex +++ b/example/backend/lib/backend_web/channels/presence_channel.ex @@ -3,33 +3,9 @@ defmodule BackendWeb.PresenceChannel do alias BackendWeb.Presence @impl true - def join("presence:lobby", payload, socket) do - if authorized?(payload) do - send(self(), :after_join) - {:ok, socket} - else - {:error, %{reason: "unauthorized"}} - end - end - - # # Channels can be used in a request/response fashion - # # by sending replies to requests from the client - # @impl true - # def handle_in("ping", payload, socket) do - # {:reply, {:ok, payload}, socket} - # end - - # # It is also common to receive messages from the client and - # # broadcast to everyone in the current topic (presence:lobby). - # @impl true - # def handle_in("shout", payload, socket) do - # broadcast socket, "shout", payload - # {:noreply, socket} - # end - - # Add authorization logic here as required. - defp authorized?(_payload) do - true + def join("presence:lobby", _payload, socket) do + send(self(), :after_join) + {:ok, socket} end @impl true diff --git a/example/backend/lib/backend_web/control/control.ex b/example/backend/lib/backend_web/control/control.ex new file mode 100644 index 0000000..f802eca --- /dev/null +++ b/example/backend/lib/backend_web/control/control.ex @@ -0,0 +1,5 @@ +defmodule BackendWeb.ControlEndpoint do + use Phoenix.Endpoint, otp_app: :backend + + plug(BackendWeb.ControlRouter) +end diff --git a/example/backend/lib/backend_web/control/router.ex b/example/backend/lib/backend_web/control/router.ex new file mode 100644 index 0000000..6e12c03 --- /dev/null +++ b/example/backend/lib/backend_web/control/router.ex @@ -0,0 +1,35 @@ +defmodule BackendWeb.ControlRouter do + use Phoenix.Router + + get "/stop", BackendWeb.Control, :stop + get "/start", BackendWeb.Control, :start +end + +defmodule BackendWeb.Control do + @moduledoc """ + Control endpoint + """ + + use Phoenix.Controller, namespace: BackendWeb + + import Plug.Conn + + def stop(conn, _args) do + Supervisor.terminate_child(Backend.Supervisor, BackendWeb.Supervisor) + send_resp(conn, 200, "OK") + end + + def start(conn, _args) do + Supervisor.which_children(Backend.Supervisor) + |> Enum.find(&(elem(&1, 0) == BackendWeb.Supervisor)) + |> case do + {BackendWeb.Supervisor, :undefined, :supervisor, _rest} -> + Supervisor.restart_child(Backend.Supervisor, BackendWeb.Supervisor) + + _ -> + :ok + end + + send_resp(conn, 200, "OK") + end +end diff --git a/example/backend/lib/backend_web/supervisor.ex b/example/backend/lib/backend_web/supervisor.ex new file mode 100644 index 0000000..01bf128 --- /dev/null +++ b/example/backend/lib/backend_web/supervisor.ex @@ -0,0 +1,25 @@ +defmodule BackendWeb.Supervisor do + use Supervisor + + def start_link(opts) do + Supervisor.start_link(__MODULE__, opts ++ [name: Backend.Supervisor]) + end + + def init(_) do + children = [ + {Phoenix.PubSub, [name: Backend.PubSub, adapter: Phoenix.PubSub.PG2]}, + BackendWeb.Endpoint, + BackendWeb.Presence + ] + + Supervisor.init(children, strategy: :one_for_one) + end + + def child_spec(init_arg) do + %{ + id: BackendWeb.Supervisor, + start: {BackendWeb.Supervisor, :start_link, [init_arg]}, + type: :supervisor + } + end +end diff --git a/lib/src/socket.dart b/lib/src/socket.dart index 58232b6..441441c 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -170,15 +170,11 @@ class PhoenixSocket { /// Whether the underlying socket is connected of not. bool get isConnected => _ws != null && _socketState == SocketState.connected; - /// Attempts to make a WebSocket connection to the Phoenix backend. - /// - /// If the attempt fails, retries will be triggered at intervals specified - /// by retryAfterIntervalMS - Future connect() async { + void _connect(Completer completer) async { if (_ws != null) { _logger.warning( 'Calling connect() on already connected or connecting socket.'); - return this; + completer.complete(this); } _shouldReconnect = true; @@ -190,8 +186,6 @@ class PhoenixSocket { _mountPoint = await _buildMountPoint(_endpoint, _options); _logger.finest(() => 'Attempting to connect to $_mountPoint'); - final completer = Completer(); - try { _ws = _webSocketChannelFactory != null ? _webSocketChannelFactory!(_mountPoint) @@ -226,7 +220,18 @@ class PhoenixSocket { completer.complete(_delayedReconnect()); } + } + /// Attempts to make a WebSocket connection to the Phoenix backend. + /// + /// If the attempt fails, retries will be triggered at intervals specified + /// by retryAfterIntervalMS + Future connect() async { + final completer = Completer(); + runZonedGuarded( + () => _connect(completer), + (error, stack) {}, + ); return completer.future; } diff --git a/phoenix_socket.code-workspace b/phoenix_socket.code-workspace index 3d3faf3..c28bc7a 100644 --- a/phoenix_socket.code-workspace +++ b/phoenix_socket.code-workspace @@ -2,9 +2,13 @@ "folders": [ { "path": "." + }, + { + "path": "example/backend" } ], "settings": { "editor.formatOnSave": true, + "elixirLS.fetchDeps": true } } diff --git a/pubspec.yaml b/pubspec.yaml index 261b814..d7e9019 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -27,3 +27,4 @@ dev_dependencies: test: ^1.11.1 async: ^2.11.0 stream_channel: ^2.1.2 + http: any diff --git a/test/channel_integration_test.dart b/test/channel_integration_test.dart index 159ebce..11a10fd 100644 --- a/test/channel_integration_test.dart +++ b/test/channel_integration_test.dart @@ -3,10 +3,16 @@ import 'dart:async'; import 'package:phoenix_socket/phoenix_socket.dart'; import 'package:test/test.dart'; +import 'control.dart'; + void main() { const addr = 'ws://localhost:4001/socket/websocket'; group('PhoenixChannel', () { + setUp(() async { + await restartBackend(); + }); + test('can join a channel through a socket', () async { final socket = PhoenixSocket(addr); final completer = Completer(); @@ -20,6 +26,43 @@ void main() { await completer.future; }); + test('can join a channel through a socket that starts closed then connects', + () async { + final socket = PhoenixSocket(addr); + final completer = Completer(); + + await stopThenRestartBackend(); + await socket.connect(); + + socket.addChannel(topic: 'channel1').join().onReply('ok', (reply) { + expect(reply.status, equals('ok')); + completer.complete(); + }); + + await completer.future; + }); + + test( + 'can join a channel through a socket that disconnects before join but reconnects', + () async { + final socket = PhoenixSocket(addr); + final completer = Completer(); + + await socket.connect(); + + await stopBackend(); + final joinFuture = socket.addChannel(topic: 'channel1').join(); + Future.delayed(const Duration(milliseconds: 300)) + .then((value) => restartBackend()); + + joinFuture.onReply('ok', (reply) { + expect(reply.status, equals('ok')); + completer.complete(); + }); + + await completer.future; + }); + test('can join a channel through an unawaited socket', () async { final socket = PhoenixSocket(addr); final completer = Completer(); @@ -93,6 +136,23 @@ void main() { expect(reply.response, equals({'name': 'bar'})); }); + test( + 'can send messages to channels that got transiently disconnected and receive a reply', + () async { + final socket = PhoenixSocket(addr); + + await socket.connect(); + + final channel1 = socket.addChannel(topic: 'channel1'); + await channel1.join().future; + + await stopThenRestartBackend(); + + final reply = await channel1.push('hello!', {'foo': 'bar'}).future; + expect(reply.status, equals('ok')); + expect(reply.response, equals({'name': 'bar'})); + }); + test('only emits reply messages that are channel replies', () async { final socket = PhoenixSocket(addr); diff --git a/test/control.dart b/test/control.dart new file mode 100644 index 0000000..6291b17 --- /dev/null +++ b/test/control.dart @@ -0,0 +1,27 @@ +import 'package:http/http.dart'; + +Future stopBackend() { + return get(Uri.parse('http://localhost:4002/stop')).then((response) { + if (response.statusCode != 200) { + throw Exception('Failed to stop backend'); + } + }); +} + +Future restartBackend() { + return get(Uri.parse('http://localhost:4002/start')).then((response) { + if (response.statusCode != 200) { + throw Exception('Failed to start backend'); + } + }); +} + +Future stopThenRestartBackend( + [Duration delay = const Duration(milliseconds: 200)]) { + return get(Uri.parse('http://localhost:4002/stop')).then((response) { + if (response.statusCode != 200) { + throw Exception('Failed to stop backend'); + } + Future.delayed(delay).then((_) => restartBackend()); + }); +}