Skip to content

Commit

Permalink
Merge branch 'main' of github.com:threatgrid/asami
Browse files Browse the repository at this point in the history
  • Loading branch information
Paula Gearon committed Oct 10, 2021
2 parents 5e5e06e + 24bdce1 commit ef87009
Show file tree
Hide file tree
Showing 12 changed files with 220 additions and 62 deletions.
26 changes: 25 additions & 1 deletion src/asami/common_index.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,38 @@
and multigraph implementations."
:author "Paula Gearon"}
asami.common-index
(:require [asami.graph :refer [Graph graph-add graph-delete graph-diff resolve-triple count-triple broad-node-type?]]
(:require [asami.datom :as datom :refer [->Datom]]
[asami.graph :refer [Graph graph-add graph-delete graph-diff resolve-triple count-triple broad-node-type?]]
[asami.internal :as internal]
[zuko.schema :as st]
[clojure.set :as set]
#?(:clj [schema.core :as s]
:cljs [schema.core :as s :include-macros true]))
#?(:clj (:import [clojure.lang ITransientCollection])))


(defn graph-transact
"Common graph transaction operation"
[graph tx-id assertions retractions generated-data]
(let [[a r] @generated-data
asserts (transient a)
retracts (transient r)
new-graph (as-> graph gr
(reduce (fn [acc [s p o]]
(let [ad (graph-delete acc s p o)]
(when-not (identical? ad acc)
(conj! retracts (->Datom s p o tx-id false)))
ad))
gr retractions)
(reduce (fn [acc [s p o]]
(let [aa (graph-add acc s p o tx-id)]
(when-not (identical? aa acc)
(conj! asserts (->Datom s p o tx-id true)))
aa))
gr assertions))]
(vreset! generated-data [(persistent! asserts) (persistent! retracts)])
new-graph))

(defprotocol NestedIndex
(lowest-level-fn [this] "Returns a function for handling the lowest index level retrieval")
(lowest-level-sets-fn [this] "Returns a function retrieving all lowest level values as sets")
Expand Down
108 changes: 89 additions & 19 deletions src/asami/core.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
[asami.memory :as memory]
#?(:clj [asami.durable.store :as durable]) ;; TODO: make this available to CLJS when ready
[asami.query :as query]
[asami.datom :as datom :refer [->Datom]]
[asami.graph :as gr]
[asami.entities :as entities]
[asami.entities.general :refer [GraphType]]
Expand Down Expand Up @@ -62,7 +61,7 @@
database was created, false if it already exists."
[uri :- s/Str]
(boolean
(if-not (@connections uri)
(when-not (@connections uri)
(swap! connections assoc uri (connection-for uri)))))

(s/defn connect :- ConnectionType
Expand Down Expand Up @@ -125,6 +124,17 @@
(swap! connections assoc uri c)
c)))

(defn check-attachment
"Checks if a connection is attached to the connections map.
If not, then connect. Returns the connection if previously connected,
false if it needed to be reconnected."
[connection]
(let [url (storage/get-url connection)]
(or (@connections url)
(do
(swap! connections assoc url connection)
false))))

(def db storage/db)
(def as-of storage/as-of)
(def as-of-t storage/as-of-t)
Expand All @@ -148,13 +158,13 @@
(s/optional-key :update-fn) (s/pred fn?)}
[s/Any]))

