Skip to content

Commit

Permalink
Use SBP selectors for custom pubsub events
Browse files Browse the repository at this point in the history
  • Loading branch information
snowteamer committed Apr 7, 2021
1 parent 3b50fc2 commit 0095345
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 40 deletions.
31 changes: 23 additions & 8 deletions backend/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
/* globals logger */
'use strict'

/*
* Pub/Sub server implementation using the `ws` library.
* See https://github.com/websockets/ws#api-docs
*/

import {
NOTIFICATION_TYPE,
REQUEST_TYPE,
Expand Down Expand Up @@ -28,6 +33,10 @@ const { ERROR, SUCCESS } = RESPONSE_TYPE
// Re-export some useful things from the shared module.
export { createClient, createMessage, NOTIFICATION_TYPE, REQUEST_TYPE, RESPONSE_TYPE }

export function createErrorResponse (data: JSONType): string {
return JSON.stringify({ type: ERROR, data })
}

export function createNotification (type: NotificationTypeEnum, data: JSONType): string {
return JSON.stringify({ type, data })
}
Expand Down Expand Up @@ -78,7 +87,7 @@ export function createServer (httpServer: Object, options?: Object = {}): Object
customHandler.call(server, ...args)
}
} catch (error) {
logger(error)
server.emit('error', error)
}
})
})
Expand Down Expand Up @@ -148,18 +157,20 @@ const defaultServerHandlers = {
customHandler.call(socket, ...args)
}
} catch (error) {
logger(error)
socket.server.emit('error', error)
socket.terminate()
}
})
})
},
error (error: Error) {
console.log('[pubsub] Server event: error', error)
console.log('[pubsub] Server error:', error)
logger(error)
},
headers () {
},
listening () {
console.log('[pubsub] Server event: listening')
console.log('[pubsub] Server listening')
}
}

Expand All @@ -186,7 +197,8 @@ const defaultSocketEventHandlers = {
msg = messageParser(data)
} catch (error) {
console.error(bold.red(`[pubsub] Malformed message: ${error.message}`))
return this.terminate()
rejectMessageAndTerminateSocket(msg, this)
return
}
this.activeSinceLastPing = true
const handler = this.server.messageHandlers[msg.type]
Expand All @@ -197,12 +209,11 @@ const defaultSocketEventHandlers = {
} catch (error) {
// Log the error message and stack trace but do not send it to the client.
logger(error)
// Should we call 'this.terminate()' instead?
this.send(createResponse(ERROR, { ...msg }), () => this.close())
rejectMessageAndTerminateSocket(msg, this)
}
} else {
console.error(`[pubsub] Unhandled message type: ${msg.type}`)
this.terminate()
rejectMessageAndTerminateSocket(msg, this)
}
}
}
Expand Down Expand Up @@ -293,3 +304,7 @@ const publicMethods = {
}
}
}

const rejectMessageAndTerminateSocket = (request: Message, socket: Object) => {
socket.send(createErrorResponse({ ...request }), () => socket.terminate())
}
6 changes: 6 additions & 0 deletions frontend/utils/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ export const CONTRACT_IS_SYNCING = 'contract-is-syncing'

export const CAPTURED_LOGS = 'captured-logs'
export const SET_APP_LOGS_FILTER = 'set-app-logs-filter'

export const PUBSUB_ERROR = 'pubsub-error'
export const PUBSUB_RECONNECTION_ATTEMPT = 'pubsub-reconnection-attempt'
export const PUBSUB_RECONNECTION_FAILED = 'pubsub-reconnection-failed'
export const PUBSUB_RECONNECTION_SCHEDULED = 'pubsub-reconnection-scheduled'
export const PUBSUB_RECONNECTION_SUCCEEDED = 'pubsub-reconnection-succeeded'
64 changes: 32 additions & 32 deletions shared/pubsub.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
'use strict'

import sbp from '~/shared/sbp.js'
import type { JSONObject, JSONType } from '~/shared/types.js'
import {
PUBSUB_ERROR,
PUBSUB_RECONNECTION_ATTEMPT,
PUBSUB_RECONNECTION_FAILED,
PUBSUB_RECONNECTION_SCHEDULED,
PUBSUB_RECONNECTION_SUCCEEDED
} from '~/frontend/utils/events.js'

// ====== Types ====== //

