Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] FIX migrate MongoDB driver to 4.x #689

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
2 changes: 1 addition & 1 deletion CHANGES_NEXT_RELEASE
Original file line number Diff line number Diff line change
@@ -1 +1 @@

- Upgrade mongodb dep from 3.6.12 to 4.7.0
4 changes: 3 additions & 1 deletion config.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ config.mongo = {
// The URI to use for the database connection. It supports replica set URIs.
// mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]][/[database][?options]]
// I.e.: 'mongodb://user:pass@host1:27017,host2:27018,host3:27019/cep?replicaSet=myrep'
url: 'mongodb://localhost:27017/cep'
url: 'mongodb://localhost:27017/cep',
// MongoDB connection timeout. Default is 30000ms
connectTimeoutMS: 30000
};

/**
Expand Down
11 changes: 7 additions & 4 deletions docs/API/plain_rules.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ fields:
- **checkInterval**: _mandatory_, time in minutes for checking the attribute. Min value is 0.5 and max is 35791, other
values are truncated to them (a warning log message is generated if such truncation occurs)
- **attribute**: _mandatory_, attribute for watch
- **reportInterval**: _mandatory_, time in seconds to see an entity as silent (in other words, how much time has to pass since last entity update to consider that entity is in "no signal" situation so the rule is triggered)
- **reportInterval**: _mandatory_, time in seconds to see an entity as silent (in other words, how much time has to
pass since last entity update to consider that entity is in "no signal" situation so the rule is triggered)
- **id** or **idRegexp**: _mandatory_ (but not both at the same time), ID or regular expression of the entity to watch
- type: _optional_, type of entities to watch

Expand All @@ -181,10 +182,12 @@ checkInterval could impact on performance.
## Non signal actions

The following virtual attributes are available to be used in non signal template actions:

- **service**: service of rule
- **subservice**: subservice of rule
- **ruleName**: name of the rule
- **reportInterval**: time to see an entity as silent (in other words, how much time has to pass since last entity update to consider that entity is in "no signal" situation so the rule is triggered)
- **reportInterval**: time to see an entity as silent (in other words, how much time has to pass since last entity
update to consider that entity is in "no signal" situation so the rule is triggered)
- **id**: entity id
- **type**: entity type
- **internalCurrentTime**: current time
Expand Down Expand Up @@ -1384,7 +1387,7 @@ will send to core the "event"
"subservice": "/",
"service": "unknownt",
"myJsonValue__color": "blue",
"myJsonValue": {"type":"myType1","value":{"color":"blue"}},
"myJsonValue": { "type": "myType1", "value": { "color": "blue" } },
"myArrayValue__0": "green",
"myArrayValue__1": "black",
"myArrayValue": { "type": "myType2", "value": ["green", "blue"] },
Expand Down Expand Up @@ -1412,7 +1415,7 @@ will send to core the "event"
"location__lon": 53.120405283,
"location__x": 642009.4673614734,
"location__y": 5883931.8311913265,
"location": {"type":"Point","coordinates":[53.120405283,53.0859375]}
"location": { "type": "Point", "coordinates": [53.120405283, 53.0859375] }
}
```

Expand Down
4 changes: 1 addition & 3 deletions docs/roadmap.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ The following list contains all features that were in the roadmap and have alrea
- Software quality improvement based on ISO25010 recommendations
(<https://github.com/telefonicaid/perseo-fe/issues/428>)
([1.12.0](https://github.com/telefonicaid/perseo-fe/releases/tag/1.12.0))
- Nosignal actions supported in HA scenarios
(<https://github.com/telefonicaid/perseo-fe/issues/624>)
- Nosignal actions supported in HA scenarios (<https://github.com/telefonicaid/perseo-fe/issues/624>)
([1.12.0](https://github.com/telefonicaid/perseo-fe/releases/tag/1.12.0))
- Full support for pagination in APIs /rules and /vrules (<https://github.com/telefonicaid/perseo-fe/issues/364>)
([1.10.0](https://github.com/telefonicaid/perseo-fe/releases/tag/1.10.0))
Expand All @@ -81,4 +80,3 @@ The following list contains all features that were in the roadmap and have alrea
- Add astronomic clock support to timed rules (included sunrise and sunset functions for EPL statements)
(<https://github.com/telefonicaid/perseo-core/issues/130>)
([Core 1.4.0](https://github.com/telefonicaid/perseo-core/releases/tag/1.4.0))

43 changes: 23 additions & 20 deletions lib/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,17 @@ function getDbAux(url, component, callback) {
client.connect(
url,
{
bufferMaxEntries: config.checkDB.bufferMaxEntries,
domainsEnabled: true,
reconnectTries: config.checkDB.reconnectTries,
reconnectInterval: config.checkDB.reconnectInterval
// connectTimeoutMS is no longer supported in MongoDB 4.x
// (see https://stackoverflow.com/q/72699235/1485926)
// we keep connectTimeoutMS as configuration parameter, but
// the driver parameters are now socketTimeoutMS and
// serverSelectionTimeoutMS
socketTimeoutMS: config.mongo.connectTimeoutMS,
serverSelectionTimeoutMS: config.mongo.connectTimeoutMS
//bufferMaxEntries: config.checkDB.bufferMaxEntries,
//domainsEnabled: true,
//reconnectTries: config.checkDB.reconnectTries,
//reconnectInterval: config.checkDB.reconnectInterval
//useUnifiedTopology: true
},
function(err, client) {
Expand All @@ -69,10 +76,10 @@ function getDbAux(url, component, callback) {
// The driver has given up getting a connection, so we will die (restart perseo usually)
// and re-try from scratch.
// The ReplSet does not emit 'reconnectFailed'
db.serverConfig.on('reconnectFailed', function() {
logger.fatal('too many tries to reconnect to database, dying ...');
process.exit(-2);
});
//db.serverConfig.on('reconnectFailed', function() {
// logger.fatal('too many tries to reconnect to database, dying ...');
// process.exit(-2);
//});
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From https://stackoverflow.com/questions/72699235/mongodb-node-driver-connect-ignores-connecttimeoutms-and-sockettimeoutms-set/73214803#73214803

You are always considered "connected" by the driver and then under the hood it auto reconnects for you

So all the reconnection logic in our code can be just removed. The driver will do it for us.


checkDbHealthFunc = function checkDbHealth() {
pingAux(db, component, function(err, result) {
Expand All @@ -98,21 +105,17 @@ function getOrionDb(callback) {
}

function ensureIndex(collection, fields, callback) {
database.collection(collection, function(err, collection) {
myutils.logErrorIf(err, collection, context);
collection.createIndex(fields, { unique: true }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
var col = database.collection(collection);
col.createIndex(fields, { unique: true }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
}
function ensureIndexTTL(collection, fields, ttl, callback) {
database.collection(collection, function(err, collection) {
myutils.logErrorIf(err, collection);
collection.createIndex(fields, { expireAfterSeconds: ttl }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
var col = database.collection(collection);
col.createIndex(fields, { expireAfterSeconds: ttl }, function(err, indexName) {
myutils.logErrorIf(err, 'ensureIndex ' + collection, context);
callback(err, indexName);
});
}

Expand Down
58 changes: 31 additions & 27 deletions lib/models/entitiesStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
*/
'use strict';

