diff --git a/README.md b/README.md index cb44e1f..80f757d 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,16 @@ export default [ // form of element is {subject,predicate,object} // predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" } }, + extendedMatch: { + // list of elements in form as above + [{ + predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" }, + subject: { type: "variable", "value": "var" } + },{ + predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" }, + object: { type: "variable", "value": "var" } + }] + } callback: { url: "http://resource/.mu/delta", method: "POST" }, @@ -62,6 +72,8 @@ The exported property contains an array of definitions, each linking a match to - `match.subject`: Matches the subject. Both `type` and `value` may be specified. - `match.predicate`: Matches the predicade. Both `type` and `value` may be specified. - `match.object`: Matches the object. Both `type` and `value` may be specified. + - `extendedMatch`: A list of patterns to match against. All supplied patterns must match in the delta. + A special type `"variable"` can be used in patterns, variables with the same `value` will match to the same value. - `callback`: The place to inform about a matched delta - `callback.url`: URL to inform about a match - `callback.method`: Method to use when informing about a match @@ -70,6 +82,14 @@ The exported property contains an array of definitions, each linking a match to - `options.gracePeriod`: Only send the response after a certain amount of time. This will group changes in the future. - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). +### Delta messages cache + +The incoming delta messages are cached, since a match can cross multiple incoming delta messages. +The timeout of this cache can be configured in ms using the `CACHE_TIMEOUT` environment variable. + +When matches can't be found in the cache they will be fetched from the database unless `FETCH_MISSING_MATCHES` is set to `false` + + ## Delta formats The delta may be offered in multiple formats. Versions should match the exact string. Specify `options.resourceFormat` to indicate the specific resourceformat. diff --git a/app.js b/app.js index 97c710a..5f6049b 100644 --- a/app.js +++ b/app.js @@ -1,8 +1,10 @@ -import { app, uuid } from 'mu'; +import { app, uuid, query } from 'mu'; import request from 'request'; import services from '/config/rules.js'; import bodyParser from 'body-parser'; import dns from 'dns'; +import ExpireArray from 'expire-array'; +import ObjectSet from 'object-set-js' // Also parse application/json as json app.use( bodyParser.json( { @@ -45,46 +47,91 @@ app.post( '/', function( req, res ) { res.status(204).send(); } ); +let cacheTimeout = parseInt(process.env.CACHE_TIMEOUT || 2500); +const changeSetsCache = new ExpireArray(cacheTimeout); + async function informWatchers( changeSets, res, muCallIdTrail ){ + changeSets.forEach(s=>{changeSetsCache.push(s)}) + let usedChangeSets = changeSetsCache.all() + services.map( async (entry) => { // for each entity if( process.env["DEBUG_DELTA_MATCH"] ) console.log(`Checking if we want to send to ${entry.callback.url}`); - const matchSpec = entry.match; - - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + const originFilteredChangeSets = await filterMatchesForOrigin( + usedChangeSets, + entry + ); + if ( + process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && + entry.options.ignoreFromSelf + ) + console.log( + `There are ${ + originFilteredChangeSets.length + } changes sets not from ${hostnameForEntry(entry)}` + ); let allInserts = []; let allDeletes = []; - originFilteredChangeSets.forEach( (change) => { + originFilteredChangeSets.forEach((change) => { allInserts = [...allInserts, ...change.insert]; allDeletes = [...allDeletes, ...change.delete]; - } ); + }); + const changedTriples = [...allInserts, ...allDeletes]; + let matchedSets = []; + + if (entry.extendedMatch) { + let variableNames = new Set(); + let variableOccurences = {} + for (let matchSpec of entry.extendedMatch) + for (let key in matchSpec) + if (matchSpec[key].type === "variable") { + let name = matchSpec[key].value + variableNames.add(name) + variableOccurences[name] = (variableOccurences[name] || 0) + 1 + } + for (let key in variableOccurences) + if (variableOccurences[key] === 1) + console.log(`Variable "${key}" is probably misconfigured since it only occurs once`) + if (variableNames.size === 0) { + console.log(`There are no variables configured`) + } + + // check inserts + let fetchMissing = process.env["FETCH_MISSING_MATCHES"] !== "false"; + if (await triplesMatchExtendedSpec(allInserts, entry.extendedMatch, variableNames, fetchMissing)) { + matchedSets = originFilteredChangeSets; + } - const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); + // Check deletes + if (await triplesMatchExtendedSpec(allDeletes, entry.extendedMatch, variableNames, false)) { + matchedSets = originFilteredChangeSets; + } + } else { + if (changedTriples.some((triple) => tripleMatchesSpec(triple, entry.match))) + matchedSets = originFilteredChangeSets; + } - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Triple matches spec? ${someTripleMatchedSpec}`); + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"]) + console.log(`How many triples match spec? ${matchedSets.length}`); - if( someTripleMatchedSpec ) { + if (matchedSets.length) { // inform matching entities if( process.env["DEBUG_DELTA_SEND"] ) console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); if( entry.options && entry.options.gracePeriod ) { setTimeout( - () => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ), - entry.options.gracePeriod ); + () => sendRequest(entry, matchedSets, muCallIdTrail), + entry.options.gracePeriod + ); } else { - sendRequest( entry, originFilteredChangeSets, muCallIdTrail ); + sendRequest(entry, matchedSets, muCallIdTrail); } } } ); @@ -103,6 +150,9 @@ function tripleMatchesSpec( triple, matchSpec ) { if( subMatchSpec && !subMatchValue ) return false; + if (subMatchSpec.type === "variable") + continue; + for( let subKey in subMatchSpec ) // we're now matching something like {type: "url", value: "http..."} if( subMatchSpec[subKey] !== subMatchValue[subKey] ) @@ -111,6 +161,113 @@ function tripleMatchesSpec( triple, matchSpec ) { return true; // no false matches found, let's send a response } +async function triplesMatchExtendedSpec(allDelta, matchSpec, variableNames, fetchMissing) { + let changedTriplesPerMatch = []; + for (let spec of matchSpec) { + let localMatches = allDelta.filter((triple) => + tripleMatchesSpec(triple, spec) + ); + changedTriplesPerMatch.push({spec: spec, matches: localMatches}); + } + let variables = {} + for (let variable of variableNames) { + variables[variable] = new ObjectSet(); + } + changedTriplesPerMatch.forEach((o)=>{ + for (let e in o.spec) { + let s = o.spec[e] + if (s.type === "variable") for (let triple of o.matches) { + variables[s.value].add({value: triple[e], graph: triple.graph}) + } + } + }) + let combinations = allCombinations(variables) + let validCombinations = []; + for (let combination of combinations){ + let combinationIsValid = true; + for (let idx in matchSpec){ + let localSpec = Object.assign({}, matchSpec[idx]) + for (let key in localSpec) { + let type = localSpec[key].type; + let varName = localSpec[key].value; + if (type === "variable"){ + localSpec[key] = combination[varName].value + localSpec.graph = combination[varName].graph + } + } + if (![...changedTriplesPerMatch[idx].matches].some(t => tripleMatchesSpec(t, localSpec))) { + let matched = await fetchMissingMatch(localSpec, fetchMissing) + if (matched === null) + combinationIsValid = false; + } + } + if (combinationIsValid) { + validCombinations.push(combination) + } + } + console.log(`The number of valid combinations is ${validCombinations.length}`) + return validCombinations.length > 0 +} + +async function fetchMissingMatch(match, fetchMissing) { + if (!fetchMissing) + return null + else { + let qMatch = {} + let selectParts = [] + for (let key of ["subject", "predicate", "object", "graph"]) { + let matchObject = match[key] || {type: ""} + if (matchObject.type === "uri") { + qMatch[key] = `<${matchObject.value}>` + } else { + qMatch[key] = `?${key}` + selectParts.push(qMatch[key]) + } + } + let q=` + SELECT ${selectParts.join(" ")} + WHERE { + GRAPH ${qMatch.graph} { + ${qMatch.subject} ${qMatch.predicate} ${qMatch.object} . + } + } + LIMIT 1 + ` + return await query(q) + .then((result) => + { + let results = result["results"] + if ("bindings" in results) { + let values = results.bindings[0] + for (let key in values) { + console.log(`Key is ${key}`) + match[key] = values[key] + } + } + return match + } + ).catch((err) => + { + console.error(`Error ${err} happened`) + return null + } + ) + } +} + +function allCombinations(obj) { + let combos = [{}]; + Object.entries(obj).forEach(([key, values]) => { + let all = []; + values.forEach((value) => { + combos.forEach((combo) => { + all.push(Object.assign({[key]: value}, combo)); + }); + }); + combos = all; + }); + return combos; +} function formatChangesetBody( changeSets, options ) { if( options.resourceFormat == "v0.0.1" ) { diff --git a/package.json b/package.json index 89a224e..a65d901 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,8 @@ }, "homepage": "https://github.com/mu-semtech/delta-notifier#readme", "dependencies": { + "expire-array": "^1.1.0", + "object-set-js": "^1.0.3", "request": "^2.88.0" } }