Skip to content

Commit

Permalink
Merge pull request #152 from scylladb/new_partitioning_scheme_patch
Browse files Browse the repository at this point in the history
gemini: ensure partitioon key buffers don't block
  • Loading branch information
Henrik Johansson authored Jul 2, 2019
2 parents d289502 + ddb2900 commit 9e5880b
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 18 deletions.
28 changes: 24 additions & 4 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,10 +364,12 @@ func runJob(f testJob, schema *gemini.Schema, schemaConfig *gemini.SchemaConfig,
var gs []*gemini.Generators
for _, table := range schema.Tables {
gCfg := &gemini.GeneratorsConfig{
Table: table,
Partitions: &partitionRangeConfig,
Size: concurrency,
Seed: seed,
Table: table,
Partitions: &partitionRangeConfig,
Size: concurrency,
Seed: seed,
PkBufferSize: 10000,
PkUsedBufferSize: 100000,
}
g := gemini.NewGenerator(gCfg)
gs = append(gs, g)
Expand Down Expand Up @@ -429,6 +431,12 @@ func ddlJob(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig,
testStatus.WriteErrors++
return
}
if ddlStmts == nil {
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "ddl"))
}
return
}
defer postStmtHook()
defer func() {
if verbose {
Expand Down Expand Up @@ -462,6 +470,12 @@ func mutationJob(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaCon
testStatus.WriteErrors++
return
}
if mutateStmt == nil {
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "mutation"))
}
return
}
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
Expand All @@ -482,6 +496,12 @@ func mutationJob(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaCon

func validationJob(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, partitionValues <-chan gemini.Value, testStatus *Status, logger *zap.Logger) {
checkStmt := schema.GenCheckStmt(table, partitionValues, r, p)
if checkStmt == nil {
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "validation"))
}
return
}
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
Expand Down
33 changes: 27 additions & 6 deletions generator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gemini

import (
"fmt"

"github.com/scylladb/gemini/murmur"
"golang.org/x/exp/rand"
)
Expand All @@ -22,18 +24,20 @@ type Generators struct {
}

type GeneratorsConfig struct {
Table *Table
Partitions *PartitionRangeConfig
Size uint64
Seed uint64
Table *Table
Partitions *PartitionRangeConfig
Size uint64
Seed uint64
PkBufferSize uint64
PkUsedBufferSize uint64
}

func NewGenerator(config *GeneratorsConfig) *Generators {
generators := make([]*source, config.Size)
for i := uint64(0); i < config.Size; i++ {
generators[i] = &source{
newValues: make(chan Value, 10000),
oldValues: make(chan Value, 20000),
newValues: make(chan Value, config.PkBufferSize),
oldValues: make(chan Value, config.PkUsedBufferSize),
}
}
gs := &Generators{
Expand Down Expand Up @@ -80,3 +84,20 @@ func (gs *Generators) start() {
}
}()
}

func sendIfPossible(values chan Value, value Value) {
select {
case values <- value:
default:
fmt.Println("Skipped returning an partition key")
}
}

func recvIfPossible(values <-chan Value) (Value, bool) {
select {
case values, ok := <-values:
return values, ok
default:
return nil, false
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.uber.org/zap v1.10.0
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac
gopkg.in/inf.v0 v0.9.1
honnef.co/go/tools v0.0.0-2019.2.1 // indirect
)
Expand Down
17 changes: 9 additions & 8 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func (s *Schema) GenInsertStmt(t *Table, newPartitionValues <-chan Value, oldPar
values := make([]interface{}, len(vals))
copy(values, vals)
defer func() {
oldPartitionValues <- vals
sendIfPossible(oldPartitionValues, vals)
}()
for _, ck := range t.ClusteringKeys {
builder = builder.Columns(ck.Name)
Expand Down Expand Up @@ -531,7 +531,7 @@ func (s *Schema) GenInsertJsonStmt(t *Table, newPartitionValues <-chan Value, ol
t.mu.RLock()
defer t.mu.RUnlock()

vals, ok := <-newPartitionValues
vals, ok := recvIfPossible(newPartitionValues)
if !ok {
return nil, nil
}
Expand Down Expand Up @@ -605,7 +605,7 @@ func (s *Schema) GenDeleteRows(t *Table, newPartitionValues <-chan Value, oldPar
values := make([]interface{}, len(vals))
copy(values, vals)
defer func() {
oldPartitionValues <- vals
sendIfPossible(oldPartitionValues, vals)
}()
if len(t.ClusteringKeys) > 0 {
ck := t.ClusteringKeys[0]
Expand Down Expand Up @@ -698,7 +698,7 @@ func (s *Schema) genSinglePartitionQuery(t *Table, partitionValues <-chan Value,
builder = builder.Where(qb.Eq(pk.Name))
typs = append(typs, pk.Type)
}
values, ok := <-partitionValues
values, ok := recvIfPossible(partitionValues)
if !ok {
return nil
}
Expand Down Expand Up @@ -737,7 +737,7 @@ func (s *Schema) genMultiplePartitionQuery(t *Table, partitionValues <-chan Valu
for i, pk := range partitionKeys {
builder = builder.Where(qb.InTuple(pk.Name, pkNum))
for j := 0; j < pkNum; j++ {
vs, ok := <-partitionValues
vs, ok := recvIfPossible(partitionValues)
if !ok {
return nil
}
Expand Down Expand Up @@ -773,8 +773,9 @@ func (s *Schema) genClusteringRangeQuery(t *Table, partitionValues <-chan Value,
clusteringKeys = t.MaterializedViews[view].ClusteringKeys
}*/
builder := qb.Select(s.Keyspace.Name + "." + tableName)
values, ok := <-partitionValues
values, ok := recvIfPossible(partitionValues)
if !ok {
// Done or no values available...
return nil
}
for _, pk := range partitionKeys {
Expand Down Expand Up @@ -830,7 +831,7 @@ func (s *Schema) genMultiplePartitionClusteringRangeQuery(t *Table, partitionVal
for i, pk := range partitionKeys {
builder = builder.Where(qb.InTuple(pk.Name, pkNum))
for j := 0; j < pkNum; j++ {
vs, ok := <-partitionValues
vs, ok := recvIfPossible(partitionValues)
if !ok {
return nil
}
Expand Down Expand Up @@ -878,7 +879,7 @@ func (s *Schema) genSingleIndexQuery(t *Table, partitionValues <-chan Value, r *
pkNum = 1
}
*/
values, ok := <-partitionValues
values, ok := recvIfPossible(partitionValues)
if !ok {
return nil
}
Expand Down

0 comments on commit 9e5880b

Please sign in to comment.