From 0095345a85163136d81352a7d83a2faa4bce33f9 Mon Sep 17 00:00:00 2001 From: snowteamer <64228468+snowteamer@users.noreply.github.com> Date: Wed, 7 Apr 2021 10:01:40 +0200 Subject: [PATCH] Use SBP selectors for custom pubsub events --- backend/pubsub.js | 31 ++++++++++++++----- frontend/utils/events.js | 6 ++++ shared/pubsub.js | 64 ++++++++++++++++++++-------------------- 3 files changed, 61 insertions(+), 40 deletions(-) diff --git a/backend/pubsub.js b/backend/pubsub.js index 81b315b351..91a93a9f47 100644 --- a/backend/pubsub.js +++ b/backend/pubsub.js @@ -1,6 +1,11 @@ /* globals logger */ 'use strict' +/* + * Pub/Sub server implementation using the `ws` library. + * See https://github.com/websockets/ws#api-docs + */ + import { NOTIFICATION_TYPE, REQUEST_TYPE, @@ -28,6 +33,10 @@ const { ERROR, SUCCESS } = RESPONSE_TYPE // Re-export some useful things from the shared module. export { createClient, createMessage, NOTIFICATION_TYPE, REQUEST_TYPE, RESPONSE_TYPE } +export function createErrorResponse (data: JSONType): string { + return JSON.stringify({ type: ERROR, data }) +} + export function createNotification (type: NotificationTypeEnum, data: JSONType): string { return JSON.stringify({ type, data }) } @@ -78,7 +87,7 @@ export function createServer (httpServer: Object, options?: Object = {}): Object customHandler.call(server, ...args) } } catch (error) { - logger(error) + server.emit('error', error) } }) }) @@ -148,18 +157,20 @@ const defaultServerHandlers = { customHandler.call(socket, ...args) } } catch (error) { - logger(error) + socket.server.emit('error', error) + socket.terminate() } }) }) }, error (error: Error) { - console.log('[pubsub] Server event: error', error) + console.log('[pubsub] Server error:', error) + logger(error) }, headers () { }, listening () { - console.log('[pubsub] Server event: listening') + console.log('[pubsub] Server listening') } } @@ -186,7 +197,8 @@ const defaultSocketEventHandlers = { msg = messageParser(data) } catch (error) { console.error(bold.red(`[pubsub] Malformed message: ${error.message}`)) - return this.terminate() + rejectMessageAndTerminateSocket(msg, this) + return } this.activeSinceLastPing = true const handler = this.server.messageHandlers[msg.type] @@ -197,12 +209,11 @@ const defaultSocketEventHandlers = { } catch (error) { // Log the error message and stack trace but do not send it to the client. logger(error) - // Should we call 'this.terminate()' instead? - this.send(createResponse(ERROR, { ...msg }), () => this.close()) + rejectMessageAndTerminateSocket(msg, this) } } else { console.error(`[pubsub] Unhandled message type: ${msg.type}`) - this.terminate() + rejectMessageAndTerminateSocket(msg, this) } } } @@ -293,3 +304,7 @@ const publicMethods = { } } } + +const rejectMessageAndTerminateSocket = (request: Message, socket: Object) => { + socket.send(createErrorResponse({ ...request }), () => socket.terminate()) +} diff --git a/frontend/utils/events.js b/frontend/utils/events.js index b0b5a9ab7a..4f33a7920c 100644 --- a/frontend/utils/events.js +++ b/frontend/utils/events.js @@ -20,3 +20,9 @@ export const CONTRACT_IS_SYNCING = 'contract-is-syncing' export const CAPTURED_LOGS = 'captured-logs' export const SET_APP_LOGS_FILTER = 'set-app-logs-filter' + +export const PUBSUB_ERROR = 'pubsub-error' +export const PUBSUB_RECONNECTION_ATTEMPT = 'pubsub-reconnection-attempt' +export const PUBSUB_RECONNECTION_FAILED = 'pubsub-reconnection-failed' +export const PUBSUB_RECONNECTION_SCHEDULED = 'pubsub-reconnection-scheduled' +export const PUBSUB_RECONNECTION_SUCCEEDED = 'pubsub-reconnection-succeeded' diff --git a/shared/pubsub.js b/shared/pubsub.js index 5140a51290..9726a41dc7 100644 --- a/shared/pubsub.js +++ b/shared/pubsub.js @@ -1,6 +1,14 @@ 'use strict' +import sbp from '~/shared/sbp.js' import type { JSONObject, JSONType } from '~/shared/types.js' +import { + PUBSUB_ERROR, + PUBSUB_RECONNECTION_ATTEMPT, + PUBSUB_RECONNECTION_FAILED, + PUBSUB_RECONNECTION_SCHEDULED, + PUBSUB_RECONNECTION_SUCCEEDED +} from '~/frontend/utils/events.js' // ====== Types ====== // @@ -39,7 +47,6 @@ export type PubSubClient = { clearAllTimers(): void, connect(): void, destroy(): void, - emit(type: string, detail?: any): void, pub(contractID: string, data: JSONType): void, scheduleConnectionAttempt(): void, sub(contractID: string): void, @@ -96,11 +103,11 @@ export type ResponseTypeEnum = $Values * {boolean?} manual - Whether the factory should call 'connect()' automatically. * Also named 'autoConnect' or 'startClosed' in other libraries. * {object?} messageHandlers - Custom handlers for different message types. - * {number?} pingTimeout - How long to wait for the server to send a ping, in milliseconds. - * {boolean?} reconnectOnDisconnection - Whether to reconnect after a server-side disconnection. - * {boolean?} reconnectOnOnline - Whether to reconnect after coming back online. - * {boolean?} reconnectOnTimeout - Whether to reconnect after a connection timeout. - * {number?} timeout - Connection timeout duration in milliseconds. + * {number?} pingTimeout=45_000 - How long to wait for the server to send a ping, in milliseconds. + * {boolean?} reconnectOnDisconnection=true - Whether to reconnect after a server-side disconnection. + * {boolean?} reconnectOnOnline=true - Whether to reconnect after coming back online. + * {boolean?} reconnectOnTimeout=false - Whether to reconnect after a connection timeout. + * {number?} timeout=5_000 - Connection timeout duration in milliseconds. * @returns {PubSubClient} */ export function createClient (url: string, options?: Object = {}): PubSubClient { @@ -150,7 +157,7 @@ export function createClient (url: string, options?: Object = {}): PubSubClient } } catch (error) { // Do not throw any error but emit an `error` event instead. - client.emit('error', error.message) + sbp('okTurtles.events/emit', PUBSUB_ERROR, client, error.message) } } } @@ -223,7 +230,9 @@ const defaultClientEventHandlers = { const { data } = event if (typeof data !== 'string') { - this.emit('error', `Critical error! Wrong data type: ${typeof data}`) + sbp('okTurtles.events/emit', PUBSUB_ERROR, this, { + message: `Wrong data type: ${typeof data}` + }) return this.destroy() } let msg: Message = { type: '' } @@ -231,7 +240,9 @@ const defaultClientEventHandlers = { try { msg = messageParser(data) } catch (error) { - this.emit('error', `Critical error! Malformed message: ${error.message}`) + sbp('okTurtles.events/emit', PUBSUB_ERROR, this, { + message: `Malformed message: ${error.message}` + }) return this.destroy() } const handler = this.messageHandlers[msg.type] @@ -268,7 +279,7 @@ const defaultClientEventHandlers = { open (event: Event) { console.log('[pubsub] Event: open') if (!this.isNew) { - this.emit('reconnection-succeeded') + sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_SUCCEEDED, this) } this.clearAllTimers() // Set it to -1 so that it becomes 0 on the next `close` event. @@ -384,12 +395,6 @@ const defaultOptions = { timeout: 5_000 } -const customEventNames = [ - 'reconnection-attempt', - 'reconnection-failed', - 'reconnection-scheduled', - 'reconnection-succeeded' -] const globalEventNames = ['offline', 'online'] const socketEventNames = ['close', 'error', 'message', 'open'] @@ -478,20 +483,6 @@ const publicMethods = { this.shouldReconnect = false }, - // Emits a custom event or an `error` event. - // Other fake native events are not allowed so as to not break things. - emit (type: string, detail?: mixed) { - if (!customEventNames.includes(type) && type !== 'error') { - throw new Error(`emit(): argument 'type' must not be '${type}'.`) - } - // This event object partially implements the `CustomEvent` interface. - const event = { type, detail } - const listener = this.listeners[type] - if (listener) { - listener(event) - } - }, - getNextRandomDelay (): number { const { maxReconnectionDelay, @@ -515,11 +506,11 @@ const publicMethods = { const nth = this.failedConnectionAttempts + 1 this.nextConnectionAttemptDelayID = setTimeout(() => { - this.emit('reconnection-attempt') + sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_ATTEMPT, this) this.nextConnectionAttemptDelayID = undefined this.connect() }, delay) - this.emit('reconnection-scheduled', { delay, nth }) + sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_SCHEDULED, this, { delay, nth }) }, // Unused for now. @@ -569,6 +560,15 @@ const publicMethods = { } } +// Register custom SBP event listeners before the first connection. +for (const name of Object.keys(defaultClientEventHandlers)) { + if (name === 'error' || !socketEventNames.includes(name)) { + sbp('okTurtles.events/on', `pubsub-${name}`, (target, detail?: Object) => { + target.listeners[name]({ type: name, target, detail }) + }) + } +} + export default { NOTIFICATION_TYPE, REQUEST_TYPE,