Expand Down Expand Up @@ -39,7 +47,6 @@ export type PubSubClient = {
clearAllTimers(): void,
connect(): void,
destroy(): void,
emit(type: string, detail?: any): void,
pub(contractID: string, data: JSONType): void,
scheduleConnectionAttempt(): void,
sub(contractID: string): void,
Expand Down Expand Up @@ -96,11 +103,11 @@ export type ResponseTypeEnum = $Values<typeof RESPONSE_TYPE>
* {boolean?} manual - Whether the factory should call 'connect()' automatically.
* Also named 'autoConnect' or 'startClosed' in other libraries.
* {object?} messageHandlers - Custom handlers for different message types.
* {number?} pingTimeout - How long to wait for the server to send a ping, in milliseconds.
* {boolean?} reconnectOnDisconnection - Whether to reconnect after a server-side disconnection.
* {boolean?} reconnectOnOnline - Whether to reconnect after coming back online.
* {boolean?} reconnectOnTimeout - Whether to reconnect after a connection timeout.
* {number?} timeout - Connection timeout duration in milliseconds.
* {number?} pingTimeout=45_000 - How long to wait for the server to send a ping, in milliseconds.
* {boolean?} reconnectOnDisconnection=true - Whether to reconnect after a server-side disconnection.
* {boolean?} reconnectOnOnline=true - Whether to reconnect after coming back online.
* {boolean?} reconnectOnTimeout=false - Whether to reconnect after a connection timeout.
* {number?} timeout=5_000 - Connection timeout duration in milliseconds.
* @returns {PubSubClient}
*/
export function createClient (url: string, options?: Object = {}): PubSubClient {
Expand Down Expand Up @@ -150,7 +157,7 @@ export function createClient (url: string, options?: Object = {}): PubSubClient
}
} catch (error) {
// Do not throw any error but emit an `error` event instead.
client.emit('error', error.message)
sbp('okTurtles.events/emit', PUBSUB_ERROR, client, error.message)
}
}
}
Expand Down Expand Up @@ -223,15 +230,19 @@ const defaultClientEventHandlers = {
const { data } = event

if (typeof data !== 'string') {
this.emit('error', `Critical error! Wrong data type: ${typeof data}`)
sbp('okTurtles.events/emit', PUBSUB_ERROR, this, {
message: `Wrong data type: ${typeof data}`
})
return this.destroy()
}
let msg: Message = { type: '' }

try {
msg = messageParser(data)
} catch (error) {
this.emit('error', `Critical error! Malformed message: ${error.message}`)
sbp('okTurtles.events/emit', PUBSUB_ERROR, this, {
message: `Malformed message: ${error.message}`
})
return this.destroy()
}
const handler = this.messageHandlers[msg.type]
Expand Down Expand Up @@ -268,7 +279,7 @@ const defaultClientEventHandlers = {
open (event: Event) {
console.log('[pubsub] Event: open')
if (!this.isNew) {
this.emit('reconnection-succeeded')
sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_SUCCEEDED, this)
}
this.clearAllTimers()
// Set it to -1 so that it becomes 0 on the next `close` event.
Expand Down Expand Up @@ -384,12 +395,6 @@ const defaultOptions = {
timeout: 5_000
}

const customEventNames = [
'reconnection-attempt',
'reconnection-failed',
'reconnection-scheduled',
'reconnection-succeeded'
]
const globalEventNames = ['offline', 'online']
const socketEventNames = ['close', 'error', 'message', 'open']

Expand Down Expand Up @@ -478,20 +483,6 @@ const publicMethods = {
this.shouldReconnect = false
},

// Emits a custom event or an `error` event.
// Other fake native events are not allowed so as to not break things.
emit (type: string, detail?: mixed) {
if (!customEventNames.includes(type) && type !== 'error') {
throw new Error(`emit(): argument 'type' must not be '${type}'.`)
}
// This event object partially implements the `CustomEvent` interface.
const event = { type, detail }
const listener = this.listeners[type]
if (listener) {
listener(event)
}
},

getNextRandomDelay (): number {
const {
maxReconnectionDelay,
Expand All @@ -515,11 +506,11 @@ const publicMethods = {
const nth = this.failedConnectionAttempts + 1

this.nextConnectionAttemptDelayID = setTimeout(() => {
this.emit('reconnection-attempt')
sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_ATTEMPT, this)
this.nextConnectionAttemptDelayID = undefined
this.connect()
}, delay)
this.emit('reconnection-scheduled', { delay, nth })
sbp('okTurtles.events/emit', PUBSUB_RECONNECTION_SCHEDULED, this, { delay, nth })
},

// Unused for now.
Expand Down Expand Up @@ -569,6 +560,15 @@ const publicMethods = {
}
}

// Register custom SBP event listeners before the first connection.
for (const name of Object.keys(defaultClientEventHandlers)) {
if (name === 'error' || !socketEventNames.includes(name)) {
sbp('okTurtles.events/on', `pubsub-${name}`, (target, detail?: Object) => {
target.listeners[name]({ type: name, target, detail })
})
}
}

export default {
NOTIFICATION_TYPE,
REQUEST_TYPE,
Expand Down

0 comments on commit 0095345

Please sign in to comment.