Skip to content

Commit

Permalink
mongo: apply tx in a specific order to avoid broken quads for concurr…
Browse files Browse the repository at this point in the history
…ent reads
  • Loading branch information
dennwc committed Apr 11, 2017
1 parent a98837a commit 82eb692
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 19 deletions.
38 changes: 20 additions & 18 deletions graph/mongo/quadstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,18 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp
}
}
}
var dn int
if d.Action == graph.Add {
dn = 1
} else {
dn = -1
}
ids[d.Quad.Subject] += dn
ids[d.Quad.Object] += dn
ids[d.Quad.Predicate] += dn
if d.Quad.Label != nil {
ids[d.Quad.Label] += dn
}
}
if clog.V(2) {
clog.Infof("Existence verified. Proceeding.")
Expand All @@ -338,30 +350,20 @@ func (qs *QuadStore) ApplyDeltas(deltas []graph.Delta, ignoreOpts graph.IgnoreOp
return &graph.DeltaError{Delta: d, Err: err}
}
}
for _, d := range deltas {
err := qs.updateQuad(d.Quad, d.ID.Int(), d.Action)
if err != nil {
return &graph.DeltaError{Delta: d, Err: err}
}
var countdelta int
if d.Action == graph.Add {
countdelta = 1
} else {
countdelta = -1
}
ids[d.Quad.Subject] += countdelta
ids[d.Quad.Object] += countdelta
ids[d.Quad.Predicate] += countdelta
if d.Quad.Label != nil {
ids[d.Quad.Label] += countdelta
}
}
// make sure to create all nodes before writing any quads
// concurrent reads may observe broken quads in other case
for k, v := range ids {
err := qs.updateNodeBy(k, v)
if err != nil {
return err
}
}
for _, d := range deltas {
err := qs.updateQuad(d.Quad, d.ID.Int(), d.Action)
if err != nil {
return &graph.DeltaError{Delta: d, Err: err}
}
}
qs.session.SetSafe(&mgo.Safe{})
return nil
}
Expand Down
75 changes: 74 additions & 1 deletion graph/mongo/quadstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (
"github.com/cayleygraph/cayley/graph/graphtest"
"github.com/cayleygraph/cayley/graph/path/pathtest"
"github.com/cayleygraph/cayley/internal/dock"
"bytes"
"math/rand"
"sync"
"github.com/cayleygraph/cayley/quad"
)

func makeMongo(t testing.TB) (graph.QuadStore, graph.Options, func()) {
var conf dock.Config

conf.Image = "mongo:3"
conf.Image = "mongo:3.2.12"
conf.OpenStdin = true
conf.Tty = true

Expand Down Expand Up @@ -47,3 +51,72 @@ func TestMongoAll(t *testing.T) {
func TestMongoPaths(t *testing.T) {
pathtest.RunTestMorphisms(t, makeMongo)
}

func randString() string {
const n = 60
b := bytes.NewBuffer(nil)
b.Grow(n)
for i := 0; i < n; i++ {
b.WriteByte(byte('a' + rand.Intn(26)))
}
return b.String()
}

func TestMongoConcurrent(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
qs, opts, closer := makeMongo(t)
defer closer()
if opts == nil {
opts = make(graph.Options)
}
opts["ignore_duplicate"] = true
qw := graphtest.MakeWriter(t, qs, opts)

const n = 1000
subjects := make([]string, 0, n/4)
for i := 0; i < cap(subjects); i++ {
subjects = append(subjects, randString())
}
var wg sync.WaitGroup
wg.Add(2)
done := make(chan struct{})
go func() {
defer wg.Done()
defer close(done)
for i := 0; i < n; i++ {
n1 := subjects[rand.Intn(len(subjects))]
n2 := subjects[rand.Intn(len(subjects))]
t := graph.NewTransaction()
t.AddQuad(quad.Make(n1, "link", n2, nil))
t.AddQuad(quad.Make(n2, "link", n1, nil))
if err := qw.ApplyTransaction(t); err != nil {
panic(err)
}
}
}()

go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
}
n1 := subjects[rand.Intn(len(subjects))]
it := qs.QuadIterator(quad.Subject, qs.ValueOf(quad.String(n1)))
for it.Next() {
q := qs.Quad(it.Result())
_ = q.Subject.Native()
_ = q.Predicate.Native()
_ = q.Object.Native()
}
if err := it.Close(); err != nil {
panic(err)
}
}
}()
wg.Wait()
}

0 comments on commit 82eb692

Please sign in to comment.