var async = require('async'),
appContext = require('../appContext'),
var appContext = require('../appContext'),
config = require('../../config'),
entitiesCollectionName = require('../../config').orionDb.collection,
myutils = require('../myutils'),
Expand All @@ -39,6 +38,11 @@ function findSilentEntities(service, subservice, ruleData, func, callback) {
context = { op: 'checkNoSignal', comp: constants.COMPONENT_NAME },
criterion = {};

var cb = function(err, result) {
logger.debug(context, 'findSilentEntities %s', myutils.firstChars(result));
return callback(err, result);
};

db = orionServiceDb(service);
criterion['attrs.' + ruleData.attribute + '.modDate'] = {
$lt: Date.now() / 1000 - ruleData.reportInterval
Expand All @@ -60,32 +64,32 @@ function findSilentEntities(service, subservice, ruleData, func, callback) {
criterion['_id.type'] = ruleData.type;
}
logger.debug(context, 'findSilentEntities criterion %j', criterion);
async.waterfall(
[
db.collection.bind(db, entitiesCollectionName, { strict: true }),
function(col, cb) {
var count = 0;
col.find(criterion)
.batchSize(config.orionDb.batchSize)
.each(function(err, one) {
if (err) {
return cb(err, null);
}
if (one === null) {
//cursor exhausted
return cb(err, 'silent ones count ' + count);
}
logger.debug(context, 'silent entity %j', one._id);
func(one);
count++;
});
}
],
function(err, result) {
logger.debug(context, 'findSilentEntities %s', myutils.firstChars(result));
return callback(err, result);

myutils.collectionExists(db, entitiesCollectionName, function(exists) {
if (!exists) {
return cb('collection ' + entitiesCollectionName + ' does not exist');
}
);

var col = db.collection(entitiesCollectionName);

var count = 0;
col.find(criterion)
.batchSize(config.orionDb.batchSize)
.forEach(
function(one) {
logger.debug(context, 'silent entity %j', one._id);
func(one);
count++;
},
function(err) {
if (err) {
return cb(err, null);
} else {
return cb(null, 'silent ones count ' + count);
}
}
);
});
}

module.exports = {
Expand Down
97 changes: 49 additions & 48 deletions lib/models/executionsStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
*/
'use strict';

var async = require('async'),
appContext = require('../appContext'),
var appContext = require('../appContext'),
logger = require('logops'),
execCollectionName = require('../../config').collections.executions,
myutils = require('../myutils');
Expand All @@ -37,11 +36,13 @@ module.exports = {
id = task.event.id,
index = task.action.index;

db.collection(execCollectionName, { strict: true }, function(err, col) {
if (err) {
myutils.logErrorIf(err);
return callback(err, null);
myutils.collectionExists(db, execCollectionName, function(exists) {
if (!exists) {
return callback('collection ' + execCollectionName + ' does not exist');
}

const col = db.collection(execCollectionName);

var cursor = col
.find(
{
Expand All @@ -66,18 +67,19 @@ module.exports = {
});
},
AlreadyDone: function AlreadyDone(task, callback) {
var db = appContext.Db(),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;
db.collection(execCollectionName, { strict: true }, function(err, col) {
if (err) {
myutils.logErrorIf(err);
return callback(err, null);
myutils.collectionExists(appContext.Db(), execCollectionName, function(exists) {
if (!exists) {
return callback('collection ' + execCollectionName + ' does not exist');
}

var col = appContext.Db().collection(execCollectionName),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;

col.findOne(
{
name: ruleName,
Expand All @@ -98,37 +100,36 @@ module.exports = {
});
},
Update: function Update(task, callback) {
var db = appContext.Db(),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;
async.waterfall(
[
db.collection.bind(db, execCollectionName, { strict: true }),
function(col, cb) {
col.update(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
{ $currentDate: { lastTime: true } },
{ upsert: true },
cb
);
}
],
function(err, result) {
myutils.logErrorIf(err);
logger.info('executionsStore.Update %j', result);
return callback(err, result);
myutils.collectionExists(appContext.Db(), execCollectionName, function(exists) {
if (!exists) {
return callback('collection ' + execCollectionName + ' does not exist');
}
);

var col = appContext.Db().collection(execCollectionName),
service = task.event.service,
subservice = task.event.subservice,
ruleName = task.event.ruleName,
id = task.event.id,
index = task.action.index,
noticeId = task.event.noticeId;

col.update(
{
name: ruleName,
subservice: subservice,
service: service,
id: id,
notice: noticeId,
index: index
},
{ $currentDate: { lastTime: true } },
{ upsert: true },
function(err, result) {
myutils.logErrorIf(err);
logger.info('executionsStore.Update %j', result);
return callback(err, result);
}
);
});
}
};
Loading