From 5eb696891a300c0dc3a647ba2eb3ac068c07d7ea Mon Sep 17 00:00:00 2001 From: Cihad Tekin Date: Fri, 18 Oct 2024 16:18:10 +0300 Subject: [PATCH] [core] cache: updated deprecated api of lru-cache, cleaned up mongodb capped collection and change stream related code --- api/api.js | 4 +- api/parts/data/cache.js | 286 ++++------------------------------------ 2 files changed, 28 insertions(+), 262 deletions(-) diff --git a/api/api.js b/api/api.js index 4a0b51eeffc..af5ea02fded 100755 --- a/api/api.js +++ b/api/api.js @@ -265,7 +265,7 @@ plugins.connectToAllDatabases().then(function() { if (cluster.isMaster) { plugins.installMissingPlugins(common.db); common.runners = require('./parts/jobs/runner'); - common.cache = new CacheMaster(common.db); + common.cache = new CacheMaster(); common.cache.start().then(() => { setImmediate(() => { plugins.dispatch('/cache/init', {}); @@ -333,7 +333,7 @@ plugins.connectToAllDatabases().then(function() { console.log("Starting worker", process.pid, "parent:", process.ppid); const taskManager = require('./utils/taskmanager.js'); - common.cache = new CacheWorker(common.db); + common.cache = new CacheWorker(); common.cache.start(); //since process restarted mark running tasks as errored diff --git a/api/parts/data/cache.js b/api/parts/data/cache.js index 181b6f6e121..a2da942aaf1 100644 --- a/api/parts/data/cache.js +++ b/api/parts/data/cache.js @@ -66,7 +66,7 @@ class DataStore { * @return {int} current store size */ get length() { - return this.lru.length; + return this.lru.size; } /** @@ -97,10 +97,10 @@ class DataStore { return data; } else if (!id) { - this.lru.reset(); + this.lru.clear(); } else if (this.read(id) !== null) { - this.lru.del(id.toString()); + this.lru.delete(id.toString()); } } @@ -155,11 +155,9 @@ class CacheWorker { /** * Constructor * - * @param {Mongo} db database instance * @param {Number} size max number of cache groups */ - constructor(db, size = 100) { - this.db = db; + constructor(size = 100) { this.data = new DataStore(size); this.started = false; @@ -317,7 +315,7 @@ class CacheWorker { } /** - * Remove a record from cache and database. + * Remove a record from cache. * * @param {String} group group key * @param {String} id data key @@ -441,7 +439,7 @@ class CacheWorker { /** * Just a handy method which returns an object with partials with given group. - * + * * @param {String} group group name * @return {Object} object with all the {@code CacheWorker} methods without group */ @@ -469,54 +467,18 @@ class CacheWorker { /** * Cache instance for master process: - * - (1) listen for requests from workers; - * - (2) listen for updates from capped collection; - * - send (1) to (2) and vice versa; - * - call group operators to read / write / udpate data from db / whatever. + * - listen for requests from workers; + * - call group operators to read/write/udpate */ class CacheMaster { /** * Constructor * - * @param {Mongo} db database instance * @param {Number} size max number of cache groups */ - constructor(db, size = 100) { + constructor(size = 100) { this.data = new DataStore(size, Number.MAX_SAFE_INTEGER); this.operators = {}; - this.col = new StreamedCollection(db, CENTRAL, doc => { - log.d('collection doc %j', doc); - if (doc.o === OP.READ) { - return; - } - - let store = this.data.read(doc.g); - if (!store) { - log.w(`No store for group ${doc.g}`); - return; - } - if (doc.o === OP.PURGE && !doc.k) { // purgeAll - store.iterate(id => store.write(id, null)); - } - if (doc.o === OP.PURGE || doc.o === OP.REMOVE) { - store.write(doc.k, null); - } - else if (doc.o === OP.WRITE) { - store.write(doc.k, doc.d); - } - else if (doc.o === OP.READ) { - log.w('Reading from collection instruction shouldn\'t happen'); - store.write(doc.k, doc.d); - } - else if (doc.o === OP.UPDATE) { - store.update(doc.k, doc.d); - } - else { - throw new Error('Bad op ' + doc.o + ': ' + JSON.stringify(doc)); - } - this.ipc.send(0, doc); - }); - this.initialized = {}; this.delayed_messages = []; this.ipc = new CentralMaster(CENTRAL, ({o, g, k, d}, reply, from) => { @@ -584,24 +546,19 @@ class CacheMaster { } /** - * Attach to IPC & tailable cursor. + * Attach to IPC * - * @return {Promise} with cursor open result + * @return {Promise} void */ async start() { - log.d('starting master'); this.ipc.attach(); - await this.col.start().then(() => new Promise(res => setTimeout(() => { - log.d('started master'); - res(); - }, 100))); + log.d('started master'); } /** - * Stop cursor. + * Detaches IPC instance */ stop() { - this.col.stop(); this.ipc.detach(); } @@ -652,8 +609,8 @@ class CacheMaster { } /** - * Write data to the cache & db - * + * Write data to the cache + * * @param {String} group group key * @param {String} id data key * @param {Object} data data to store @@ -679,10 +636,8 @@ class CacheMaster { rc = rc.json; } this.data.read(group)[data === null ? 'remove' : 'write'](id, rc); - return this.col.put(OP.WRITE, group, id, rc).then(() => { - this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: rc}); - return data === null ? true : rc; - }); + this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: rc}); + return data === null ? true : rc; } else { return null; @@ -696,7 +651,7 @@ class CacheMaster { /** * Update data in the cache - * + * * @param {String} group group key * @param {String} id data key * @param {Object} update data to store @@ -715,10 +670,8 @@ class CacheMaster { if (group in this.operators) { return this.operators[group].update(id, update).then(() => { this.data.read(group).update(id, update); - return this.col.put(OP.UPDATE, group, id, update).then(() => { - this.ipc.send(-from, {o: OP.UPDATE, g: group, k: id, d: update}); - return update; - }); + this.ipc.send(-from, {o: OP.UPDATE, g: group, k: id, d: update}); + return update; }); } else { @@ -727,7 +680,7 @@ class CacheMaster { } /** - * Remove a record from cache and database. + * Remove a record from cache. * * @param {String} group group key * @param {String} id data key @@ -747,10 +700,8 @@ class CacheMaster { return this.operators[group].remove(id).then(rc => { if (rc) { this.data.read(group).remove(id); - return this.col.put(OP.WRITE, group, id, null).then(() => { - this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: null}); - return true; - }); + this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: null}); + return true; } else { return null; @@ -781,9 +732,7 @@ class CacheMaster { this.data.read(group).write(id, null); this.ipc.send(-from, {o: OP.PURGE, g: group, k: id}); - return this.col.put(OP.PURGE, group, id).then(() => { - return true; - }); + return true; } /** @@ -802,9 +751,7 @@ class CacheMaster { let grp = this.data.read(group); grp.iterate(k => grp.write(k, null)); this.ipc.send(-from, {o: OP.PURGE, g: group}); - return this.col.put(OP.PURGE, group).then(() => { - return true; - }); + return true; } /** @@ -869,7 +816,7 @@ class CacheMaster { /** * Just a handy method which returns an object with partials with given group. - * + * * @param {String} group group name * @return {Object} object with all the {@code CacheWorker} methods without group */ @@ -896,187 +843,6 @@ class CacheMaster { } - -/** - * Ensure capped collection exists and return latest document in it to start streaming from. - * - * @param {Mongo} db db object - * @param {String} name collection name - * @param {Number} size collection size - * @return {Promise} resolves to array of [collection, _id of last record] - */ -function createCollection(db, name, size = 1e7) { - return new Promise((resolve, reject) => { - db.createCollection(name, {capped: true, size: size}, (e) => { - if (e && e.codeName !== "NamespaceExists") { - log.e(`Error while creating capped collection ${name}:`, e); - return reject(e); - } - - let col = db.collection(name); - - col.find().sort({_id: -1}).limit(1).toArray((err, arr) => { - if (err) { - log.e(`Error while looking for last record in ${name}:`, err); - return reject(err); - } - if (arr && arr.length) { - log.d('Last change id %s', arr[0]._id); - resolve([col, arr[0]._id]); - } - else { - col.insertOne({first: true}, (error, res) => { - if (error) { - log.e(`Error while looking for last record in ${name}:`, error); - return reject(e); - } - log.d('Inserted first change id %s', res.insertedId); - resolve([col, res.insertedId]); - }); - } - }); - }); - }); -} - -/** - * Class encapsulating a capped collection watching a particular cache modifications. - */ -class StreamedCollection { - /** - * Constructor - * @param {Mongo} db db object - * @param {String} name collection name - * @param {Function} handler a function handling IPC requests - */ - constructor(db, name, handler) { - this.db = db; - this.name = name; - this.handler = handler; - this.inserts = []; - } - - /** - * Start change stream - */ - async start() { - if (this.stream) { - log.w('Stream already started'); - return; - } - - log.i('Starting watcher stream in %d', process.pid); - - try { - let [col, last] = await createCollection(this.db, this.name, 1e7); - - this.col = col; - this.stream = col.find({_id: {$gt: last}}, {tailable: true, awaitData: true, numberOfRetries: -1}).stream(); - - this.stream.on('data', doc => { - if (this.inserts.indexOf(doc._id.toString()) !== -1) { - return this.inserts.splice(this.inserts.indexOf(doc._id.toString()), 1); - } - log.d('new in the collection', doc); - if (doc.d !== undefined && doc.d !== null) { - try { - doc.d = JSON.parse(doc.d); - this.handler(doc); - } - catch (e) { - log.e(e); - } - } - else { - this.handler(doc); - } - }); - - this.stream.on('end', () => { - log.w('Stream ended'); - this.close(); - }); - - this.stream.on('close', () => { - log.d('Stream closed'); - this.stream = undefined; - setImmediate(() => { - if (!this.stopped) { - this.start().catch(e => { - log.e('Cannot start watcher', e); - }); - } - }); - }); - - this.stream.on('error', error => { - log.e('Stream error', error); - this.close(); - }); - - } - catch (e) { - setTimeout(() => { - try { - if (!this.stopped) { - this.start(); - } - } - catch (ignored) { - // ignored - } - }, 1000); - } - } - - /** - * Close change stream - */ - stop() { - this.stopped = true; - this.close(); - } - - /** - * Close change stream - */ - close() { - if (this.stream) { - this.stream.destroy(); - this.stream = undefined; - log.d('Stream closedd'); - } - } - - /** - * Add a record to the collection - * @param {int} o operation type (see OP above) - * @param {String} g group name - * @param {String} k key - document key - * @param {Object} d data - * @return {Promise} resolves to the inserted change document - */ - put(o, g, k, d) { - let doc = { - _id: new this.db.ObjectID(), - o: o, - g: g, - k: k, - d: d ? JSON.stringify(d) : d - }; - log.d('putting to the collection', d); - this.inserts.push(doc._id.toString()); - return new Promise((res, rej) => this.col.insertOne(doc, err => { - if (err) { - this.inserts.splice(this.inserts.indexOf(doc._id.toString()), 1); - rej(err); - } - else { - res(doc); - } - })); - } -} /** * Data class for tests */