From 3326a252d3891e4611ec12e785f5bc0cb26b2b74 Mon Sep 17 00:00:00 2001 From: Jan-Pieter Baert Date: Mon, 29 Aug 2022 17:05:53 +0200 Subject: [PATCH 1/7] Add subjectMatch option to delta notifier --- app.js | 71 ++++++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 54 insertions(+), 17 deletions(-) diff --git a/app.js b/app.js index 97c710a..b40fc37 100644 --- a/app.js +++ b/app.js @@ -51,40 +51,77 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ if( process.env["DEBUG_DELTA_MATCH"] ) console.log(`Checking if we want to send to ${entry.callback.url}`); - const matchSpec = entry.match; - - const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry ); - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf ) - console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`); + const originFilteredChangeSets = await filterMatchesForOrigin( + changeSets, + entry + ); + if ( + process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && + entry.options.ignoreFromSelf + ) + console.log( + `There are ${ + originFilteredChangeSets.length + } changes sets not from ${hostnameForEntry(entry)}` + ); let allInserts = []; let allDeletes = []; - originFilteredChangeSets.forEach( (change) => { + originFilteredChangeSets.forEach((change) => { allInserts = [...allInserts, ...change.insert]; allDeletes = [...allDeletes, ...change.delete]; - } ); + }); const changedTriples = [...allInserts, ...allDeletes]; + let matchedSets = []; + + if (entry.subjectMatch) { + let changedTriplesPerMatch = []; + for (let spec of entry.subjectMatch) { + let localMatches = allInserts.filter((triple) => + tripleMatchesSpec(triple, spec) + ); + changedTriplesPerMatch.push(localMatches); + } + let subjectSets = changedTriplesPerMatch.map( + (changes) => new Set(changes.map((change) => change.subject.value)) + ); + let subjects = subjectSets[0]; + for (let set of subjectSets) { + subjects = new Set([...subjects].filter((e) => set.has(e))); + } + let changes = []; + for (let changedTripleSet of changedTriplesPerMatch) { + changedTripleSet.forEach((change) => { + if (subjects.has(change.subject.value)) changes.push(change); + }); + } + if (changes) { + let changeSet = originFilteredChangeSets[0] || {}; + changeSet.inserts = changes; + matchedSets = [changeSet]; + } + } else { + if (changedTriples.some((triple) => tripleMatchesSpec(triple, entry.match))) + matchedSets = originFilteredChangeSets; + } - const someTripleMatchedSpec = - changedTriples - .some( (triple) => tripleMatchesSpec( triple, matchSpec ) ); - - if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] ) - console.log(`Triple matches spec? ${someTripleMatchedSpec}`); + if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"]) + console.log(`How many triples match spec? ${matchedSets.length}`); - if( someTripleMatchedSpec ) { + if (matchedSets.length) { // inform matching entities if( process.env["DEBUG_DELTA_SEND"] ) console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`); if( entry.options && entry.options.gracePeriod ) { setTimeout( - () => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ), - entry.options.gracePeriod ); + () => sendRequest(entry, matchedSets, muCallIdTrail), + entry.options.gracePeriod + ); } else { - sendRequest( entry, originFilteredChangeSets, muCallIdTrail ); + sendRequest(entry, matchedSets, muCallIdTrail); } } } ); From 930681865901e10cd032514a2c6dedf41994fb86 Mon Sep 17 00:00:00 2001 From: Jan-Pieter Baert Date: Tue, 30 Aug 2022 14:21:32 +0200 Subject: [PATCH 2/7] Add caching of incoming delta messages --- README.md | 5 +++++ app.js | 23 ++++++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index cb44e1f..2e21b29 100644 --- a/README.md +++ b/README.md @@ -70,6 +70,11 @@ The exported property contains an array of definitions, each linking a match to - `options.gracePeriod`: Only send the response after a certain amount of time. This will group changes in the future. - `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname). +### Delta messages cache + +The incoming delta messages are cached, since a match can cross multiple incoming delta messages. +The timeout of this cache can be configured in ms using the `CACHE_TIMEOUT` environment variable. + ## Delta formats The delta may be offered in multiple formats. Versions should match the exact string. Specify `options.resourceFormat` to indicate the specific resourceformat. diff --git a/app.js b/app.js index b40fc37..b2ae003 100644 --- a/app.js +++ b/app.js @@ -45,14 +45,25 @@ app.post( '/', function( req, res ) { res.status(204).send(); } ); +let changeSetsCache = [] +let cacheTimeout = parseInt(process.env.CACHE_TIMEOUT || 2500); + async function informWatchers( changeSets, res, muCallIdTrail ){ + // HACK: the cache is a list of lists that each contain elements and will be emptied after the cacheTimeout is reached + let changeSetsCopy = changeSets; + changeSetsCache.push(changeSetsCopy); + setTimeout(()=>{changeSetsCopy.length=0}, cacheTimeout) + + let usedChangeSets = [].concat(...changeSetsCache) + console.log(`Size of changesetscache is ${changeSetsCache.length} length of used changeset ${usedChangeSets.length}`) + services.map( async (entry) => { // for each entity if( process.env["DEBUG_DELTA_MATCH"] ) console.log(`Checking if we want to send to ${entry.callback.url}`); const originFilteredChangeSets = await filterMatchesForOrigin( - changeSets, + usedChangeSets, entry ); if ( @@ -73,9 +84,19 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ allDeletes = [...allDeletes, ...change.delete]; }); + const changedTriples = [...allInserts, ...allDeletes]; let matchedSets = []; + // TODO: add current changeset to cache with timeout + // HACK: the cache is a list of lists that each contain elements and will be emptied after the cacheTimeout is reached + //allInsertsCache.push(allInserts); + //setTimeout(()=>{allInserts.length=0}, cacheTimeout) + //allDeletesCache.push(allDeletes); + //setTimeout(()=>{allDeletes.length=0}, cacheTimeout) + //let cachedInserts = [].concat(...allInsertsCache); + //let cachedDeletes = [].concat(...allDeletesCache); + if (entry.subjectMatch) { let changedTriplesPerMatch = []; for (let spec of entry.subjectMatch) { From 663250a0fc82ae34cfde8fe9cf8ee2058445135b Mon Sep 17 00:00:00 2001 From: Jan-Pieter Baert Date: Tue, 30 Aug 2022 15:28:58 +0200 Subject: [PATCH 3/7] Refactor to use library for expire caching instead of hack --- app.js | 21 ++++----------------- package.json | 1 + 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/app.js b/app.js index b2ae003..55cb936 100644 --- a/app.js +++ b/app.js @@ -3,6 +3,7 @@ import request from 'request'; import services from '/config/rules.js'; import bodyParser from 'body-parser'; import dns from 'dns'; +import ExpireArray from 'expire-array'; // Also parse application/json as json app.use( bodyParser.json( { @@ -45,17 +46,12 @@ app.post( '/', function( req, res ) { res.status(204).send(); } ); -let changeSetsCache = [] let cacheTimeout = parseInt(process.env.CACHE_TIMEOUT || 2500); +const changeSetsCache = new ExpireArray(cacheTimeout); async function informWatchers( changeSets, res, muCallIdTrail ){ - // HACK: the cache is a list of lists that each contain elements and will be emptied after the cacheTimeout is reached - let changeSetsCopy = changeSets; - changeSetsCache.push(changeSetsCopy); - setTimeout(()=>{changeSetsCopy.length=0}, cacheTimeout) - - let usedChangeSets = [].concat(...changeSetsCache) - console.log(`Size of changesetscache is ${changeSetsCache.length} length of used changeset ${usedChangeSets.length}`) + changeSets.forEach(s=>{changeSetsCache.push(s)}) + let usedChangeSets = changeSetsCache.all() services.map( async (entry) => { // for each entity @@ -88,15 +84,6 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ const changedTriples = [...allInserts, ...allDeletes]; let matchedSets = []; - // TODO: add current changeset to cache with timeout - // HACK: the cache is a list of lists that each contain elements and will be emptied after the cacheTimeout is reached - //allInsertsCache.push(allInserts); - //setTimeout(()=>{allInserts.length=0}, cacheTimeout) - //allDeletesCache.push(allDeletes); - //setTimeout(()=>{allDeletes.length=0}, cacheTimeout) - //let cachedInserts = [].concat(...allInsertsCache); - //let cachedDeletes = [].concat(...allDeletesCache); - if (entry.subjectMatch) { let changedTriplesPerMatch = []; for (let spec of entry.subjectMatch) { diff --git a/package.json b/package.json index 89a224e..3ea6d7a 100644 --- a/package.json +++ b/package.json @@ -22,6 +22,7 @@ }, "homepage": "https://github.com/mu-semtech/delta-notifier#readme", "dependencies": { + "expire-array": "^1.1.0", "request": "^2.88.0" } } From 569a6f4a19209e4e3876154632ee420d9affa21c Mon Sep 17 00:00:00 2001 From: Jan-Pieter Baert Date: Tue, 30 Aug 2022 16:02:49 +0200 Subject: [PATCH 4/7] Add processing of subjectMatch for deletes as well --- app.js | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/app.js b/app.js index 55cb936..8c9c8b6 100644 --- a/app.js +++ b/app.js @@ -85,6 +85,7 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ let matchedSets = []; if (entry.subjectMatch) { + // check inserts let changedTriplesPerMatch = []; for (let spec of entry.subjectMatch) { let localMatches = allInserts.filter((triple) => @@ -108,7 +109,36 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ if (changes) { let changeSet = originFilteredChangeSets[0] || {}; changeSet.inserts = changes; - matchedSets = [changeSet]; + changeSet.deletes = []; + matchedSets.push(changeSet); + } + + // Check deletes + changedTriplesPerMatch = []; + for (let spec of entry.subjectMatch) { + let localMatches = allDeletes.filter((triple) => + tripleMatchesSpec(triple, spec) + ); + changedTriplesPerMatch.push(localMatches); + } + subjectSets = changedTriplesPerMatch.map( + (changes) => new Set(changes.map((change) => change.subject.value)) + ); + subjects = subjectSets[0]; + for (let set of subjectSets) { + subjects = new Set([...subjects].filter((e) => set.has(e))); + } + changes = []; + for (let changedTripleSet of changedTriplesPerMatch) { + changedTripleSet.forEach((change) => { + if (subjects.has(change.subject.value)) changes.push(change); + }); + } + if (changes) { + let changeSet = originFilteredChangeSets[0] || {}; + changeSet.inserts = []; + changeSet.deletes = changes; + matchedSets.push(changeSet); } } else { if (changedTriples.some((triple) => tripleMatchesSpec(triple, entry.match))) From 9167975c68cb4066101f8bbf75921ed753bdd677 Mon Sep 17 00:00:00 2001 From: Jan-Pieter Baert Date: Thu, 1 Sep 2022 11:56:53 +0200 Subject: [PATCH 5/7] Add variable support to extendedMatch --- app.js | 130 +++++++++++++++++++++++++++++++-------------------- package.json | 1 + 2 files changed, 80 insertions(+), 51 deletions(-) diff --git a/app.js b/app.js index 8c9c8b6..b6ada27 100644 --- a/app.js +++ b/app.js @@ -4,6 +4,7 @@ import services from '/config/rules.js'; import bodyParser from 'body-parser'; import dns from 'dns'; import ExpireArray from 'expire-array'; +import ObjectSet from 'object-set-js' // Also parse application/json as json app.use( bodyParser.json( { @@ -84,61 +85,27 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ const changedTriples = [...allInserts, ...allDeletes]; let matchedSets = []; - if (entry.subjectMatch) { + if (entry.extendedMatch) { + let variableNames = new Set(); + let variableOccurences = {} + for (let matchSpec of entry.extendedMatch) + for (let key in matchSpec) + if (matchSpec[key].type === "variable") { + let name = matchSpec[key].value + variableNames.add(name) + variableOccurences[name] = (variableOccurences[name] || 0) + 1 + } + for (let key in variableOccurences) + if (variableOccurences[key] < 2) + console.log(`Variable "${key}" is probably misconfigured since it only occurs once`) // check inserts - let changedTriplesPerMatch = []; - for (let spec of entry.subjectMatch) { - let localMatches = allInserts.filter((triple) => - tripleMatchesSpec(triple, spec) - ); - changedTriplesPerMatch.push(localMatches); - } - let subjectSets = changedTriplesPerMatch.map( - (changes) => new Set(changes.map((change) => change.subject.value)) - ); - let subjects = subjectSets[0]; - for (let set of subjectSets) { - subjects = new Set([...subjects].filter((e) => set.has(e))); - } - let changes = []; - for (let changedTripleSet of changedTriplesPerMatch) { - changedTripleSet.forEach((change) => { - if (subjects.has(change.subject.value)) changes.push(change); - }); - } - if (changes) { - let changeSet = originFilteredChangeSets[0] || {}; - changeSet.inserts = changes; - changeSet.deletes = []; - matchedSets.push(changeSet); + if (triplesMatchExtendedSpec(allInserts, entry.extendedMatch, variableNames)) { + matchedSets = originFilteredChangeSets; } // Check deletes - changedTriplesPerMatch = []; - for (let spec of entry.subjectMatch) { - let localMatches = allDeletes.filter((triple) => - tripleMatchesSpec(triple, spec) - ); - changedTriplesPerMatch.push(localMatches); - } - subjectSets = changedTriplesPerMatch.map( - (changes) => new Set(changes.map((change) => change.subject.value)) - ); - subjects = subjectSets[0]; - for (let set of subjectSets) { - subjects = new Set([...subjects].filter((e) => set.has(e))); - } - changes = []; - for (let changedTripleSet of changedTriplesPerMatch) { - changedTripleSet.forEach((change) => { - if (subjects.has(change.subject.value)) changes.push(change); - }); - } - if (changes) { - let changeSet = originFilteredChangeSets[0] || {}; - changeSet.inserts = []; - changeSet.deletes = changes; - matchedSets.push(changeSet); + if (triplesMatchExtendedSpec(allDeletes, entry.extendedMatch, variableNames)) { + matchedSets = originFilteredChangeSets; } } else { if (changedTriples.some((triple) => tripleMatchesSpec(triple, entry.match))) @@ -178,6 +145,9 @@ function tripleMatchesSpec( triple, matchSpec ) { if( subMatchSpec && !subMatchValue ) return false; + if (subMatchSpec.type === "variable") + continue; + for( let subKey in subMatchSpec ) // we're now matching something like {type: "url", value: "http..."} if( subMatchSpec[subKey] !== subMatchValue[subKey] ) @@ -186,6 +156,64 @@ function tripleMatchesSpec( triple, matchSpec ) { return true; // no false matches found, let's send a response } +function triplesMatchExtendedSpec(allDelta, matchSpec, variableNames) { + let changedTriplesPerMatch = []; + for (let spec of matchSpec) { + let localMatches = allDelta.filter((triple) => + tripleMatchesSpec(triple, spec) + ); + changedTriplesPerMatch.push({spec: spec, matches: localMatches}); + } + let variables = {} + for (let variable of variableNames) { + variables[variable] = new ObjectSet(); + } + changedTriplesPerMatch.forEach((o)=>{ + for (let e in o.spec) { + let s = o.spec[e] + if (s.type === "variable") for (let triple of o.matches) { + variables[s.value].add({value: triple[e], graph: triple.graph}) + } + } + }) + let combinations = allCombinations(variables) + let validCombinations = []; + for (let combination of combinations){ + let combinationIsValid = true; + for (let idx in matchSpec){ + let localSpec = Object.assign({}, matchSpec[idx]) + for (let key in localSpec) { + let type = localSpec[key].type; + let varName = localSpec[key].value; + if (type === "variable"){ + let val = combination[varName].value + localSpec[varName] = combination[varName].value + localSpec.graph = combination[varName].graph + } + } + if (![...changedTriplesPerMatch[idx].matches].some(t => tripleMatchesSpec(t, localSpec))) { + // TODO: add possible fetch + combinationIsValid = false; + } + } + validCombinations.push(combination) + } + return validCombinations.length > 0 +} + +function allCombinations(obj) { + let combos = [{}]; + Object.entries(obj).forEach(([key, values]) => { + let all = []; + values.forEach((value) => { + combos.forEach((combo) => { + all.push(Object.assign({[key]: value}, combo)); + }); + }); + combos = all; + }); + return combos; +} function formatChangesetBody( changeSets, options ) { if( options.resourceFormat == "v0.0.1" ) { diff --git a/package.json b/package.json index 3ea6d7a..a65d901 100644 --- a/package.json +++ b/package.json @@ -23,6 +23,7 @@ "homepage": "https://github.com/mu-semtech/delta-notifier#readme", "dependencies": { "expire-array": "^1.1.0", + "object-set-js": "^1.0.3", "request": "^2.88.0" } } From 58ed07af98bbb47627a3f5589f930cec38d8edce Mon Sep 17 00:00:00 2001 From: Jan-Pieter Baert Date: Thu, 1 Sep 2022 16:13:37 +0200 Subject: [PATCH 6/7] Add optional fetching of missing matches --- app.js | 74 ++++++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/app.js b/app.js index b6ada27..5f6049b 100644 --- a/app.js +++ b/app.js @@ -1,4 +1,4 @@ -import { app, uuid } from 'mu'; +import { app, uuid, query } from 'mu'; import request from 'request'; import services from '/config/rules.js'; import bodyParser from 'body-parser'; @@ -96,15 +96,20 @@ async function informWatchers( changeSets, res, muCallIdTrail ){ variableOccurences[name] = (variableOccurences[name] || 0) + 1 } for (let key in variableOccurences) - if (variableOccurences[key] < 2) + if (variableOccurences[key] === 1) console.log(`Variable "${key}" is probably misconfigured since it only occurs once`) + if (variableNames.size === 0) { + console.log(`There are no variables configured`) + } + // check inserts - if (triplesMatchExtendedSpec(allInserts, entry.extendedMatch, variableNames)) { + let fetchMissing = process.env["FETCH_MISSING_MATCHES"] !== "false"; + if (await triplesMatchExtendedSpec(allInserts, entry.extendedMatch, variableNames, fetchMissing)) { matchedSets = originFilteredChangeSets; } // Check deletes - if (triplesMatchExtendedSpec(allDeletes, entry.extendedMatch, variableNames)) { + if (await triplesMatchExtendedSpec(allDeletes, entry.extendedMatch, variableNames, false)) { matchedSets = originFilteredChangeSets; } } else { @@ -156,7 +161,7 @@ function tripleMatchesSpec( triple, matchSpec ) { return true; // no false matches found, let's send a response } -function triplesMatchExtendedSpec(allDelta, matchSpec, variableNames) { +async function triplesMatchExtendedSpec(allDelta, matchSpec, variableNames, fetchMissing) { let changedTriplesPerMatch = []; for (let spec of matchSpec) { let localMatches = allDelta.filter((triple) => @@ -186,21 +191,70 @@ function triplesMatchExtendedSpec(allDelta, matchSpec, variableNames) { let type = localSpec[key].type; let varName = localSpec[key].value; if (type === "variable"){ - let val = combination[varName].value - localSpec[varName] = combination[varName].value + localSpec[key] = combination[varName].value localSpec.graph = combination[varName].graph } } if (![...changedTriplesPerMatch[idx].matches].some(t => tripleMatchesSpec(t, localSpec))) { - // TODO: add possible fetch - combinationIsValid = false; + let matched = await fetchMissingMatch(localSpec, fetchMissing) + if (matched === null) + combinationIsValid = false; } } - validCombinations.push(combination) + if (combinationIsValid) { + validCombinations.push(combination) + } } + console.log(`The number of valid combinations is ${validCombinations.length}`) return validCombinations.length > 0 } +async function fetchMissingMatch(match, fetchMissing) { + if (!fetchMissing) + return null + else { + let qMatch = {} + let selectParts = [] + for (let key of ["subject", "predicate", "object", "graph"]) { + let matchObject = match[key] || {type: ""} + if (matchObject.type === "uri") { + qMatch[key] = `<${matchObject.value}>` + } else { + qMatch[key] = `?${key}` + selectParts.push(qMatch[key]) + } + } + let q=` + SELECT ${selectParts.join(" ")} + WHERE { + GRAPH ${qMatch.graph} { + ${qMatch.subject} ${qMatch.predicate} ${qMatch.object} . + } + } + LIMIT 1 + ` + return await query(q) + .then((result) => + { + let results = result["results"] + if ("bindings" in results) { + let values = results.bindings[0] + for (let key in values) { + console.log(`Key is ${key}`) + match[key] = values[key] + } + } + return match + } + ).catch((err) => + { + console.error(`Error ${err} happened`) + return null + } + ) + } +} + function allCombinations(obj) { let combos = [{}]; Object.entries(obj).forEach(([key, values]) => { From df8ab3f521c39c26d5b7926e652551537b05f029 Mon Sep 17 00:00:00 2001 From: Jan-Pieter Baert Date: Thu, 1 Sep 2022 16:44:09 +0200 Subject: [PATCH 7/7] Add documentation for extendedMatch --- README.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/README.md b/README.md index 2e21b29..80f757d 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,16 @@ export default [ // form of element is {subject,predicate,object} // predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" } }, + extendedMatch: { + // list of elements in form as above + [{ + predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" }, + subject: { type: "variable", "value": "var" } + },{ + predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" }, + object: { type: "variable", "value": "var" } + }] + } callback: { url: "http://resource/.mu/delta", method: "POST" }, @@ -62,6 +72,8 @@ The exported property contains an array of definitions, each linking a match to - `match.subject`: Matches the subject. Both `type` and `value` may be specified. - `match.predicate`: Matches the predicade. Both `type` and `value` may be specified. - `match.object`: Matches the object. Both `type` and `value` may be specified. + - `extendedMatch`: A list of patterns to match against. All supplied patterns must match in the delta. + A special type `"variable"` can be used in patterns, variables with the same `value` will match to the same value. - `callback`: The place to inform about a matched delta - `callback.url`: URL to inform about a match - `callback.method`: Method to use when informing about a match @@ -75,6 +87,9 @@ The exported property contains an array of definitions, each linking a match to The incoming delta messages are cached, since a match can cross multiple incoming delta messages. The timeout of this cache can be configured in ms using the `CACHE_TIMEOUT` environment variable. +When matches can't be found in the cache they will be fetched from the database unless `FETCH_MISSING_MATCHES` is set to `false` + + ## Delta formats The delta may be offered in multiple formats. Versions should match the exact string. Specify `options.resourceFormat` to indicate the specific resourceformat.