Merge pull request #159 from scylladb/tuning_pk_generation
gemini: partition key generation improved
Henrik Johansson authored Jul 5, 2019
2 parents 896858c + 5a3c963 commit 3b3cff0
Showing 13 changed files with 636 additions and 452 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,16 @@
# Changelog

## Unreleased

- A `source` concept now coordinates the creation, consumption and reuse of
  partition keys.
- Two new CLI args control the buffer sizes for new and reusable partition keys:
  `partition-key-buffer-size` and `partition-key-buffer-reuse-size`.
- The CLI arg `concurrency` now means the total number of actors per job type.
  You may need to scale down this setting, since for example a mixed mode
  execution runs with twice as many goroutines. Experimentation is encouraged,
  as a higher number also yields much greater throughput. See the example
  invocation below.
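
For illustration, a hypothetical invocation using the new args (the values are
placeholders, not tuning advice, and other required options are omitted):

    gemini --concurrency 8 --partition-key-buffer-size 10000 --partition-key-buffer-reuse-size 1000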

## 1.3.4

- Shutdown is no longer waiting for the warmup phase to complete.
27 changes: 27 additions & 0 deletions cmd/gemini/generators.go
@@ -0,0 +1,27 @@
package main

import "github.com/scylladb/gemini"

// createGenerators builds one partition-key generator per table in the schema,
// sized so that each of the `actors` workers gets its own source of partition keys.
func createGenerators(schema *gemini.Schema, schemaConfig gemini.SchemaConfig, actors uint64) []*gemini.Generators {
partitionRangeConfig := gemini.PartitionRangeConfig{
MaxBlobLength: schemaConfig.MaxBlobLength,
MinBlobLength: schemaConfig.MinBlobLength,
MaxStringLength: schemaConfig.MaxStringLength,
MinStringLength: schemaConfig.MinStringLength,
}

var gs []*gemini.Generators
for _, table := range schema.Tables {
gCfg := &gemini.GeneratorsConfig{
Table: table,
Partitions: partitionRangeConfig,
Size: actors,
Seed: seed,
PkBufferSize: pkBufferSize,
PkUsedBufferSize: pkBufferReuseSize,
}
g := gemini.NewGenerator(gCfg)
gs = append(gs, g)
}
return gs
}
224 changes: 224 additions & 0 deletions cmd/gemini/jobs.go
@@ -0,0 +1,224 @@
package main

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/scylladb/gemini"
"github.com/scylladb/gemini/store"
"go.uber.org/zap"
"golang.org/x/exp/rand"
)

// MutationJob continuously applies mutations against the database
// for as long as the pump is active.
func MutationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
defer wg.Done()
schemaConfig := &schemaCfg
logger = logger.Named("mutation_job")
testStatus := Status{}
var i int
for hb := range pump {
hb.await()
// Roughly one in 100,000 iterations issues a DDL statement instead of a mutation.
ind := r.Intn(1000000)
if ind%100000 == 0 {
ddl(ctx, schema, schemaConfig, table, s, r, p, &testStatus, logger)
} else {
mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, true, logger)
}
if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
}
if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
break
}
i++
}
}

// ValidationJob continuously applies validations against the database
// for as long as the pump is active.
func ValidationJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
defer wg.Done()
schemaConfig := &schemaCfg
logger = logger.Named("validation_job")

testStatus := Status{}
var i int
for hb := range pump {
hb.await()
validation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, logger)
if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
}
if failFast && (testStatus.ReadErrors > 0 || testStatus.WriteErrors > 0) {
break
}
i++
}
}

// WarmupJob continuously applies mutations against the database
// for as long as the pump is active or the supplied duration expires.
func WarmupJob(ctx context.Context, pump <-chan heartBeat, wg *sync.WaitGroup, schema *gemini.Schema, schemaCfg gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, c chan Status, mode string, warmup time.Duration, logger *zap.Logger) {
defer wg.Done()
schemaConfig := &schemaCfg
testStatus := Status{}
var i int
warmupTimer := time.NewTimer(warmup)
for {
// Stop as soon as the pump channel is closed.
if _, ok := <-pump; !ok {
logger.Info("warmup job terminated")
return
}
select {
case <-warmupTimer.C:
c <- testStatus
return
default:
mutation(ctx, schema, schemaConfig, table, s, r, p, source, &testStatus, false, logger)
if i%1000 == 0 {
c <- testStatus
testStatus = Status{}
}
i++
}
}
}

func job(done *sync.WaitGroup, f testJob, actors uint64, schema *gemini.Schema, schemaConfig gemini.SchemaConfig, s store.Store, pump *Pump, generators []*gemini.Generators, result chan Status, logger *zap.Logger) {
defer done.Done()

// Wait group for the worker goroutines.
var workers sync.WaitGroup
workerCtx, cancel := context.WithCancel(context.Background())
defer cancel()
workers.Add(len(schema.Tables) * int(actors))

partitionRangeConfig := gemini.PartitionRangeConfig{
MaxBlobLength: schemaConfig.MaxBlobLength,
MinBlobLength: schemaConfig.MinBlobLength,
MaxStringLength: schemaConfig.MaxStringLength,
MinStringLength: schemaConfig.MinStringLength,
}

for j, table := range schema.Tables {
for i := 0; i < int(actors); i++ {
// Each worker gets its own PRNG, deterministically seeded from the shared seed.
r := rand.New(rand.NewSource(seed))
go f(workerCtx, pump.ch, &workers, schema, schemaConfig, table, s, r, partitionRangeConfig, generators[j].Get(i), result, mode, warmup, logger)
}
}

workers.Wait()
}

func ddl(ctx context.Context, schema *gemini.Schema, sc *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, testStatus *Status, logger *zap.Logger) {
if sc.CQLFeature != gemini.CQL_FEATURE_ALL {
logger.Debug("ddl statements disabled")
return
}
table.Lock()
defer table.Unlock()
ddlStmts, postStmtHook, err := schema.GenDDLStmt(table, r, p, sc)
if err != nil {
logger.Error("Failed! Mutation statement generation failed", zap.Error(err))
testStatus.WriteErrors++
return
}
if ddlStmts == nil {
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "ddl"))
}
return
}
defer postStmtHook()
defer func() {
if verbose {
jsonSchema, _ := json.MarshalIndent(schema, "", " ")
fmt.Printf("Schema: %v\n", string(jsonSchema))
}
}()
for _, ddlStmt := range ddlStmts {
ddlQuery := ddlStmt.Query
if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil {
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
}
if err := s.Mutate(ctx, ddlQuery); err != nil {
e := JobError{
Timestamp: time.Now(),
Message: "DDL failed: " + err.Error(),
Query: ddlStmt.PrettyCQL(),
}
testStatus.Errors = append(testStatus.Errors, e)
testStatus.WriteErrors++
} else {
testStatus.WriteOps++
}
}
}

func mutation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, deletes bool, logger *zap.Logger) {
mutateStmt, err := schema.GenMutateStmt(table, source, r, p, deletes)
if err != nil {
logger.Error("Failed! Mutation statement generation failed", zap.Error(err))
testStatus.WriteErrors++
return
}
if mutateStmt == nil {
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "mutation"))
}
return
}
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
if err := s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
e := JobError{
Timestamp: time.Now(),
Message: "Mutation failed: " + err.Error(),
Query: mutateStmt.PrettyCQL(),
}
testStatus.Errors = append(testStatus.Errors, e)
testStatus.WriteErrors++
} else {
testStatus.WriteOps++
}
}

func validation(ctx context.Context, schema *gemini.Schema, _ *gemini.SchemaConfig, table *gemini.Table, s store.Store, r *rand.Rand, p gemini.PartitionRangeConfig, source *gemini.Source, testStatus *Status, logger *zap.Logger) {
checkStmt := schema.GenCheckStmt(table, source, r, p)
if checkStmt == nil {
if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "validation"))
}
return
}
checkQuery := checkStmt.Query
checkValues := checkStmt.Values()
if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
w.Write(zap.String("pretty_cql", checkStmt.PrettyCQL()))
}
if err := s.Check(ctx, table, checkQuery, checkValues...); err != nil {
// De-duplication needed?
e := JobError{
Timestamp: time.Now(),
Message: "Validation failed: " + err.Error(),
Query: checkStmt.PrettyCQL(),
}
testStatus.Errors = append(testStatus.Errors, e)
testStatus.ReadErrors++
} else {
testStatus.ReadOps++
}
}
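
For context, here is a hypothetical top-level wiring of these jobs (not part of this diff; `pump`, `generators`, `result`, `concurrency`, and the store `s` are assumed to be set up elsewhere in cmd/gemini). It also illustrates the CHANGELOG note that a mixed mode run starts two job types and therefore twice as many goroutines:

var done sync.WaitGroup
done.Add(2)
// One orchestrator per job type; each fans out `concurrency` workers per table.
go job(&done, MutationJob, concurrency, schema, schemaConfig, s, pump, generators, result, logger)
go job(&done, ValidationJob, concurrency, schema, schemaConfig, s, pump, generators, result, logger)
done.Wait()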
17 changes: 11 additions & 6 deletions cmd/gemini/pump.go
@@ -1,10 +1,8 @@
package main

import (
"context"
"os"
"os/signal"
"sync"
"syscall"
"time"

@@ -19,6 +17,16 @@ type Pump struct {
logger *zap.Logger
}

type heartBeat struct {
sleep time.Duration
}

func (hb heartBeat) await() {
if hb.sleep > 0 {
time.Sleep(hb.sleep)
}
}

func (p *Pump) Start(d time.Duration, postFunc func()) {
go func() {
defer p.cleanup(postFunc)
@@ -67,13 +75,10 @@ func createPump(sz int, logger *zap.Logger) *Pump {
return pump
}

func createPumpCallback(cancel context.CancelFunc, c chan Status, wg *sync.WaitGroup, sp *spinner.Spinner) func() {
func createPumpCallback(result chan Status, sp *spinner.Spinner) func() {
return func() {
if sp != nil {
sp.Stop()
}
cancel()
wg.Wait()
close(c)
}
}
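
To show how the new heartBeat type paces the jobs, a minimal sketch (hypothetical wiring, not part of this diff) of a producer and a consumer:

ch := make(chan heartBeat, 16)
go func() {
	defer close(ch)
	for i := 0; i < 100; i++ {
		// A non-zero sleep asks the consumer to back off before its next operation.
		ch <- heartBeat{sleep: 10 * time.Millisecond}
	}
}()
for hb := range ch {
	hb.await() // sleeps for hb.sleep, if any
	// ... perform one unit of work, as the jobs in cmd/gemini/jobs.go do ...
}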