Skip to content

Commit

Permalink
Merge pull request #17 from quoll/issue-16
Browse files Browse the repository at this point in the history
Issue 16. Integrated Store with Rules engine
  • Loading branch information
craigbro authored Aug 26, 2016
2 parents f99200c + afba291 commit d88214a
Show file tree
Hide file tree
Showing 13 changed files with 558 additions and 45 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
language: clojure
42 changes: 25 additions & 17 deletions src/naga/engine.clj
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
(ns naga.engine
(:require [naga.structs :as st :refer [EPVPattern RulePatternPair
StatusMap StatusMapEntry Body Program]]
[naga.queue :as q]
[naga.store :as store]
[naga.util :as u]
[schema.core :as s])
(:import [naga.structs Rule]
[naga.store Storage]
[naga.queue PQueue]))
(ns ^{:doc "Functions to run rules until completion."
:author "Paula Gearon"}
naga.engine
(:require [naga.structs :as st :refer [EPVPattern RulePatternPair
StatusMap StatusMapEntry Body Program]]
[naga.queue :as q]
[naga.store :as store]
[naga.util :as u]
[schema.core :as s])
(:import [naga.structs Rule]
[naga.store Storage]
[naga.queue PQueue]))


;; Predicate that accepts any number of arguments and always answers true.
(def true* (fn [& _] true))