(s/defn transact
(s/defn transact-async
;; returns a deref'able object that derefs to:
;; {:db-before DatabaseType
;; :db-after DatabaseType
;; :tx-data [datom/DatomType]
;; :tempids {s/Any s/Any}}
"Updates a database. This is currently synchronous, but returns a future or delay for compatibility with Datomic.
"Updates a database.
connection: The connection to the database to be updated.
tx-info: This is either a seq of items to be transacted, or a map.
If this is a map, then a :tx-data value will contain the same type of seq that tx-info may have.
Expand All @@ -180,44 +190,104 @@
:tempids mapping of the temporary IDs in entities to the allocated nodes"
[{:keys [name state] :as connection} :- ConnectionType
{:keys [tx-data tx-triples executor update-fn] :as tx-info} :- TransactData]

;; Detached databases need to be reattached when transacted into
(check-attachment connection)

(let [op (if update-fn
(fn []
(let [[db-before db-after] (storage/transact-update connection update-fn)]
{:db-before db-before
:db-after db-after}))
(fn []
(let [tx-id (storage/next-tx connection)
as-datom (fn [assert? [e a v]] (->Datom e a v tx-id assert?))
current-db (storage/db connection)
(let [current-db (storage/db connection)
;; single maps should not be passed in, but if they are then wrap them
seq-wrapper (fn [x] (if (map? x) [x] x))
;; a volatile to capture data for the user
generated-data (volatile! [tx-triples nil {}])
;; volatiles to capture data for the user
;; This is to avoid passing parameters to functions that users may want to call directly
;; and especially to avoid the difficulty of asking users to of return multiple structures
vtempids (volatile! {}) ;; volatile to capture the tempid map from built-triples
generated-data (volatile! [[] []]) ;; volatile to capture the asserted and retracted data in a transaction
[db-before db-after] (if tx-triples
;; simple assertion of triples
(storage/transact-data connection (seq-wrapper tx-triples) nil)
(storage/transact-data connection generated-data (seq-wrapper tx-triples) nil)
;; a seq of statements and/or entities
;; this generates triples and retractions inside a transaction
;; capture this data to return to the user
;; convert these to assertions/retractions and send to transaction
;; also, capture tempids that are generated during conversion
(storage/transact-data connection
generated-data
(fn [graph]
;; building triples returns a tuple of assertions, retractions, tempids
(vreset! generated-data
(entities/build-triples graph (seq-wrapper (or tx-data tx-info)))))))
(let [[_ _ tempids :as result]
(entities/build-triples graph (seq-wrapper (or tx-data tx-info)))]
(vreset! vtempids tempids)
result))))
;; pull out the info captured during the transaction
[triples retracts tempids] (deref generated-data)]
[triples retracts] (deref generated-data)]
{:db-before db-before
:db-after db-after
:tx-data (concat
(map (partial as-datom false) retracts)
(map (partial as-datom true) triples))
:tempids tempids})))]
:tx-data (concat retracts triples)
:tempids @vtempids})))]
#?(:clj (CompletableFuture/supplyAsync (reify Supplier (get [_] (op)))
(or executor clojure.lang.Agent/soloExecutor))
:cljs (let [d (delay (op))]
(force d)
d))))

;; set a generous default transaction timeout of 100 seconds
#?(:clj (def ^:const default-tx-timeout 100000))

#?(:clj
(defn get-timeout
"Retrieves the timeout value to use in ms"
[]
(or (System/getProperty "asami.txTimeoutMsec")
(System/getProperty "datomic.txTimeoutMsec")
default-tx-timeout)))

#?(:clj
(s/defn transact
"This returns a completed future with the data from a transaction.
See the documentation for transact-async for full details on arguments.
If the transaction times out, the call to transact will throw an ExceptionInfo exception.
The default is 100 seconds
The result derefs to a map of:
:db-before database value before the transaction
:db-after database value after the transaction
:tx-data a sequence of the transacted datom operations
:tempids a map of temporary id values and the db identifiers that were allocated for them}"
;; returns a deref'able object that derefs to:
;; {:db-before DatabaseType
;; :db-after DatabaseType
;; :tx-data [datom/DatomType]
;; :tempids {s/Any s/Any}}
[connection :- ConnectionType
tx-info :- TransactData]
(let [transact-future (transact-async connection tx-info)
timeout (get-timeout)]
(when (= ::timeout (deref transact-future timeout ::timeout))
(throw (ex-info "Transaction timeout" {:timeout timeout})))
transact-future))

:cljs
(s/defn transact
"This is a thin wrapper around the transact-async function.
TODO: convert this to a promise-based approach for the async implementation
See the documentation for transact-async for full details on arguments.
returns a deref'able object that derefs to a map of:
:db-before database value before the transaction
:db-after database value after the transaction
:tx-data a sequence of the transacted datom operations
:tempids a map of temporary id values and the db identifiers that were allocated for them}"
;; {:db-before DatabaseType
;; :db-after DatabaseType
;; :tx-data [datom/DatomType]
;; :tempids {s/Any s/Any}}
[connection :- ConnectionType
tx-info :- TransactData]
(transact-async connection tx-info)))

(defn- graphs-of
"Converts Database objects to the graph that they wrap. Other arguments are returned unmodified."
[inputs]
Expand Down
12 changes: 8 additions & 4 deletions src/asami/durable/graph.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@
;; The statement already existed. The pools SHOULD be identical, but check in case they're not
(if (identical? pool new-pool)
this
(assoc this :pool new-pool)))))
(do
(log/warn "A statement existed that used an element not found in the data pool")
(assoc this :pool new-pool))))))

