diff --git a/cmd/gemini/generators.go b/cmd/gemini/generators.go deleted file mode 100644 index 9b4f51d..0000000 --- a/cmd/gemini/generators.go +++ /dev/null @@ -1,57 +0,0 @@ -// Copyright 2019 ScyllaDB -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -package main - -import ( - "github.com/scylladb/gemini/pkg/generators" - "github.com/scylladb/gemini/pkg/typedef" - - "go.uber.org/zap" -) - -func createGenerators( - schema *typedef.Schema, - schemaConfig typedef.SchemaConfig, - seed, distributionSize uint64, - logger *zap.Logger, -) (generators.Generators, error) { - partitionRangeConfig := schemaConfig.GetPartitionRangeConfig() - - var gs []*generators.Generator - for id := range schema.Tables { - table := schema.Tables[id] - pkVariations := table.PartitionKeys.ValueVariationsNumber(&partitionRangeConfig) - - distFunc, err := createDistributionFunc(partitionKeyDistribution, distributionSize, seed, stdDistMean, oneStdDev) - if err != nil { - return nil, err - } - - tablePartConfig := &generators.Config{ - PartitionsRangeConfig: partitionRangeConfig, - PartitionsCount: distributionSize, - PartitionsDistributionFunc: distFunc, - Seed: seed, - PkUsedBufferSize: pkBufferReuseSize, - } - g := generators.NewGenerator(table, tablePartConfig, logger.Named("generators")) - if pkVariations < 2^32 { - // Low partition key variation can lead to having staled partitions - // Let's detect and mark them before running test - g.FindAndMarkStalePartitions() - } - gs = 
append(gs, g) - } - return gs, nil -} diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index a90d7d5..278a102 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -269,11 +269,13 @@ func run(_ *cobra.Command, _ []string) error { stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag) pump := jobs.NewPump(stopFlag, logger) - gens, err := createGenerators(schema, schemaConfig, intSeed, partitionCount, logger) + distFunc, err := createDistributionFunc(partitionKeyDistribution, partitionCount, intSeed, normalDistMean, normalDistSigma) if err != nil { - return err + return errors.Wrapf(err, "Failed to create distribution function: %s", partitionKeyDistribution) } - gens.StartAll(stopFlag) + + gens := generators.New(ctx, schema, distFunc, schemaConfig.GetPartitionRangeConfig(), intSeed, partitionCount, pkBufferReuseSize, logger) + defer utils.IgnoreError(gens.Close) if !nonInteractive { sp := createSpinner(interactive()) diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index 500fc75..46c4b73 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -15,12 +15,13 @@ package generators import ( + "context" + "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/exp/rand" "github.com/scylladb/gemini/pkg/routingkey" - "github.com/scylladb/gemini/pkg/stop" "github.com/scylladb/gemini/pkg/typedef" ) @@ -37,7 +38,7 @@ type TokenIndex uint64 type DistributionFunc func() TokenIndex -type GeneratorInterface interface { +type Interface interface { Get() *typedef.ValueWithToken GetOld() *typedef.ValueWithToken GiveOld(_ *typedef.ValueWithToken) @@ -64,14 +65,6 @@ func (g *Generator) PartitionCount() uint64 { return g.partitionCount } -type Generators []*Generator - -func (g Generators) StartAll(stopFlag *stop.Flag) { - for _, gen := range g { - gen.Start(stopFlag) - } -} - type Config struct { PartitionsDistributionFunc DistributionFunc PartitionsRangeConfig typedef.PartitionRangeConfig @@ -80,9 +73,9 @@ type Config 
struct { PkUsedBufferSize uint64 } -func NewGenerator(table *typedef.Table, config *Config, logger *zap.Logger) *Generator { +func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) Generator { wakeUpSignal := make(chan struct{}) - return &Generator{ + return Generator{ partitions: NewPartitions(int(config.PartitionsCount), int(config.PkUsedBufferSize), wakeUpSignal), partitionCount: config.PartitionsCount, table: table, @@ -135,39 +128,33 @@ func (g *Generator) ReleaseToken(token uint64) { g.GetPartitionForToken(TokenIndex(token)).releaseToken(token) } -func (g *Generator) Start(stopFlag *stop.Flag) { - go func() { - g.logger.Info("starting partition key generation loop") - defer g.partitions.CloseAll() - for { - g.fillAllPartitions(stopFlag) - select { - case <-stopFlag.SignalChannel(): - g.logger.Debug("stopping partition key generation loop", - zap.Uint64("keys_created", g.cntCreated), - zap.Uint64("keys_emitted", g.cntEmitted)) - return - case <-g.wakeUpSignal: - } +func (g *Generator) Start(ctx context.Context) { + defer g.partitions.Close() + g.logger.Info("starting partition key generation loop") + for { + g.fillAllPartitions(ctx) + select { + case <-ctx.Done(): + g.logger.Debug("stopping partition key generation loop", + zap.Uint64("keys_created", g.cntCreated), + zap.Uint64("keys_emitted", g.cntEmitted)) + return + case <-g.wakeUpSignal: } - }() + } } func (g *Generator) FindAndMarkStalePartitions() { r := rand.New(rand.NewSource(10)) - nonStale := make([]bool, g.partitionCount) - for n := uint64(0); n < g.partitionCount*100; n++ { - values := CreatePartitionKeyValues(g.table, r, &g.partitionsConfig) - token, err := g.routingKeyCreator.GetHash(g.table, values) + + for range g.partitionCount * 100 { + token, _, err := g.createPartitionKeyValues(r) if err != nil { - g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error()) + g.logger.Panic("failed to get primary key hash", zap.Error(err)) } - nonStale[g.shardOf(token)] = 
true - } - for idx, val := range nonStale { - if !val { - g.partitions[idx].MarkStale() + if err = g.partition(token).MarkStale(); err != nil { + g.logger.Panic("failed to mark partition as stale", zap.Error(err)) } } } @@ -175,7 +162,7 @@ func (g *Generator) FindAndMarkStalePartitions() { // fillAllPartitions guarantees that each partition was tested to be full // at least once since the function started and before it ended. // In other words no partition will be starved. -func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { +func (g *Generator) fillAllPartitions(ctx context.Context) { pFilled := make([]bool, len(g.partitions)) allFilled := func() bool { for idx, filled := range pFilled { @@ -188,22 +175,30 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { } return true } - for !stopFlag.IsHardOrSoft() { - values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig) - token, err := g.routingKeyCreator.GetHash(g.table, values) + + for { + select { + case <-ctx.Done(): + return + default: + } + + token, values, err := g.createPartitionKeyValues() if err != nil { - g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error()) + g.logger.Panic("failed to get primary key hash", zap.Error(err)) } g.cntCreated++ - idx := token % g.partitionCount - partition := g.partitions[idx] + + partition := g.partition(token) if partition.Stale() || partition.inFlight.Has(token) { continue } + select { case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}: g.cntEmitted++ default: + idx := g.shardOf(token) if !pFilled[idx] { pFilled[idx] = true if allFilled() { @@ -217,3 +212,28 @@ func (g *Generator) fillAllPartitions(stopFlag *stop.Flag) { func (g *Generator) shardOf(token uint64) int { return int(token % g.partitionCount) } + +func (g *Generator) partition(token uint64) *Partition { + return g.partitions[g.shardOf(token)] +} + +func (g *Generator) createPartitionKeyValues(r ...*rand.Rand) (uint64, []any, error) { + 
rnd := g.r + + if len(r) > 0 && r[0] != nil { + rnd = r[0] + } + + values := make([]any, 0, g.table.PartitionKeysLenValues()) + + for _, pk := range g.table.PartitionKeys { + values = append(values, pk.Type.GenValue(rnd, &g.partitionsConfig)...) + } + + token, err := g.routingKeyCreator.GetHash(g.table, values) + if err != nil { + return 0, nil, errors.Wrap(err, "failed to get primary key hash") + } + + return token, values, nil +} diff --git a/pkg/generators/generator_test.go b/pkg/generators/generator_test.go index 3a46551..c66c6b0 100644 --- a/pkg/generators/generator_test.go +++ b/pkg/generators/generator_test.go @@ -15,13 +15,13 @@ package generators_test import ( + "context" "sync/atomic" "testing" "go.uber.org/zap" "github.com/scylladb/gemini/pkg/generators" - "github.com/scylladb/gemini/pkg/stop" "github.com/scylladb/gemini/pkg/typedef" ) @@ -32,7 +32,7 @@ func TestGenerator(t *testing.T) { PartitionKeys: generators.CreatePkColumns(1, "pk"), } var current uint64 - cfg := &generators.Config{ + cfg := generators.Config{ PartitionsRangeConfig: typedef.PartitionRangeConfig{ MaxStringLength: 10, MinStringLength: 0, @@ -47,7 +47,7 @@ func TestGenerator(t *testing.T) { } logger, _ := zap.NewDevelopment() generator := generators.NewGenerator(table, cfg, logger) - generator.Start(stop.NewFlag("main_test")) + generator.Start(context.Background()) for i := uint64(0); i < cfg.PartitionsCount; i++ { atomic.StoreUint64(&current, i) v := generator.Get() diff --git a/pkg/generators/generators.go b/pkg/generators/generators.go new file mode 100644 index 0000000..95a06d8 --- /dev/null +++ b/pkg/generators/generators.go @@ -0,0 +1,84 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package generators + +import ( + "context" + "math" + "sync" + + "go.uber.org/zap" + + "github.com/scylladb/gemini/pkg/typedef" +) + +type Generators struct { + wg *sync.WaitGroup + cancel context.CancelFunc + Generators []Generator +} + +func New( + ctx context.Context, + schema *typedef.Schema, + distFunc DistributionFunc, + partitionRangeConfig typedef.PartitionRangeConfig, + seed, distributionSize, pkBufferReuseSize uint64, + logger *zap.Logger, +) *Generators { + gs := make([]Generator, 0, len(schema.Tables)) + + cfg := Config{ + PartitionsRangeConfig: partitionRangeConfig, + PartitionsCount: distributionSize, + PartitionsDistributionFunc: distFunc, + Seed: seed, + PkUsedBufferSize: pkBufferReuseSize, + } + + wg := new(sync.WaitGroup) + wg.Add(len(schema.Tables)) + + ctx, cancel := context.WithCancel(ctx) + + for _, table := range schema.Tables { + g := NewGenerator(table, cfg, logger.Named("generators-"+table.Name)) + go func() { + defer wg.Done() + g.Start(ctx) + }() + + if table.PartitionKeys.ValueVariationsNumber(&partitionRangeConfig) < math.MaxUint32 { + // Low partition key variation can lead to having stale partitions. + // Let's detect and mark them before running the test. + g.FindAndMarkStalePartitions() + } + + gs = append(gs, g) + } + + return &Generators{ + Generators: gs, + wg: wg, + cancel: cancel, + } +} + +func (g *Generators) Close() error { + g.cancel() + g.wg.Wait() + + return nil +} diff --git a/pkg/generators/partition.go b/pkg/generators/partition.go index e70d46c..eb418a7 100644 --- a/pkg/generators/partition.go +++ 
b/pkg/generators/partition.go @@ -15,7 +15,9 @@ package generators import ( - "sync" + "sync/atomic" + + "go.uber.org/multierr" "github.com/scylladb/gemini/pkg/inflight" "github.com/scylladb/gemini/pkg/typedef" @@ -26,18 +28,17 @@ type Partition struct { oldValues chan *typedef.ValueWithToken inFlight inflight.InFlight wakeUpSignal chan<- struct{} // wakes up generator - closed bool - lock sync.RWMutex - isStale bool + closed atomic.Bool + isStale atomic.Bool } -func (s *Partition) MarkStale() { - s.isStale = true - s.Close() +func (s *Partition) MarkStale() error { + s.isStale.Store(true) + return s.Close() } func (s *Partition) Stale() bool { - return s.isStale + return s.isStale.Load() } // get returns a new value and ensures that it's corresponding token @@ -103,39 +104,33 @@ func (s *Partition) pick() *typedef.ValueWithToken { } func (s *Partition) safelyGetOldValuesChannel() chan *typedef.ValueWithToken { - s.lock.RLock() - if s.closed { + if s.closed.Load() { // Since only giveOld could have been potentially called after partition is closed // we need to protect it against writing to closed channel return nil } - defer s.lock.RUnlock() + return s.oldValues } -func (s *Partition) Close() { - s.lock.RLock() - if s.closed { - s.lock.RUnlock() - return +func (s *Partition) Close() error { + if s.closed.CompareAndSwap(false, true) { + close(s.values) + close(s.oldValues) } - s.lock.RUnlock() - s.lock.Lock() - if s.closed { - return - } - s.closed = true - close(s.values) - close(s.oldValues) - s.lock.Unlock() + + return nil } type Partitions []*Partition -func (p Partitions) CloseAll() { +func (p Partitions) Close() error { + var err error for _, part := range p { - part.Close() + err = multierr.Append(err, part.Close()) } + + return err } func NewPartitions(count, pkBufferSize int, wakeUpSignal chan struct{}) Partitions { diff --git a/pkg/generators/utils.go b/pkg/generators/utils.go index e5758dc..cd08ee5 100644 --- a/pkg/generators/utils.go +++ 
b/pkg/generators/utils.go @@ -14,27 +14,17 @@ package generators -import ( - "golang.org/x/exp/rand" - - "github.com/scylladb/gemini/pkg/typedef" -) - -func CreatePartitionKeyValues(table *typedef.Table, r *rand.Rand, g *typedef.PartitionRangeConfig) []any { - values := make([]any, 0, table.PartitionKeysLenValues()) - for _, pk := range table.PartitionKeys { - values = append(values, pk.Type.GenValue(r, g)...) - } - return values -} +import "github.com/scylladb/gemini/pkg/typedef" func CreatePkColumns(cnt int, prefix string) typedef.Columns { - var cols typedef.Columns + cols := make(typedef.Columns, 0, cnt) + for i := 0; i < cnt; i++ { cols = append(cols, &typedef.ColumnDef{ Name: GenColumnName(prefix, i), Type: typedef.TYPE_INT, }) } + return cols } diff --git a/pkg/jobs/gen_check_stmt.go b/pkg/jobs/gen_check_stmt.go index 3822b40..2cae7d7 100644 --- a/pkg/jobs/gen_check_stmt.go +++ b/pkg/jobs/gen_check_stmt.go @@ -28,7 +28,7 @@ import ( func GenCheckStmt( s *typedef.Schema, table *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, rnd *rand.Rand, p *typedef.PartitionRangeConfig, ) *typedef.Stmt { @@ -114,7 +114,7 @@ func GenCheckStmt( func genSinglePartitionQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, ) *typedef.Stmt { t.RLock() defer t.RUnlock() @@ -144,7 +144,7 @@ func genSinglePartitionQuery( func genSinglePartitionQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum int, @@ -183,7 +183,7 @@ func genSinglePartitionQueryMv( func genMultiplePartitionQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, numQueryPKs int, ) *typedef.Stmt { t.RLock() @@ -223,7 +223,7 @@ func genMultiplePartitionQuery( func genMultiplePartitionQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g 
generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, numQueryPKs int, @@ -274,7 +274,7 @@ func genMultiplePartitionQueryMv( func genClusteringRangeQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, maxClusteringRels int, @@ -321,7 +321,7 @@ func genClusteringRangeQuery( func genClusteringRangeQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, maxClusteringRels int, @@ -374,7 +374,7 @@ func genClusteringRangeQueryMv( func genMultiplePartitionClusteringRangeQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, numQueryPKs, maxClusteringRels int, @@ -435,7 +435,7 @@ func genMultiplePartitionClusteringRangeQuery( func genMultiplePartitionClusteringRangeQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, numQueryPKs, maxClusteringRels int, @@ -516,7 +516,7 @@ func genMultiplePartitionClusteringRangeQueryMv( func genSingleIndexQuery( s *typedef.Schema, t *typedef.Table, - _ generators.GeneratorInterface, + _ generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, idxCount int, diff --git a/pkg/jobs/gen_mutate_stmt.go b/pkg/jobs/gen_mutate_stmt.go index 6f65caa..e3baad6 100644 --- a/pkg/jobs/gen_mutate_stmt.go +++ b/pkg/jobs/gen_mutate_stmt.go @@ -26,7 +26,7 @@ import ( "github.com/scylladb/gemini/pkg/typedef" ) -func GenMutateStmt(s *typedef.Schema, t *typedef.Table, g generators.GeneratorInterface, r *rand.Rand, p *typedef.PartitionRangeConfig, deletes bool) (*typedef.Stmt, error) { +func GenMutateStmt(s *typedef.Schema, t *typedef.Table, g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, deletes bool) 
(*typedef.Stmt, error) { t.RLock() defer t.RUnlock() diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 3730b7d..5274c28 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -107,7 +107,7 @@ func (l List) Run( schemaConfig typedef.SchemaConfig, s store.Store, pump <-chan time.Duration, - generators []*generators.Generator, + generators *generators.Generators, globalStatus *status.GlobalStatus, logger *zap.Logger, seed uint64, @@ -124,19 +124,19 @@ func (l List) Run( partitionRangeConfig := schemaConfig.GetPartitionRangeConfig() logger.Info("start jobs") - for j := range schema.Tables { - gen := generators[j] - table := schema.Tables[j] + for j, table := range schema.Tables { + generator := &generators.Generators[j] for i := 0; i < int(l.workers); i++ { for idx := range l.jobs { jobF := l.jobs[idx].function r := rand.New(rand.NewSource(seed)) g.Go(func() error { - return jobF(gCtx, pump, schema, schemaConfig, table, s, r, &partitionRangeConfig, gen, globalStatus, logger, stopFlag, failFast, verbose) + return jobF(gCtx, pump, schema, schemaConfig, table, s, r, &partitionRangeConfig, generator, globalStatus, logger, stopFlag, failFast, verbose) }) } } } + return g.Wait() }