Skip to content

Commit

Permalink
Refactor some parts of backend/pubsub.js
Browse files Browse the repository at this point in the history
  • Loading branch information
snowteamer committed Mar 21, 2021
1 parent b90675e commit 77d2f41
Showing 1 changed file with 43 additions and 44 deletions.
87 changes: 43 additions & 44 deletions backend/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const WebSocket = require('ws')

// ====== API ====== //

const { PONG, PUB, SUB, UNSUB } = NOTIFICATION_TYPE
const { PING, PONG, PUB, SUB, UNSUB } = NOTIFICATION_TYPE
const { ERROR, SUCCESS } = RESPONSE_TYPE

// Re-export some useful things from the shared module.
Expand All @@ -41,11 +41,10 @@ export function createResponse (type: ResponseTypeEnum, data: JSONType): string
*
* @param {(http.Server|https.Server)} server - A Node.js HTTP/S server to attach to.
* @param {Object?} options
* {object?} clientHandlers - Custom handlers for socket events.
* {object?} messageHandlers - Custom handlers for different message types.
* {object?} serverHandlers - Custom handlers for server events.
* {object?} socketHandlers - Custom handlers for socket events.
* {number} backlog - The maximum length of the queue of pending connections.
* {boolean} clientTracking - Specifies whether or not to track clients.
* {Function} handleProtocols - A function which can be used to handle the WebSocket subprotocols.
* {number} maxPayload - The maximum allowed message size in bytes.
* {string} path - Accept only connections matching this path.
Expand All @@ -56,21 +55,21 @@ export function createServer (httpServer: Object, options?: Object = {}): Object
const server = new WebSocket.Server({
...defaultOptions,
...options,
...{ clientTracking: true },
server: httpServer
})
server.customServerEventHandlers = options.serverHandlers || {}
server.customSocketEventHandlers = options.clientHandlers || {}
server.customSocketEventHandlers = options.socketHandlers || {}
server.messageHandlers = { ...defaultMessageHandlers, ...options.messageHandlers }
server.pingInterval = undefined
server.pingIntervalID = undefined
server.subscribersByContractID = Object.create(null)

// Add listeners for server events, i.e. events emitted on the server object.
;['close', 'connection', 'error', 'headers', 'listening'].forEach((name) => {
Object.keys(defaultServerHandlers).forEach((name) => {
server.on(name, (...args) => {
console.log('[pubsub] Server event:', name)
try {
const customHandler = server.customServerEventHandlers[name]
const defaultHandler = (defaultServerHandlers: Object)[name]
const defaultHandler = defaultServerHandlers[name]
// Always call the default handler first.
if (defaultHandler) {
defaultHandler.call(server, ...args)
Expand All @@ -85,40 +84,36 @@ export function createServer (httpServer: Object, options?: Object = {}): Object
})
// Setup a ping interval if required.
if (server.options.pingInterval > 0) {
server.pingInterval = setInterval(() => {
server.pingIntervalID = setInterval(() => {
console.debug('[pubsub] Pinging clients')
server.clients.forEach((client) => {
if (client.pinged && !client.activeSinceLastPing) {
console.log(`[pubsub] Closing irresponsive client ${client.id}`)
return client.terminate()
}
if (client.readyState === WebSocket.OPEN) {
client.send(createMessage('ping', Date.now()), () => {
client.send(createMessage(PING, Date.now()), () => {
client.activeSinceLastPing = false
client.pinged = true
})
}
})
}, server.options.pingInterval)
}
return Object.assign(server, serverMethods)
return Object.assign(server, publicMethods)
}

const defaultOptions = {
clientTracking: true,
maxPayload: 6 * 1024 * 1024,
pingInterval: 30_000
}

const generateSocketID = (() => {
let counter = 0

return (debugID) => String(counter++) + (debugID ? '-' + debugID : '')
})()

// Default handlers for server events.
// The `this` binding refers to the server object.
const defaultServerHandlers = {
close () {
console.log('[pubsub] Server event: close')
},
/**
* Emitted when a connection handshake completes.
*
Expand Down Expand Up @@ -157,24 +152,31 @@ const defaultServerHandlers = {
}
})
})
},
error (error: Error) {
console.log('[pubsub] Server event: error', error)
},
headers () {
},
listening () {
console.log('[pubsub] Server event: listening')
}
}

// Default handlers for server-side client socket events.
// The `this` binding refers to the connected `ws` socket object.
const defaultSocketEventHandlers = {
close (code: string, reason: string) {
const socketID = this.id
const { server } = this
// Notify other client sockets that this one has left any room they shared.
for (const contractID of this.subscriptions) {
const notification = createNotification(UNSUB, { contractID, socketID: this.id })
for (const client of this.server.clients) {
// This should exclude this client.
if (client.readyState === WebSocket.OPEN && client.subscriptions.has(contractID)) {
client.send(notification)
}
}
const subscribers = server.subscribersByContractID[contractID]
// Remove this socket from the subscribers of the given contract.
subscribers.delete(this)
const notification = createNotification(UNSUB, { contractID, socketID })
server.broadcast(notification, { to: subscribers })
}
this.activeSinceLastPing = false
this.subscriptions.clear()
},

Expand All @@ -195,9 +197,8 @@ const defaultSocketEventHandlers = {
} catch (error) {
// Log the error message and stack trace but do not send it to the client.
logger(error)
const failure = createResponse(ERROR, { ...msg })
// Should we call 'this.terminate()' instead?
this.send(failure, () => this.close())
this.send(createResponse(ERROR, { ...msg }), () => this.close())
}
} else {
console.error(`[pubsub] Unhandled message type: ${msg.type}`)
Expand Down Expand Up @@ -231,16 +232,12 @@ const defaultMessageHandlers = {
}
const subscribers = server.subscribersByContractID[contractID]
// Add this socket to the subscribers of the given contract.
subscribers.add(this)
subscribers.add(socket)
// Broadcast a notification to every other open subscriber.
const notification = createNotification(type, { contractID, socketID })
for (const subscriber of subscribers) {
if (subscriber.readyState === WebSocket.OPEN) {
subscriber.send(notification)
}
}
server.broadcast(notification, { to: subscribers })
}
this.send(createResponse(SUCCESS, { type, contractID }))
socket.send(createResponse(SUCCESS, { type, contractID }))
},

[UNSUB] ({ contractID, type }: UnsubMessage) {
Expand All @@ -254,21 +251,23 @@ const defaultMessageHandlers = {
if (server.subscribersByContractID[contractID]) {
const subscribers = server.subscribersByContractID[contractID]
// Remove this socket from the subscribers of the given contract.
subscribers.delete(this)
// Broadcast a notification to every other open subscriber.
subscribers.delete(socket)
// Broadcast a notification to every remaining open subscriber.
const notification = createNotification(type, { contractID, socketID })
for (const subscriber of subscribers) {
if (subscriber.readyState === WebSocket.OPEN) {
subscriber.send(notification)
}
}
server.broadcast(notification, { to: subscribers })
}
}
this.send(createResponse(SUCCESS, { type, contractID }))
socket.send(createResponse(SUCCESS, { type, contractID }))
}
}

const serverMethods = {
const generateSocketID = (() => {
let counter = 0

return (debugID) => String(counter++) + (debugID ? '-' + debugID : '')
})()

const publicMethods = {
/**
* Broadcasts a message, ignoring clients which are not open.
*
Expand Down

0 comments on commit 77d2f41

Please sign in to comment.