Skip to content

Commit

Permalink
Merge pull request #198 from scylladb/overlapping_reads_and_writes
Browse files Browse the repository at this point in the history
gemini: Fix overlapping operations on the same partition key
  • Loading branch information
Henrik Johansson authored Aug 26, 2019
2 parents 7a4a7f5 + 2b3e31c commit 9c22b7c
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 58 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

# Unreleased

- Fix overlapping operations on the same partition key ([#198](https://github.com/scylladb/gemini/issues/198)).
- Partition keys can now be drawn from various distributions such as ___"zipf"___,
___"uniform"___ and ___"normal"___. The CLI argument `--partition-key-distribution` is used
to select which distribution to use. The default is `normal`.
Expand Down
13 changes: 11 additions & 2 deletions cmd/gemini/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig
return
}
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
token, mutateValues := mutateStmt.Values()
defer func() {
v := make(gemini.Value, len(table.PartitionKeys))
copy(v, mutateValues)
source.GiveOld(gemini.ValueWithToken{Token: token, Value: v})
}()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
Expand All @@ -208,7 +213,11 @@ func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConf
return
}
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
token, checkValues := checkStmt.Values()
defer func() {
// Signal done with this pk...
source.GiveOld(gemini.ValueWithToken{Token: token})
}()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL()))
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ import (
"text/tabwriter"
"time"

"gonum.org/v1/gonum/stat/distuv"

"github.com/briandowns/spinner"
"github.com/gocql/gocql"
"github.com/pkg/errors"
Expand All @@ -28,6 +26,7 @@ import (
"go.uber.org/zap/zapcore"
"golang.org/x/exp/rand"
"golang.org/x/net/context"
"gonum.org/v1/gonum/stat/distuv"
)

var (
Expand Down
84 changes: 64 additions & 20 deletions generator.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,63 @@
package gemini

import (
"sync"
"time"

"go.uber.org/zap"

"github.com/scylladb/gemini/murmur"
"github.com/scylladb/go-set/u64set"
"go.uber.org/zap"
"golang.org/x/exp/rand"
)

type DistributionFunc func() uint64

type Source struct {
values []Value
values []ValueWithToken
idxFunc func() uint64
oldValues chan Value
oldValues chan ValueWithToken
inFlight syncU64set
}

func (s *Source) Get() (Value, bool) {
v := s.pick()
values := make([]interface{}, len(v))
// Make a copy to allow callers to work with the slice directly
copy(values, v)
select {
case s.oldValues <- v:
default:
// Old source is full, just drop the value
// Get returns a new value and ensures that its corresponding token
// is not already in-flight.
func (s *Source) Get() (ValueWithToken, bool) {
for {
v := s.pick()
if s.inFlight.addIfNotPresent(v.Token) {
return v, true
}
}
return values, true
}

func (s *Source) GetOld() (Value, bool) {
// GetOld returns a previously used value and token, or a new one if
// the old queue is empty.
func (s *Source) GetOld() (ValueWithToken, bool) {
select {
case v, ok := <-s.oldValues:
return v, ok
default:
// There are no old values so we generate a new
return s.pick(), true
return s.Get()
}
}

// GiveOld hands v back to the source once the caller is done with it.
//
// If v.Value is empty the call only signals completion: v.Token is removed
// from the in-flight tracking and nothing is queued.
//
// Otherwise v is offered to the oldValues queue for later reuse through
// GetOld, and its token stays in-flight so that whoever receives it from the
// queue can release it. The send never blocks: if the queue is full the value
// is dropped, and in that case the token is released here because no consumer
// will ever receive the value and release it. Leaving it in-flight would make
// Get skip that partition key forever.
func (s *Source) GiveOld(v ValueWithToken) {
	if len(v.Value) == 0 {
		s.inFlight.delete(v.Token)
		return
	}
	select {
	case s.oldValues <- v:
	default:
		// Old source is full, just drop the value and release its token so
		// the partition key can be picked again.
		s.inFlight.delete(v.Token)
	}
}

func (s *Source) pick() Value {
func (s *Source) pick() ValueWithToken {
return s.values[s.idxFunc()]
}

Expand All @@ -68,9 +84,10 @@ func NewGenerator(table *Table, config *GeneratorsConfig, logger *zap.Logger) *G
generators := make([]*Source, config.Size)
for i := uint64(0); i < config.Size; i++ {
generators[i] = &Source{
values: make([]Value, 0, config.DistributionSize),
values: make([]ValueWithToken, 0, config.DistributionSize),
idxFunc: config.DistributionFunc,
oldValues: make(chan Value, config.PkUsedBufferSize),
oldValues: make(chan ValueWithToken, config.PkUsedBufferSize),
inFlight: syncU64set{pks: u64set.New()},
}
}
gs := &Generators{
Expand All @@ -90,6 +107,11 @@ func (gs Generators) Get(idx int) *Source {
return gs.generators[idx]
}

// ValueWithToken pairs a generated value with the token it was stored under,
// so the token can be used for in-flight tracking without being recomputed.
type ValueWithToken struct {
// Token identifies the value in the in-flight set.
Token uint64
// Value holds the generated values; GiveOld treats an empty Value as a
// "done with this token" signal rather than as a value to reuse.
Value Value
}

func (gs *Generators) create() {
gs.logger.Info("generating partition keys, this can take a while", zap.Uint64("distribution_size", gs.distributionSize))
start := time.Now()
Expand All @@ -105,7 +127,7 @@ func (gs *Generators) create() {
continue
}
if uint64(len(source.values)) < gs.distributionSize {
source.values = append(source.values, Value(values))
source.values = append(source.values, ValueWithToken{Token: hash, Value: values})
}
if uint64(len(source.values)) == gs.distributionSize {
gs.logger.Debug("partial generation", zap.Uint64("source", idx), zap.Int("size", len(source.values)))
Expand All @@ -130,3 +152,25 @@ func hash(rkc *RoutingKeyCreator, t *Table, values []interface{}) uint64 {
b, _ := rkc.CreateRoutingKey(t, values)
return uint64(murmur.Murmur3H1(b))
}

// syncU64set is a set of uint64 values whose methods take a mutex around
// every access to the underlying set.
type syncU64set struct {
// pks is the underlying set; access it only while holding mu.
pks *u64set.Set
// mu guards pks. Because of this field the struct must not be copied
// after first use.
mu sync.Mutex
}

// delete removes v from the set and reports whether v was present.
//
// Only the requested element is removed. The earlier implementation popped an
// arbitrary element and ignored v, which released the wrong token.
func (s *syncU64set) delete(v uint64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if !s.pks.Has(v) {
		return false
	}
	s.pks.Remove(v)
	return true
}

// addIfNotPresent inserts v into the set unless it is already a member.
// It returns true when v was inserted by this call and false when v was
// already present. The membership check and the insert happen under the
// same lock acquisition.
func (s *syncU64set) addIfNotPresent(v uint64) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	absent := !s.pks.Has(v)
	if absent {
		s.pks.Add(v)
	}
	return absent
}
Loading

0 comments on commit 9c22b7c

Please sign in to comment.