diff --git a/src/socket/index.ts b/src/socket/index.ts index d28d1aa..16b4980 100644 --- a/src/socket/index.ts +++ b/src/socket/index.ts @@ -1,14 +1,14 @@ import { WebSocketServer } from "ws"; import { buildMessageStreammingWebSocket } from "./message-streaming"; import { authenticateWebSocket } from "./authentication"; -import { syncEntriesViaSocket } from "./sync"; +import { buildSyncingWebSocket, syncEntries } from "./sync"; export function setupWebsocketServer(server: WebSocketServer) { server.on("connection", async (socket, request) => { // Enable message streaming const messageSocket = buildMessageStreammingWebSocket(socket); - // Authenticate user and node + // Authenticate user const authenticatedSocket = authenticateWebSocket(messageSocket, request); if (!authenticatedSocket) { messageSocket.close(); @@ -16,6 +16,11 @@ export function setupWebsocketServer(server: WebSocketServer) { } // Sync entries - syncEntriesViaSocket(authenticatedSocket); + try { + const syncingSocket = buildSyncingWebSocket(authenticatedSocket); + await syncEntries(syncingSocket); + } finally { + socket.close(); + } }); } diff --git a/src/socket/sync-error.ts b/src/socket/sync-error.ts new file mode 100644 index 0000000..b7c6df4 --- /dev/null +++ b/src/socket/sync-error.ts @@ -0,0 +1,3 @@ +import { ServerError } from "../error"; + +export class MalformedMessageError extends ServerError {} diff --git a/src/socket/sync-handler-misc.ts b/src/socket/sync-handler-misc.ts new file mode 100644 index 0000000..e69de29 diff --git a/src/socket/sync-service.ts b/src/socket/sync-service.ts new file mode 100644 index 0000000..8074606 --- /dev/null +++ b/src/socket/sync-service.ts @@ -0,0 +1,23 @@ +import { EntityManager } from "typeorm"; +import { getManager } from "../db"; +import { UserService } from "../service/user"; +import Container from "typedi"; +import { HistoryService } from "../service/history"; +import { NodeService } from "../service/node"; +import { EntryService } from "../service/entry"; + +export class SyncService { + public manager: EntityManager; + public user: UserService; + public history: HistoryService; + public node: NodeService; + public entry: EntryService; + + public constructor() { + this.manager = getManager(); + this.user = Container.get(UserService); + this.history = Container.get(HistoryService); + this.node = Container.get(NodeService); + this.entry = Container.get(EntryService); + } +} diff --git a/src/socket/sync-utils.ts b/src/socket/sync-utils.ts new file mode 100644 index 0000000..3787e3d --- /dev/null +++ b/src/socket/sync-utils.ts @@ -0,0 +1,63 @@ +import { ControlMessage, Message } from "./message"; +import { BadParameterError } from "../error"; +import { RawData } from "ws"; +import { MalformedMessageError } from "./sync-error"; +import { SyncingWebSocket } from "./sync"; + +function tryParseMessageJSON(socket: SyncingWebSocket, data: RawData): Message { + let message; + + try { + message = JSON.parse(data.toString()); + } catch (err) { + if (err instanceof SyntaxError) { + socket.sendMessage( + new ControlMessage([new BadParameterError("Bad JSON syntax.")]) + ); + socket.close(); + } + + socket.log(err); + throw err; + } + + return message; +} +function checkMessageIntegrity( + socket: SyncingWebSocket, + message: Message +): void { + if (typeof message.session !== "string" || message.session === "") { + socket.sendMessage( + new ControlMessage([ + new BadParameterError( + "Messsage should contain an non-empty session of string type." + ), + ]) + ); + socket.close(); + throw new MalformedMessageError(); + } + + if (!message.type) { + socket.replyMessage( + message, + new ControlMessage([ + new BadParameterError( + "Each message must contain a valid `type` field." + ), + ]) + ); + throw new MalformedMessageError(); + } +} + +export function parseMessage( + socket: SyncingWebSocket, + data: RawData +): Message | null { + const message = tryParseMessageJSON(socket, data); + checkMessageIntegrity(socket, message); + + return message; +} diff --git a/src/socket/sync.ts b/src/socket/sync.ts index 86a6729..e1770b5 100644 --- a/src/socket/sync.ts +++ b/src/socket/sync.ts @@ -3,7 +3,6 @@ import { GoodbyeMessage, ControlMessage, DebugNodeUpdateMessage, - Message, SyncModeFullEntriesQueryMessage, SyncModeFullEntriesResponseMessage, SyncModeFullMetaQueryMessage, @@ -11,10 +10,11 @@ import { SyncModeRecentRequestMessage, SyncModeRecentResponseMessage, HandshakeMessage, + Message, } from "./message"; import { Container } from "typedi"; import { TRANSMISSION_PAGING_SIZE } from "../env"; -import { BadParameterError, HistoryCursorInvalidError } from "../error"; +import { HistoryCursorInvalidError } from "../error"; import { getManager } from "../db"; import { Node } from "../entity/node"; import { EntryHistory } from "../entity/entry-history"; @@ -23,184 +23,146 @@ import { In, MoreThan } from "typeorm"; import { Entry } from "../entity/entry"; import { NodeService } from "../service/node"; import { EntryService } from "../service/entry"; -import { RawData } from "ws"; import { checkGcInDevelopmentEnvironment, isDevelopmentEnvironment, -} from "../util"; +} from "../utils"; import { SyncState } from "./sync-state"; +import { parseMessage } from "./sync-utils"; +import { SyncService } from "./sync-service"; -const manager = getManager(); -const entryService = Container.get(EntryService); -const historyService = Container.get(HistoryService); -const nodeService = Container.get(NodeService); +export type MessageHandler = ( + this: SyncingWebSocket, + message: any +) => Promise; -type MessageHandler = (socket: SyncingWebSocket, message: any) => Promise; +const handlers: Map = new Map(); -type SyncingWebSocket = AuthenticatedWebSocket & { - syncState: SyncState; - handlers: Map; +export type SyncingWebSocket = AuthenticatedWebSocket & { + state: SyncState; + service: SyncService; }; -function parseMessage(socket: SyncingWebSocket, data: RawData): Message | null { - let message; - try { - message = JSON.parse(data.toString()); - } catch (err) { - if (err instanceof SyntaxError) { - socket.sendMessage( - new ControlMessage([new BadParameterError("Bad JSON syntax.")]) - ); - socket.close(); - return null; - } else { - socket.log(err); - throw err; - } - } - - if (typeof message.session !== "string" || message.session === "") { - socket.sendMessage( - new ControlMessage([ - new BadParameterError( - "Messsage should contain an non-empty session of string type." - ), - ]) - ); - socket.close(); - return null; - } - - if (!message.type) { - socket.replyMessage( - message, - new ControlMessage([ - new BadParameterError( - "Each message must contain a valid `type` field." - ), - ]) - ); - return null; - } - - return message; -} - /** Sync entries from peer node, ie. client to server. */ -async function initSyncFromPeerNode(socket: SyncingWebSocket) { - const { userId, nodeUuid } = socket.authState; +async function initSyncFromPeerNode(this: SyncingWebSocket) { + const { userId, nodeUuid } = this.authState; - socket.log("Initializing synchronization from peer node."); + this.log("Initializing synchronization from peer node."); - socket.log("Looking up for node sync record."); - const node = await manager.findOne(Node, { + this.log("Looking up for node sync record."); + const node = await this.service.manager.findOne(Node, { where: { uuid: nodeUuid }, }); if (node) { - socket.log("Node sync record found, performing a recent-sync."); - const session = socket.sendMessage( + this.log("Node sync record found, performing a recent-sync."); + const session = this.sendMessage( new SyncModeRecentRequestMessage(node.historyCursor) ); - socket.log( - `Sending SyncModeRecentRequestMessage #${++socket.syncState.sent[ + this.log( + `Sending SyncModeRecentRequestMessage #${++this.state.sent[ "sync-recent-request-count" ]} [${session}].` ); } else { - socket.log( + this.log( "No node record found, recording this node and performing a full-sync." ); - const client = manager.create(Node, { + const client = this.service.manager.create(Node, { uuid: nodeUuid, user: { id: userId, }, }); - await manager.save(client); + await this.service.manager.save(client); - const session = socket.sendMessage(new SyncModeFullMetaQueryMessage(0)); - socket.log( - `Sending SyncModeFullMetaQueryMessage #${++socket.syncState.sent[ + const session = this.sendMessage(new SyncModeFullMetaQueryMessage(0)); + this.log( + `Sending SyncModeFullMetaQueryMessage #${++this.state.sent[ "sync-full-meta-query-count" ]} [${session}].` ); } - socket.syncState.hasSyncBegun = true; + this.state.hasSyncBegun = true; } -export async function syncEntriesViaSocket(socket: AuthenticatedWebSocket) { +export function buildSyncingWebSocket( + socket: AuthenticatedWebSocket +): SyncingWebSocket { const syncingSocket = socket as SyncingWebSocket; - syncingSocket.syncState = new SyncState(); - syncingSocket.handlers = new Map(); + syncingSocket.state = new SyncState(); + return syncingSocket; +} + +export async function syncEntries(socket: SyncingWebSocket) { try { - await doSync(syncingSocket); + await doSync(socket); } catch (err) { socket.log(err); socket.log("Sync aborted due to error."); } } -const controlMessageHandler: MessageHandler = async ( - _socket, +export function controlMessageHandler( + this: SyncingWebSocket, _message: ControlMessage -) => { +) { return; // Do nothing -}; +} -const handshakeMessageHandler: MessageHandler = async ( - _socket, +export function handshakeMessageHandler( + this: SyncingWebSocket, _message: HandshakeMessage -) => { +) { return; // Do nothing -}; +} -const goodbyeMessageHandler: MessageHandler = async ( - socket, +export function goodbyeMessageHandler( + this: SyncingWebSocket, _message: GoodbyeMessage -) => { - socket.syncState.received["goodbye"] = true; - socket.log("Receiving goodbye message from client."); -}; +) { + this.state.received["goodbye"] = true; + this.log("Receiving goodbye message from client."); +} -const syncModeRecentRequestMessageHandler: MessageHandler = async ( - socket, +export async function syncModeRecentRequestMessageHandler( + this: SyncingWebSocket, message: SyncModeRecentRequestMessage -) => { - const { userId } = socket.authState; +) { + const { userId } = this.authState; const { historyCursor } = message.payload; - socket.log(`Receiving SyncModeRecentRequestMessage.`); + this.log(`Receiving SyncModeRecentRequestMessage.`); const rejectInvalidCursor = () => { const errorMessage = new SyncModeRecentResponseMessage(null, []); errorMessage.errors.push(new HistoryCursorInvalidError()); - socket.replyMessage(message, errorMessage); + this.replyMessage(message, errorMessage); }; if (!historyCursor) { // Invalid history cursor rejectInvalidCursor(); - socket.log("History cursor is empty, rejecting."); + this.log("History cursor is empty, rejecting."); return; } - const history = await historyService.locateHistoryCursorOfUser( + const history = await this.service.history.locateHistoryCursorOfUser( historyCursor, userId ); if (!history) { // Broken history cursor, which indicates client to fall back to full sync. - socket.log("History cursor mismatched, rejecting."); + this.log("History cursor mismatched, rejecting."); rejectInvalidCursor(); return; } // Valid history cursor, get histories after that cursor and send relating entries to client - const histories = await manager.find(EntryHistory, { + const histories = await this.service.manager.find(EntryHistory, { where: { id: MoreThan(history.id), }, @@ -214,7 +176,7 @@ const syncModeRecentRequestMessageHandler: MessageHandler = async ( const entries = histories.length > 0 - ? await manager.find(Entry, { + ? await this.service.manager.find(Entry, { where: { uuid: In(histories.map((history) => history.entryUuid)), }, @@ -222,22 +184,22 @@ const syncModeRecentRequestMessageHandler: MessageHandler = async ( : []; const plainEntries = entries.map((entry) => entry.toPlain()); - socket.replyMessage( + this.replyMessage( message, new SyncModeRecentResponseMessage(nextHistroyCursor, plainEntries) ); - socket.log("Replying SyncModeRecentResponseMessage."); -}; + this.log("Replying SyncModeRecentResponseMessage."); +} -const syncModeRecentResponseMessageHandler: MessageHandler = async ( - socket, +async function syncModeRecentResponseMessageHandler( + this: SyncingWebSocket, message: SyncModeRecentResponseMessage -) => { +) { const { session } = message; - const { userId, nodeUuid } = socket.authState; + const { userId, nodeUuid } = this.authState; - socket.log( - `Receiving SyncModeRecentResponseMessage #${++socket.syncState.received[ + this.log( + `Receiving SyncModeRecentResponseMessage #${++this.state.received[ "sync-recent-response-count" ]} [${session}].` ); @@ -247,12 +209,12 @@ const syncModeRecentResponseMessageHandler: MessageHandler = async ( if (errors.length > 0) { // If errors, sync-recent won't work. // We have to fallback to sync-full. - socket.log(`Sync-recent failed due to errors: ${errors.join(" ")}.`); - socket.log("Fallback to sync-full."); + this.log(`Sync-recent failed due to errors: ${errors.join(" ")}.`); + this.log("Fallback to sync-full."); - const session = socket.sendMessage(new SyncModeFullMetaQueryMessage(0)); - socket.log( - `Sending SyncModeFullMetaQueryMessage #${++socket.syncState.sent[ + const session = this.sendMessage(new SyncModeFullMetaQueryMessage(0)); + this.log( + `Sending SyncModeFullMetaQueryMessage #${++this.state.sent[ "sync-full-meta-query-count" ]} [${session}].` ); @@ -262,7 +224,7 @@ const syncModeRecentResponseMessageHandler: MessageHandler = async ( const { historyCursor, entries } = message.payload; if (historyCursor) { - await nodeService.updateClientHistoryCursor( + await this.service.node.updateClientHistoryCursor( userId, nodeUuid, historyCursor @@ -271,39 +233,39 @@ const syncModeRecentResponseMessageHandler: MessageHandler = async ( await Promise.allSettled( entries.map((entry) => { - return entryService.saveEntryIfNewOrFresher(userId, entry); + return this.service.entry.saveEntryIfNewOrFresher(userId, entry); }) ); if (entries.length === 0) { // If `entries` are empty, we must reach the end of history and sync-recent has completed. - socket.log( + this.log( `Entries payload of received SyncModeFullMetaQueryMessage is empty. Sync-recent finishes.` ); } else { // Continue to request client for sync-recent with lastest histroy cursor - const session = socket.sendMessage( + const session = this.sendMessage( new SyncModeRecentRequestMessage(historyCursor) ); - socket.log( - `Sending SyncModeRecentRequestMessage #${++socket.syncState.sent[ + this.log( + `Sending SyncModeRecentRequestMessage #${++this.state.sent[ "sync-recent-request-count" ]} [${session}].` ); } return; -}; +} -const syncModeFullMetaQueryMessageHandler: MessageHandler = async ( - socket, +async function syncModeFullMetaQueryMessageHandler( + this: SyncingWebSocket, message: SyncModeFullMetaQueryMessage -) => { - const { userId } = socket.authState; +) { + const { userId } = this.authState; const { skip } = message.payload; - const cursor = await historyService.getLastestCursor(userId); - const entries = await manager.find(Entry, { + const cursor = await this.service.history.getLastestCursor(userId); + const entries = await this.service.manager.find(Entry, { where: { user: { id: userId, @@ -314,76 +276,75 @@ const syncModeFullMetaQueryMessageHandler: MessageHandler = async ( }); const meta = entries.map((entry) => entry.getMetadata()); - socket.replyMessage( + this.replyMessage( message, new SyncModeFullMetaResponseMessage(skip, cursor, meta) ); return; -}; +} -const syncModeFullMetaResponseMessageHandler: MessageHandler = async ( - socket, +async function syncModeFullMetaResponseMessageHandler( + this: SyncingWebSocket, message: SyncModeFullMetaResponseMessage -) => { +) { const { session } = message; const { skip, currentCursor, entryMetadata } = message.payload; - socket.log( - `Receiving SyncModeFullMetaResponseMessage #${++socket.syncState.received[ + this.log( + `Receiving SyncModeFullMetaResponseMessage #${++this.state.received[ "sync-full-meta-response-count" ]} [${session}].` ); if (currentCursor) { - if (!socket.syncState.received["sync-full-entries-response-first-cursor"]) { - socket.syncState.received["sync-full-entries-response-first-cursor"] = + if (!this.state.received["sync-full-entries-response-first-cursor"]) { + this.state.received["sync-full-entries-response-first-cursor"] = currentCursor; - socket.log( + this.log( "Updating cursor from received SyncModeFullMetaResponseMessage." ); } } if (entryMetadata.length === 0) { - socket.log("No entry metadata received. Sync-full finished."); + this.log("No entry metadata received. Sync-full finished."); return; } - const metaQueryMessageSession = socket.sendMessage( + const metaQueryMessageSession = this.sendMessage( new SyncModeFullMetaQueryMessage(skip + entryMetadata.length) ); - socket.log( - `Sending SyncModeFullMetaQueryMessage #${++socket.syncState.sent[ + this.log( + `Sending SyncModeFullMetaQueryMessage #${++this.state.sent[ "sync-full-meta-query-count" ]} [${metaQueryMessageSession}].` ); - const fresherEntryMetadata = await entryService.filterFresherEntryMetadata( - entryMetadata - ); - const entriesQuerySession = socket.sendMessage( + const fresherEntryMetadata = + await this.service.entry.filterFresherEntryMetadata(entryMetadata); + const entriesQuerySession = this.sendMessage( new SyncModeFullEntriesQueryMessage( fresherEntryMetadata.map((meta) => meta.uuid) ) ); - socket.log( - `Sending SyncModeFullEntriesQueryMessage #${++socket.syncState.sent[ + this.log( + `Sending SyncModeFullEntriesQueryMessage #${++this.state.sent[ "sync-full-entries-query-count" ]} [${entriesQuerySession}].` ); return; -}; +} -const syncModeFullEntriesQueryMessageHandler: MessageHandler = async ( - socket, +async function syncModeFullEntriesQueryMessageHandler( + this: SyncingWebSocket, message: SyncModeFullEntriesQueryMessage -) => { - const { userId } = socket.authState; +) { + const { userId } = this.authState; const { uuids: uuids } = message.payload; - const entries = await manager.find(Entry, { + const entries = await this.service.manager.find(Entry, { where: { uuid: In(uuids), user: { @@ -394,42 +355,45 @@ const syncModeFullEntriesQueryMessageHandler: MessageHandler = async ( const plainEntries = entries.map((entry) => entry.toPlain()); - socket.replyMessage( + this.replyMessage( message, new SyncModeFullEntriesResponseMessage(plainEntries) ); - socket.log("Sending SyncModeFullEntriesResponseMessage."); -}; + this.log("Sending SyncModeFullEntriesResponseMessage."); +} -const syncModeFullEntriesResponseMessageHandler: MessageHandler = async ( - socket, +async function syncModeFullEntriesResponseMessageHandler( + this: SyncingWebSocket, message: SyncModeFullEntriesResponseMessage -) => { - const { userId } = socket.authState; +) { + const { userId } = this.authState; const { session } = message; const { entries } = message.payload; - socket.log( - `Receiving SyncModeFullEntriesResponseMessage #${++socket.syncState - .received["sync-full-entries-response-count"]} [${session}].` + this.log( + `Receiving SyncModeFullEntriesResponseMessage #${++this.state.received[ + "sync-full-entries-response-count" + ]} [${session}].` ); await Promise.allSettled( - entries.map((entry) => entryService.saveEntryIfNewOrFresher(userId, entry)) + entries.map((entry) => + this.service.entry.saveEntryIfNewOrFresher(userId, entry) + ) ); return; -}; +} -const debugNodeUpdateMessageHandler: MessageHandler = async ( - socket, +async function debugNodeUpdateMessageHandler( + this: SyncingWebSocket, message: DebugNodeUpdateMessage -) => { - const { nodeUuid } = socket.authState; +) { + const { nodeUuid } = this.authState; const { historyCursor } = message.payload as DebugNodeUpdateMessage["payload"]; - const manager = getManager(); + const manager = this.service.manager; await manager .createQueryBuilder() .update(Node, { @@ -440,24 +404,24 @@ const debugNodeUpdateMessageHandler: MessageHandler = async ( }) .execute(); - socket.replyMessage(message, new ControlMessage()); + this.replyMessage(message, new ControlMessage()); return; -}; +} async function cleanUpAfterSyncFull(socket: SyncingWebSocket) { const { userId, nodeUuid } = socket.authState; // If a sync-full has completed successfully, // update cursor so that next sync could be accelerated. - if (socket.syncState.isSyncFullExecuted()) { + if (socket.state.isSyncFullExecuted()) { if ( - socket.syncState.isSyncFullSucceed() && - socket.syncState.received["sync-full-entries-response-first-cursor"] + socket.state.isSyncFullSucceed() && + socket.state.received["sync-full-entries-response-first-cursor"] ) { - await nodeService.updateClientHistoryCursor( + await socket.service.node.updateClientHistoryCursor( userId, nodeUuid, - socket.syncState.received["sync-full-entries-response-first-cursor"] + socket.state.received["sync-full-entries-response-first-cursor"] ); socket.log("Sync full succeed and cursor is updated."); } else { @@ -468,9 +432,7 @@ async function cleanUpAfterSyncFull(socket: SyncingWebSocket) { } } -export async function doSync(socket: SyncingWebSocket) { - const { handlers } = socket; - +(function installHandlers() { const type2Handlers = [ ["ctrl", controlMessageHandler], ["handshake", handshakeMessageHandler], @@ -490,7 +452,21 @@ export async function doSync(socket: SyncingWebSocket) { for (const [type, handler] of type2Handlers) { handlers.set(type, handler); } +})(); + +async function dispatchMessage(socket: SyncingWebSocket, message: Message) { + const handler = handlers.get(message.type)?.bind(socket); + if (handler) { + socket.state.processingMessageCount++; + await handler(message); + socket.state.processingMessageCount--; + } else { + socket.replyUnrecognizedMessage(message); + socket.log(`Unable to recognize client message type: ${message.type}.`); + } +} +export async function doSync(socket: SyncingWebSocket) { socket.on("message", async (data) => { const message = parseMessage(socket, data); if (!message) { @@ -498,26 +474,18 @@ export async function doSync(socket: SyncingWebSocket) { } // Dispatch message to handler accroding to its type. - const handler = handlers.get(message.type); - if (handler) { - socket.syncState.processingMessageCount++; - await handler(socket, message); - socket.syncState.processingMessageCount--; - } else { - socket.replyUnrecognizedMessage(message); - socket.log(`Unable to recognize client message type: ${message.type}.`); - } + dispatchMessage(socket, message); // If we are not syncing anything from client, then say goodbye to client. if ( - socket.syncState.hasSyncBegun && - socket.syncState.processingMessageCount === 0 && - !socket.syncState.isSyncRecentOngoing() && - !socket.syncState.isSyncFullOngoing() && - !socket.syncState.sent["goodbye"] + socket.state.hasSyncBegun && + socket.state.processingMessageCount === 0 && + !socket.state.isSyncRecentOngoing() && + !socket.state.isSyncFullOngoing() && + !socket.state.sent["goodbye"] ) { socket.sendMessage(new GoodbyeMessage()); - socket.syncState.sent["goodbye"] = true; + socket.state.sent["goodbye"] = true; cleanUpAfterSyncFull(socket); socket.log( "We are not syncing anything from client, and it's time to say goodbye." @@ -526,12 +494,12 @@ export async function doSync(socket: SyncingWebSocket) { // If both client and server have said goodbye, then disconnect. if ( - socket.syncState.processingMessageCount === 0 && - socket.syncState.received["goodbye"] && - socket.syncState.sent["goodbye"] + socket.state.processingMessageCount === 0 && + socket.state.received["goodbye"] && + socket.state.sent["goodbye"] ) { - if (!socket.syncState.isClosing) { - socket.syncState.isClosing = true; + if (!socket.state.isClosing) { + socket.state.isClosing = true; socket.close(); socket.log("Two-way synchronization finished.");