(graph-delete
[this subj pred obj]
Expand All @@ -91,9 +93,11 @@

(graph-transact
[this tx-id assertions retractions]
(as-> this graph
(reduce (fn [acc [s p o]] (graph/graph-delete acc s p o)) graph retractions)
(reduce (fn [acc [s p o]] (graph/graph-add acc s p o tx-id)) graph assertions)))
(common-index/graph-transact this tx-id assertions retractions (volatile! [[] [] {}])))

(graph-transact
[this tx-id assertions retractions generated-data]
(common-index/graph-transact this tx-id assertions retractions generated-data))

(graph-diff
[this other]
Expand Down
16 changes: 11 additions & 5 deletions src/asami/durable/store.cljc
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(ns ^{:doc "The implements the Block storage version of a Graph/Database/Connection"
:author "Paula Gearon"}
asami.durable.store
(:require [asami.storage :as storage :refer [ConnectionType DatabaseType]]
(:require [asami.storage :as storage :refer [ConnectionType DatabaseType UpdateData]]
[asami.graph :as graph]
[asami.internal :as i :refer [now instant? long-time]]
[asami.durable.common :as common
Expand Down Expand Up @@ -225,27 +225,33 @@
"Removes a series of tuples from the latest graph, and asserts new tuples into the graph.
Updates the connection to the new graph."
([conn :- ConnectionType
updates! :- UpdateData
asserts :- [Triple] ;; triples to insert
retracts :- [Triple]] ;; triples to remove
(transact-update* conn (fn [graph tx-id] (graph/graph-transact graph tx-id asserts retracts))))
(transact-update* conn (fn [graph tx-id] (graph/graph-transact graph tx-id asserts retracts updates!))))
([conn :- ConnectionType
updates! :- UpdateData
generator-fn]
(transact-update* conn
(fn [graph tx-id]
(let [[asserts retracts] (generator-fn graph)]
(graph/graph-transact graph tx-id asserts retracts))))))
(graph/graph-transact graph tx-id asserts retracts updates!))))))

(s/defn get-url* :- s/Str
[{:keys [name]} :- ConnectionType]
(str "asami:local://" name))

(defrecord DurableConnection [name tx-manager grapha nodea lock]
storage/Connection
(get-name [this] name)
(get-url [this] (get-url* this))
(next-tx [this] (common/tx-count tx-manager))
(db [this] (db* this))
(delete-database [this] (delete-database* this))
(release [this] (release* this))
(transact-update [this update-fn] (transact-update* this update-fn))
(transact-data [this asserts retracts] (transact-data* this asserts retracts))
(transact-data [this generator-fn] (transact-data* this generator-fn))
(transact-data [this updates! asserts retracts] (transact-data* this updates! asserts retracts))
(transact-data [this updates! generator-fn] (transact-data* this updates! generator-fn))
common/Lockable
(lock! [this] #?(:clj (.lock ^Lock lock)))
(unlock! [this] #?(:clj (.unlock ^Lock lock))))
Expand Down
5 changes: 4 additions & 1 deletion src/asami/graph.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
(new-graph [this] "Creates an empty graph of the same type")
(graph-add [this subj pred obj] [this subj pred obj tx] "Adds triples to the graph")
(graph-delete [this subj pred obj] "Removes triples from the graph")
(graph-transact [this tx-id assertions retractions] "Bulk operation to add and remove multiple statements in a single operation")
(graph-transact
[this tx-id assertions retractions]
[this tx-id assertions retractions generated]
"Bulk operation to add and remove multiple statements in a single operation")
(graph-diff [this other] "Returns all subjects that have changed in this graph, compared to other")
(resolve-triple [this subj pred obj] "Resolves patterns from the graph, and returns unbound columns only")
(count-triple [this subj pred obj] "Resolves patterns from the graph, and returns the size of the resolution"))
Expand Down
6 changes: 3 additions & 3 deletions src/asami/index.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@
(log/trace "statement did not exist")
this)))
(graph-transact [this tx-id assertions retractions]
(as-> this graph
(reduce (fn [acc [s p o]] (graph-delete acc s p o)) graph retractions)
(reduce (fn [acc [s p o]] (graph-add acc s p o tx-id)) graph assertions)))
(common/graph-transact this tx-id assertions retractions (volatile! [[] [] {}])))
(graph-transact [this tx-id assertions retractions generated-data]
(common/graph-transact this tx-id assertions retractions generated-data))
(graph-diff [this other]
(when-not (= (type this) (type other))
(throw (ex-info "Unable to compare diffs between graphs of different types" {:this this :other other})))
Expand Down
Loading

0 comments on commit ef87009

Please sign in to comment.