Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for match patterns #14

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ export default [
// form of element is {subject,predicate,object}
// predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" }
},
extendedMatch: {
// list of elements in form as above
[{
predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" },
subject: { type: "variable", "value": "var" }
},{
predicate: { type: "uri", value: "http://www.semanticdesktop.org/ontologies/2007/03/22/nmo#isPartOf" },
object: { type: "variable", "value": "var" }
}]
}
callback: {
url: "http://resource/.mu/delta", method: "POST"
},
Expand All @@ -62,6 +72,8 @@ The exported property contains an array of definitions, each linking a match to
- `match.subject`: Matches the subject. Both `type` and `value` may be specified.
- `match.predicate`: Matches the predicade. Both `type` and `value` may be specified.
- `match.object`: Matches the object. Both `type` and `value` may be specified.
- `extendedMatch`: A list of patterns to match against. All supplied patterns must match in the delta.
A special type `"variable"` can be used in patterns, variables with the same `value` will match to the same value.
- `callback`: The place to inform about a matched delta
- `callback.url`: URL to inform about a match
- `callback.method`: Method to use when informing about a match
Expand All @@ -70,6 +82,14 @@ The exported property contains an array of definitions, each linking a match to
- `options.gracePeriod`: Only send the response after a certain amount of time. This will group changes in the future.
- `options.ignoreFromSelf`: Don't inform about changes that originated from the microservice to be informed (based on the hostname).

### Delta messages cache

The incoming delta messages are cached, since a match can cross multiple incoming delta messages.
The timeout of this cache can be configured in ms using the `CACHE_TIMEOUT` environment variable.

When matches can't be found in the cache they will be fetched from the database unless `FETCH_MISSING_MATCHES` is set to `false`


## Delta formats

The delta may be offered in multiple formats. Versions should match the exact string. Specify `options.resourceFormat` to indicate the specific resourceformat.
Expand Down
191 changes: 174 additions & 17 deletions app.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { app, uuid } from 'mu';
import { app, uuid, query } from 'mu';
import request from 'request';
import services from '/config/rules.js';
import bodyParser from 'body-parser';
import dns from 'dns';
import ExpireArray from 'expire-array';
import ObjectSet from 'object-set-js'

