diff --git a/lib/src/socket.dart b/lib/src/socket.dart index e2a6d6d..f2adf64 100644 --- a/lib/src/socket.dart +++ b/lib/src/socket.dart @@ -171,10 +171,13 @@ class PhoenixSocket { bool get isConnected => _ws != null && _socketState == SocketState.connected; void _connect(Completer completer) async { - if (_ws != null) { + if (_ws != null && + (_socketState == SocketState.connected || + _socketState == SocketState.connecting)) { _logger.warning( 'Calling connect() on already connected or connecting socket.'); completer.complete(this); + return; } _shouldReconnect = true; @@ -190,6 +193,7 @@ class PhoenixSocket { _ws = _webSocketChannelFactory != null ? _webSocketChannelFactory!(_mountPoint) : WebSocketChannel.connect(_mountPoint); + _ws!.stream .where(_shouldPipeMessage) .listen(_onSocketData, cancelOnError: true) @@ -203,7 +207,13 @@ class PhoenixSocket { _socketState = SocketState.connecting; try { + // Wait for the WebSocket to be ready before continuing. In case of a + // failure to connect, the future will complete with an error and will be + // caught. + await _ws!.ready; + _socketState = SocketState.connected; + _logger.finest('Waiting for initial heartbeat roundtrip'); if (await _sendHeartbeat(ignorePreviousHeartbeat: true)) { _stateStreamController.add(PhoenixSocketOpenEvent()); @@ -508,10 +518,6 @@ class PhoenixSocket { void _onSocketData(message) => onSocketDataCallback(message); void _onSocketError(dynamic error, dynamic stacktrace) { - if (_socketState == SocketState.closing || - _socketState == SocketState.closed) { - return; - } final socketError = PhoenixSocketErrorEvent( error: error, stacktrace: stacktrace,