(s/defn extract-dirty-pattern :- EPVPattern
(s/defn extract-dirty-pattern :- (s/maybe EPVPattern)
"Takes a key and value pair (from a status map) and determines if
the value (a ConstraintData) is marked dirty. If it is dirty, then return
the key (an EPVPattern)."
Expand All @@ -29,7 +31,7 @@
[storage :- Storage
status :- StatusMap
p :- EPVPattern]
(let [resolution (store/resolve storage p)
(let [resolution (store/resolve-pattern storage p)
last-count (:last-count @(get status p))]
(when-not (= last-count (count resolution))
(with-meta p {:resolution resolution}))))
Expand Down Expand Up @@ -89,7 +91,9 @@
(if (nil? current-rule)
;; finished, build results as rule names mapped to how often
;; the rule was run
(u/mapmap :name (comp deref :execution-count) (vals rules))
[storage
(u/mapmap :name (comp deref :execution-count) (vals rules))]


;; find if any patterns have updated
(if-let [dirty-patterns (seq (keep extract-dirty-pattern
Expand Down Expand Up @@ -129,10 +133,14 @@
;; no dirty patterns, so rule did not need to be run
(recur remaining-queue storage)))))))

(s/defn run :- s/Bool
(s/defn run :- [(s/one Storage "Resulting data store")
(s/one {s/Str s/Num} "Execution stats")]
"Runs a program against a given configuration"
[config :- {s/Keyword s/Any}
{:keys [rules axioms]} :- Program]
(let [storage (store/get-storage-handle config)]
(store/assert-data storage axioms)
(execute rules storage)))
(let [storage (store/get-storage-handle config)
storage' (store/start-tx storage)
[output-storage stats] (->> (store/assert-data storage' axioms)
(execute rules))
result-storage (store/commit-tx output-storage)]
[result-storage stats]))
8 changes: 7 additions & 1 deletion src/naga/queue.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
(ns naga.queue
(ns ^{:doc "Defines a Queue structure that can be added to the tail, and removed from the head.
Anything already in the queue (compared by ID) will not be added again, but a function can
be provided that will update the element when it is already present.
Includes a 'salience' which allows elements to be promoted through the queue ahead
of less salient elements."
:author "Paula Gearon"}
naga.queue
(:refer-clojure :exclude [pop])
(:require [schema.core :as s :refer [=>]]))

Expand Down
15 changes: 8 additions & 7 deletions src/naga/rules.clj
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
(ns naga.rules
"Defines rule structures and constructors to keep them consistent"
(:require [schema.core :as s]
[naga.structs :as st :refer [EPVPattern RulePatternPair Body Axiom Program]]
[naga.util :as u])
(:import [clojure.lang Symbol]
[naga.structs Rule]))
(ns ^{:doc "Defines rule structures and constructors to keep them consistent"
:author "Paula Gearon"}
naga.rules
(:require [schema.core :as s]
[naga.structs :as st :refer [EPVPattern RulePatternPair Body Axiom Program]]
[naga.util :as u])
(:import [clojure.lang Symbol]
[naga.structs Rule]))

(defn- gen-rule-name
  "Generates a fresh symbol whose name begins with \"rule-\"."
  []
  (gensym "rule-"))

Expand Down
193 changes: 193 additions & 0 deletions src/naga/storage/memory.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
(ns ^{:doc "A storage implementation over in-memory indexing. Includes full query engine."
:author "Paula Gearon"}
naga.storage.memory
(:require [clojure.set :as set]
[schema.core :as s]
[naga.structs :as st :refer [EPVPattern Results Value]]
[naga.store :as store]
[naga.util :as u]
[naga.storage.memory-index :as mem])
(:import [clojure.lang Symbol]
[naga.store Storage]))

(s/defn without :- [s/Any]
  "Lazily yields the items of s that are not equal to e.
  Every occurrence equal to e is dropped, not just the first."
  [e :- s/Any
   s :- [s/Any]]
  (remove #(= e %) s))

(s/defn vars :- [Symbol]
  "Lists, in pattern order, the elements of a pattern that
  mem/vartest? recognises as variables."
  [pattern :- EPVPattern]
  (for [term pattern
        :when (mem/vartest? term)]
    term))

(s/defn vars-set :- #{Symbol}
  "Collects the variables of a pattern into a set."
  [pattern :- EPVPattern]
  (set (vars pattern)))

(s/defn paths :- [[EPVPattern]]
  "Enumerates the orderings (paths) of the given patterns in which every
  pattern after the first shares at least one variable with the variables
  already bound by the patterns before it.

  One-argument form: starts with nothing bound and asserts that every path
  produced uses all of the patterns.
  Two-argument form: bound is the set of variables bound so far; a pattern
  may be chosen next when bound is empty or when it shares a variable with
  bound."
  ([patterns :- [EPVPattern]]
   (let [expected-length (count patterns)
         all-paths (paths #{} patterns)]
     (assert (every? #(= expected-length (count %)) all-paths)
             (str "No valid paths through: " (into [] patterns)))
     all-paths))
  ([bound :- #{Symbol}
    patterns :- [EPVPattern]]
   (mapcat
     (fn [pattern]
       (let [pattern-vars (vars-set pattern)]
         ;; a pattern that cannot connect to what is bound contributes no paths
         (when (or (empty? bound)
                   (seq (set/intersection pattern-vars bound)))
           (let [others (without pattern patterns)]
             (if (empty? others)
               ;; last pattern: a single path holding just this pattern
               [[pattern]]
               ;; otherwise prefix this pattern onto every path through the rest
               (for [tail (paths (into bound pattern-vars) others)]
                 (cons pattern tail)))))))
     patterns)))

(s/defn min-join-path
  "Query planner that only considers inner-join orderings. Candidate paths
  come from `paths`; they are ordered by the vector of per-pattern counts
  looked up in count-map and the first one is chosen. When there is no
  candidate path the patterns are returned in their original order."
  [patterns :- [EPVPattern]
   count-map :- {EPVPattern s/Num}]
  (let [ranked (sort-by #(mapv count-map %) (paths patterns))]
    ;; TODO: longest paths with minimized cross products
    (or (first ranked) patterns)))

(s/defn user-plan
  "Query planner that performs no reordering: the patterns are handed back
  exactly as supplied. The second argument (the count map given to every
  planner) is ignored."
  [patterns :- [EPVPattern]
   _ :- {EPVPattern s/Num}]
  patterns)

(s/defn select-planner
  "Chooses a query planner function from a collection of option keywords.
  :user selects user-plan; :min, or no recognised option, selects
  min-join-path. :user wins when both are present."
  [options]
  (let [requested (set options)]
    (cond
      (contains? requested :user) user-plan
      (contains? requested :min) min-join-path
      :else min-join-path)))

(s/defn matching-vars :- {s/Num s/Num}
  "Builds a map from an index in `from` to an index in `to` for every
  position where `from` holds a variable equal to the element of `to`.
  If a variable of `from` occurs more than once in `to`, the last
  occurrence in `to` is the one recorded."
  [from :- [s/Any]
   to :- [Symbol]]
  (into {}
        (for [[to-idx to-var] (map-indexed vector to)
              [from-idx from-var] (map-indexed vector from)
              :when (and (mem/vartest? from-var) (= to-var from-var))]
          [from-idx to-idx])))

(s/defn modify-pattern :- [s/Any]
  "Produces a copy of pattern in which each position listed in mapping is
  replaced by a value from existing. mapping goes from a position in the
  pattern to a position in existing; positions absent from mapping keep
  the pattern's own element."
  [existing :- [Value]
   mapping :- {s/Num s/Num}
   pattern :- EPVPattern]
  ;; TODO: this is in an inner loop. Is it faster to:
  ;;       (reduce (fn [p [f t]] (assoc p f t)) pattern mapping)
  (map (fn [position term]
         (let [source-col (mapping position)]
           (if source-col
             (nth existing source-col)
             term)))
       (range)
       pattern))

(s/defn left-join :- Results
  "Extends each row of a partial result with the rows obtained by resolving
  pattern against graph, after substituting the row's values for the
  variables the pattern shares with the partial result.

  The column names of part are read from its :cols metadata; the result
  carries :cols metadata listing those columns followed by the pattern's
  variables that were not already present."
  [graph
   part :- Results
   pattern :- EPVPattern]
  (let [left-cols (:cols (meta part))
        already-bound? (set left-cols)
        extra-cols (remove already-bound? (vars pattern))
        total-cols (vec (concat left-cols extra-cols))
        pattern->left (matching-vars pattern left-cols)
        ;; all joined rows produced by one row of the partial result
        extend-row (fn [lrow]
                     (let [lookup (modify-pattern lrow pattern->left pattern)]
                       (map #(concat lrow %)
                            (mem/resolve-pattern graph lookup))))]
    (with-meta
      (mapcat extend-row part)
      {:cols total-cols})))

(s/defn join-patterns :- Results
  "Joins the resolutions for a series of patterns into a single result.

  graph    - the graph the patterns are resolved against.
  patterns - the patterns to join. A pattern may carry a pre-computed
             resolution under the :resolution key of its metadata, in
             which case that resolution is used instead of querying the graph.
  options  - optional planner keywords, passed to select-planner.

  The result carries :cols metadata naming its columns."
  [graph
   patterns :- [EPVPattern]
   & options]
  (let [resolution-map (u/mapmap (fn [p]
                                   ;; Look at the :resolution entry itself. Testing
                                   ;; the whole metadata map would yield a nil
                                   ;; resolution for any pattern carrying unrelated
                                   ;; metadata.
                                   (or (:resolution (meta p))
                                       (mem/resolve-pattern graph p)))
                                 patterns)

        ;; number of rows each pattern resolves to, used by the planner
        count-map (u/mapmap (comp count resolution-map) patterns)

        query-planner (select-planner options)

        ;; run the query planner
        [fpath & rpath] (query-planner patterns count-map)

        ;; execute the plan by joining left-to-right
        ljoin (partial left-join graph)

        ;; the first pattern's resolution seeds the join
        part-result (with-meta
                      (resolution-map fpath)
                      {:cols (vars fpath)})]

    (reduce ljoin part-result rpath)))


(s/defn project :- Results
  "Produces one output row per row of data by filling the variables of
  pattern with the values from the matching columns of that row. Column
  names are taken from the :cols metadata of data; elements of pattern
  that match no column are emitted unchanged."
  [pattern :- [s/Any]
   data :- Results]
  (let [pattern->data (matching-vars pattern (:cols (meta data)))]
    (for [row data]
      (modify-pattern row pattern->data pattern))))

(s/defn add-to-graph
  "Adds every row of data to graph with mem/graph-add, spreading the row's
  elements as the arguments that follow the graph. Returns the final graph."
  [graph
   data :- Results]
  (reduce #(apply mem/graph-add %1 %2) graph data))

;; Storage implementation that keeps its data in an in-memory graph.
(defrecord MemoryStore [graph]
  Storage

  ;; transaction boundaries hand back the store unchanged
  (start-tx [this] this)

  (commit-tx [this] this)

  (resolve-pattern [_ pattern]
    (mem/resolve-pattern graph pattern))

  ;; join the patterns, then shape each row after output-pattern
  (query [_ output-pattern patterns]
    (let [joined (join-patterns graph patterns)]
      (project output-pattern joined)))

  ;; yields a new store; this one is left untouched
  (assert-data [_ data]
    (let [updated (add-to-graph graph data)]
      (->MemoryStore updated)))

  ;; query, project onto assertion-pattern, and insert the projected rows
  ;; into a new store
  (query-insert [this assertion-pattern patterns]
    (let [joined (join-patterns graph patterns)
          assertions (project assertion-pattern joined)]
      (->MemoryStore (add-to-graph graph assertions)))))

;; A MemoryStore wrapping the empty graph.
(def empty-store (MemoryStore. mem/empty-graph))

(s/defn create-store :- Storage
  "Factory function for in-memory stores. The config argument is accepted
  but not consulted; the shared empty-store value is always returned."
  [config]
  empty-store)
73 changes: 73 additions & 0 deletions src/naga/storage/memory_index.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
(ns ^{:doc "A graph implementation with full indexing."
:author "Paula Gearon"}
naga.storage.memory-index
(:require [schema.core :as s]))

;; Marker keyword standing for a variable position. `simplify` emits it for
;; each variable argument, and the index-get dispatch vectors use it.
(def ? :?)

(s/defn index-add :- {s/Any {s/Any #{s/Any}}}
  "Adds c to the set stored under the key path [a b] of a 3-level index,
  creating the set when nothing (or an empty collection) is stored there.
  Returns the resulting index."
  [idx :- {s/Any {s/Any s/Any}}
   a :- s/Any
   b :- s/Any
   c :- s/Any]
  ;; NOTE(review): graph-add compares the returned index with identical? to
  ;; detect a no-op insert; that relies on conj/assoc handing back the same
  ;; object when nothing changes - confirm before restructuring further.
  (let [existing (get-in idx [a b])
        updated (if (seq existing)
                  (conj existing c)
                  #{c})]
    (assoc-in idx [a b] updated)))

(s/defn index-delete :- (s/maybe {s/Any {s/Any #{s/Any}}})
  "Remove elements from a 3-level index. Returns the new index, or nil if
  there is no change (the path [a b] is absent, or its set does not hold c).

  Sets and inner maps that become empty are removed from the index rather
  than left behind. The return schema is wrapped in s/maybe because nil is
  a documented return value."
  [idx :- {s/Any {s/Any #{s/Any}}}
   a :- s/Any
   b :- s/Any
   c :- s/Any]
  (when-let [idx2 (idx a)]
    (when-let [idx3 (idx2 b)]
      (let [new-idx3 (disj idx3 c)]
        ;; an unchanged set means c was never stored: report no change
        (when-not (identical? new-idx3 idx3)
          (let [new-idx2 (if (seq new-idx3)
                           (assoc idx2 b new-idx3)
                           (dissoc idx2 b))]
            (if (seq new-idx2)
              (assoc idx a new-idx2)
              (dissoc idx a))))))))

(s/defn vartest? :- s/Bool
  "True when x is a symbol whose name starts with the character ?."
  [x]
  (if (symbol? x)
    (= \? (first (name x)))
    false))

(defn- simplify
  "Dispatch function for index-get. Ignores the graph argument and maps
  each remaining argument to ? when it is a variable, or to :v otherwise."
  [_graph & ks]
  (for [k ks]
    (if (vartest? k) ? :v)))

(defmulti index-get "Lookup an index in the graph for the requested data" simplify)

;; Each method handles one combination of bound (:v) and variable (?)
;; positions for subject, predicate and object. It reads one of the graph's
;; :spo, :pos or :osp indexes and returns rows holding only the values found
;; for the variable positions.

;; Fully bound: a single empty row when the triple exists, no rows otherwise.
;; contains? is used so that a stored object of false is still found.
(defmethod index-get [:v :v :v]
  [{idx :spo} s p o]
  (let [os (get-in idx [s p])]
    (if (contains? os o) [[]] [])))

;; Subject and predicate bound: one single-column row per object.
(defmethod index-get [:v :v ?]
  [{idx :spo} s p o]
  (map vector (get-in idx [s p])))

;; Subject and object bound: one single-column row per predicate.
(defmethod index-get [:v ? :v]
  [{idx :osp} s p o]
  (map vector (get-in idx [o s])))

;; Subject bound: rows of [predicate object].
(defmethod index-get [:v ? ?]
  [{idx :spo} s p o]
  (let [edx (idx s)]
    (for [p (keys edx) o (edx p)] [p o])))

;; Predicate and object bound: one single-column row per subject.
(defmethod index-get [? :v :v]
  [{idx :pos} s p o]
  (map vector (get-in idx [p o])))

;; Predicate bound: rows of [subject object].
(defmethod index-get [? :v ?]
  [{idx :pos} s p o]
  (let [edx (idx p)]
    (for [o (keys edx) s (edx o)] [s o])))

;; Object bound: rows of [subject predicate].
(defmethod index-get [? ? :v]
  [{idx :osp} s p o]
  (let [edx (idx o)]
    (for [s (keys edx) p (edx s)] [s p])))

;; Nothing bound: every triple as [subject predicate object].
(defmethod index-get [? ? ?]
  [{idx :spo} s p o]
  (for [s (keys idx) p (keys (idx s)) o ((idx s) p)] [s p o]))

;; Operations on a graph of subject/predicate/object triples.
;; GraphIndexed below is the implementation provided in this namespace.
(defprotocol Graph
  (graph-add [this subj pred obj] "Adds triples to the graph")
  (graph-delete [this subj pred obj] "Removes triples from the graph")
  (resolve-triple [this subj pred obj] "Resolves patterns from the graph, and returns unbound columns only"))

;; Graph held as three 3-level indexes over the same triples, keyed in
;; subject/predicate/object, predicate/object/subject and
;; object/subject/predicate order.
(defrecord GraphIndexed [spo pos osp]
  Graph

  (graph-add [this subj pred obj]
    (let [updated-spo (index-add spo subj pred obj)]
      ;; an identical spo index means the triple was already present
      (if-not (identical? spo updated-spo)
        (assoc this
               :spo updated-spo
               :pos (index-add pos pred obj subj)
               :osp (index-add osp obj subj pred))
        this)))

  (graph-delete [this subj pred obj]
    ;; index-delete yields nil when the triple was not in the index
    (let [updated-spo (index-delete spo subj pred obj)]
      (if updated-spo
        (assoc this
               :spo updated-spo
               :pos (index-delete pos pred obj subj)
               :osp (index-delete osp obj subj pred))
        this)))

  (resolve-triple [this subj pred obj]
    (index-get this subj pred obj)))

(defn resolve-pattern
  "Convenience wrapper around resolve-triple: takes the first three elements
  of pattern as subject, predicate and object and resolves them against graph."
  [graph pattern]
  (let [[s p o] pattern]
    (resolve-triple graph s p o)))

;; A GraphIndexed whose three indexes are all empty maps.
(def empty-graph (map->GraphIndexed {:spo {} :pos {} :osp {}}))
Loading

0 comments on commit d88214a

Please sign in to comment.