// Also parse application/json as json
app.use( bodyParser.json( {
Expand Down Expand Up @@ -45,46 +47,91 @@ app.post( '/', function( req, res ) {
res.status(204).send();
} );

let cacheTimeout = parseInt(process.env.CACHE_TIMEOUT || 2500);
const changeSetsCache = new ExpireArray(cacheTimeout);

async function informWatchers( changeSets, res, muCallIdTrail ){
changeSets.forEach(s=>{changeSetsCache.push(s)})
let usedChangeSets = changeSetsCache.all()

services.map( async (entry) => {
// for each entity
if( process.env["DEBUG_DELTA_MATCH"] )
console.log(`Checking if we want to send to ${entry.callback.url}`);

const matchSpec = entry.match;

const originFilteredChangeSets = await filterMatchesForOrigin( changeSets, entry );
if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] && entry.options.ignoreFromSelf )
console.log(`There are ${originFilteredChangeSets.length} changes sets not from ${hostnameForEntry( entry )}`);
const originFilteredChangeSets = await filterMatchesForOrigin(
usedChangeSets,
entry
);
if (
process.env["DEBUG_TRIPLE_MATCHES_SPEC"] &&
entry.options.ignoreFromSelf
)
console.log(
`There are ${
originFilteredChangeSets.length
} changes sets not from ${hostnameForEntry(entry)}`
);

let allInserts = [];
let allDeletes = [];

originFilteredChangeSets.forEach( (change) => {
originFilteredChangeSets.forEach((change) => {
allInserts = [...allInserts, ...change.insert];
allDeletes = [...allDeletes, ...change.delete];
} );
});


const changedTriples = [...allInserts, ...allDeletes];
let matchedSets = [];

if (entry.extendedMatch) {
let variableNames = new Set();
let variableOccurences = {}
for (let matchSpec of entry.extendedMatch)
for (let key in matchSpec)
if (matchSpec[key].type === "variable") {
let name = matchSpec[key].value
variableNames.add(name)
variableOccurences[name] = (variableOccurences[name] || 0) + 1
}
for (let key in variableOccurences)
if (variableOccurences[key] === 1)
console.log(`Variable "${key}" is probably misconfigured since it only occurs once`)
if (variableNames.size === 0) {
console.log(`There are no variables configured`)
}

// check inserts
let fetchMissing = process.env["FETCH_MISSING_MATCHES"] !== "false";
if (await triplesMatchExtendedSpec(allInserts, entry.extendedMatch, variableNames, fetchMissing)) {
matchedSets = originFilteredChangeSets;
}

const someTripleMatchedSpec =
changedTriples
.some( (triple) => tripleMatchesSpec( triple, matchSpec ) );
// Check deletes
if (await triplesMatchExtendedSpec(allDeletes, entry.extendedMatch, variableNames, false)) {
matchedSets = originFilteredChangeSets;
}
} else {
if (changedTriples.some((triple) => tripleMatchesSpec(triple, entry.match)))
matchedSets = originFilteredChangeSets;
}

if( process.env["DEBUG_TRIPLE_MATCHES_SPEC"] )
console.log(`Triple matches spec? ${someTripleMatchedSpec}`);
if (process.env["DEBUG_TRIPLE_MATCHES_SPEC"])
console.log(`How many triples match spec? ${matchedSets.length}`);

if( someTripleMatchedSpec ) {
if (matchedSets.length) {
// inform matching entities
if( process.env["DEBUG_DELTA_SEND"] )
console.log(`Going to send ${entry.callback.method} to ${entry.callback.url}`);

if( entry.options && entry.options.gracePeriod ) {
setTimeout(
() => sendRequest( entry, originFilteredChangeSets, muCallIdTrail ),
entry.options.gracePeriod );
() => sendRequest(entry, matchedSets, muCallIdTrail),
entry.options.gracePeriod
);
} else {
sendRequest( entry, originFilteredChangeSets, muCallIdTrail );
sendRequest(entry, matchedSets, muCallIdTrail);
}
}
} );
Expand All @@ -103,6 +150,9 @@ function tripleMatchesSpec( triple, matchSpec ) {
if( subMatchSpec && !subMatchValue )
return false;

if (subMatchSpec.type === "variable")
continue;

for( let subKey in subMatchSpec )
// we're now matching something like {type: "url", value: "http..."}
if( subMatchSpec[subKey] !== subMatchValue[subKey] )
Expand All @@ -111,6 +161,113 @@ function tripleMatchesSpec( triple, matchSpec ) {
return true; // no false matches found, let's send a response
}

async function triplesMatchExtendedSpec(allDelta, matchSpec, variableNames, fetchMissing) {
let changedTriplesPerMatch = [];
for (let spec of matchSpec) {
let localMatches = allDelta.filter((triple) =>
tripleMatchesSpec(triple, spec)
);
changedTriplesPerMatch.push({spec: spec, matches: localMatches});
}
let variables = {}
for (let variable of variableNames) {
variables[variable] = new ObjectSet();
}
changedTriplesPerMatch.forEach((o)=>{
for (let e in o.spec) {
let s = o.spec[e]
if (s.type === "variable") for (let triple of o.matches) {
variables[s.value].add({value: triple[e], graph: triple.graph})
}
}
})
let combinations = allCombinations(variables)
let validCombinations = [];
for (let combination of combinations){
let combinationIsValid = true;
for (let idx in matchSpec){
let localSpec = Object.assign({}, matchSpec[idx])
for (let key in localSpec) {
let type = localSpec[key].type;
let varName = localSpec[key].value;
if (type === "variable"){
localSpec[key] = combination[varName].value
localSpec.graph = combination[varName].graph
}
}
if (![...changedTriplesPerMatch[idx].matches].some(t => tripleMatchesSpec(t, localSpec))) {
let matched = await fetchMissingMatch(localSpec, fetchMissing)
if (matched === null)
combinationIsValid = false;
}
}
if (combinationIsValid) {
validCombinations.push(combination)
}
}
console.log(`The number of valid combinations is ${validCombinations.length}`)
return validCombinations.length > 0
}

async function fetchMissingMatch(match, fetchMissing) {
if (!fetchMissing)
return null
else {
let qMatch = {}
let selectParts = []
for (let key of ["subject", "predicate", "object", "graph"]) {
let matchObject = match[key] || {type: ""}
if (matchObject.type === "uri") {
qMatch[key] = `<${matchObject.value}>`
} else {
qMatch[key] = `?${key}`
selectParts.push(qMatch[key])
}
}
let q=`
SELECT ${selectParts.join(" ")}
WHERE {
GRAPH ${qMatch.graph} {
${qMatch.subject} ${qMatch.predicate} ${qMatch.object} .
}
}
LIMIT 1
`
return await query(q)
.then((result) =>
{
let results = result["results"]
if ("bindings" in results) {
let values = results.bindings[0]
for (let key in values) {
console.log(`Key is ${key}`)
match[key] = values[key]
}
}
return match
}
).catch((err) =>
{
console.error(`Error ${err} happened`)
return null
}
)
}
}

function allCombinations(obj) {
let combos = [{}];
Object.entries(obj).forEach(([key, values]) => {
let all = [];
values.forEach((value) => {
combos.forEach((combo) => {
all.push(Object.assign({[key]: value}, combo));
});
});
combos = all;
});
return combos;
}

function formatChangesetBody( changeSets, options ) {
if( options.resourceFormat == "v0.0.1" ) {
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
},
"homepage": "https://github.com/mu-semtech/delta-notifier#readme",
"dependencies": {
"expire-array": "^1.1.0",
"object-set-js": "^1.0.3",
"request": "^2.88.0"
}
}