From 46214c6941d85158e288b9065da19dd5350d9754 Mon Sep 17 00:00:00 2001 From: Ed Fletcher Date: Sat, 17 Jun 2023 00:46:54 -0700 Subject: [PATCH] Add serialize_writes option --- src/client.js | 3 +- src/transports/net.js | 35 +++++++++++++-- test/rawWriteOrdering.js | 93 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 test/rawWriteOrdering.js diff --git a/src/client.js b/src/client.js index dfe220e4..a173ce92 100644 --- a/src/client.js +++ b/src/client.js @@ -56,7 +56,8 @@ module.exports = class IrcClient extends EventEmitter { message_max_length: 350, sasl_disconnect_on_fail: false, transport: default_transport, - websocket_protocol: 'text.ircv3.net' + websocket_protocol: 'text.ircv3.net', + serialize_writes: false }; const props = Object.keys(defaults); diff --git a/src/transports/net.js b/src/transports/net.js index 301525cf..3a6de213 100644 --- a/src/transports/net.js +++ b/src/transports/net.js @@ -26,18 +26,31 @@ module.exports = class Connection extends EventEmitter { this.socket_events = []; this.encoding = 'utf8'; + + this.write_queue = null; + this.write_queue_servicer = () => {}; } isConnected() { return this.state === SOCK_CONNECTED; } + _writeLineConnected(line, cb) { + if (this.encoding !== 'utf8') { + this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb); + } else { + this.socket.write(line + '\r\n', cb); + } + } + writeLine(line, cb) { if (this.socket && this.isConnected()) { - if (this.encoding !== 'utf8') { - this.socket.write(iconv.encode(line + '\r\n', this.encoding), cb); + if (this.options.serialize_writes && this.write_queue) { + if (this.write_queue.push({ line, cb }) === 1) { + this.write_queue_servicer(); + } } else { - this.socket.write(line + '\r\n', cb); + this._writeLineConnected(line, cb); } } else { this.debugOut('writeLine() called when not connected'); @@ -77,6 +90,22 @@ module.exports = class Connection extends EventEmitter { this.requested_disconnect = false; this.incoming_buffer = Buffer.from(''); + if (options.serialize_writes) { + this.write_queue = []; + this.write_queue_servicer = () => { + if (this.write_queue.length) { + this._writeLineConnected(this.write_queue[0].line, () => { + if (this.write_queue[0].cb) { + this.write_queue[0].cb(); + } + + this.write_queue = this.write_queue.slice(1); + process.nextTick(this.write_queue_servicer); + }); + } + }; + } + // Include server name (SNI) if provided host is not an IP address if (!this.getAddressFamily(ircd_host)) { sni = ircd_host; diff --git a/test/rawWriteOrdering.js b/test/rawWriteOrdering.js new file mode 100644 index 00000000..f5cca8f9 --- /dev/null +++ b/test/rawWriteOrdering.js @@ -0,0 +1,93 @@ +'use strict'; + +/* globals describe, it */ + +const net = require('net'); +const Connection = require('../src/transports/net'); + +const chai = require('chai'); + +chai.use(require('chai-subset')); + +async function runTest(serialize_writes) { + const numLines = 12; + const timeSlice = 100; + + return new Promise((resolve) => { + let conn; + let server; // eslint-disable-line prefer-const + let wroteLines = []; + const bufferedLines = []; + + const clientHandler = (client) => { + client.on('data', (data) => { + const dataStr = data.toString('utf8'); + bufferedLines.push(dataStr); + + if (wroteLines.length && wroteLines.length === bufferedLines.length) { + conn.close(); + server.close(); + resolve({ wroteLines, bufferedLines }); + } + }); + }; + + server = net.createServer(clientHandler); + server.listen(0, '0.0.0.0', () => { + conn = new Connection({ + host: server.address().address, + port: server.address().port, + tls: false, + serialize_writes, + }); + + wroteLines = Array.from({ length: numLines }).map((_, i) => i).map(String); + let delay = wroteLines.length / timeSlice; + const rudeHandler = { + get(target, prop) { + if (prop === 'write') { + return (data, cb) => { + setTimeout(() => target[prop](data, cb), delay * 1000); + delay -= 1 / timeSlice; + }; + } else { + return target[prop]; + } + } + }; + + conn.on('open', () => { + conn.socket = new Proxy(conn.socket, rudeHandler); + wroteLines.forEach((line) => conn.writeLine(line)); + }); + + conn.connect(); + }); + }); +} + +function compareLines(wroteLines, bufferedLines) { + return bufferedLines.map((l) => l.trim()).every((line, index) => line === wroteLines[index]); +} + +describe('src/transports/net.js', function() { + it('should recieve messages in reverse of the order sent when serialize_writes is false', function(done) { + runTest(false).then(({ wroteLines, bufferedLines }) => { + let error = null; + if (compareLines(wroteLines, bufferedLines) === true) { + error = new Error('Line order matches when it should not!'); + } + done(error); + }); + }); + + it('should recieve messages in the order sent when serialize_writes is true', function(done) { + runTest(true).then(({ wroteLines, bufferedLines }) => { + let error = null; + if (compareLines(wroteLines, bufferedLines) === false) { + error = new Error('Line order does not match when it should!'); + } + done(error); + }); + }); +});