Skip to content

Commit

Permalink
improvement(jobs): job refactoring and randomization
Browse files Browse the repository at this point in the history
1. Refactoring the Statement Generators into generators package
2. Each table gets it's own random number for validation and generation
3. Warmup consolidated into Job, runs with its own validator and
   statement generator -> propably can be removed, as Warmup is just a
   mutation without DDL and deletes

Future Plans
1. Remove stopFlag as context.Context can do the same, hard kill is
   to CANCEL the Global Parent, and soft kill is cancelation of the
   current context
2. Generators Refactoring

Signed-off-by: Dusan Malusev <[email protected]>
  • Loading branch information
CodeLieutenant committed Oct 25, 2024
1 parent 68d7220 commit 905daff
Show file tree
Hide file tree
Showing 12 changed files with 216 additions and 155 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ cmd/gemini/dist/
bin/
coverage.txt
dist/
results/*.log
1 change: 0 additions & 1 deletion .run/Run Gemini.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
<package value="github.com/scylladb/gemini/cmd/gemini" />
<directory value="$PROJECT_DIR$" />
<filePath value="$PROJECT_DIR$/cmd/gemini/main.go" />
<output_directory value="bin/" />
<method v="2" />
</configuration>
</component>
29 changes: 12 additions & 17 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,6 @@ func run(_ *cobra.Command, _ []string) error {
stopFlag := stop.NewFlag("main")
warmupStopFlag := stop.NewFlag("warmup")
stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag)
pump := jobs.NewPump(stopFlag, logger)

gens, err := createGenerators(schema, schemaConfig, intSeed, partitionCount, logger)
if err != nil {
Expand All @@ -278,8 +277,10 @@ func run(_ *cobra.Command, _ []string) error {
if !nonInteractive {
sp := createSpinner(interactive())
ticker := time.NewTicker(time.Second)

go func() {
defer done()
defer ticker.Stop()
for {
select {
case <-stopFlag.SignalChannel():
Expand All @@ -291,17 +292,9 @@ func run(_ *cobra.Command, _ []string) error {
}()
}

if warmup > 0 && !stopFlag.IsHardOrSoft() {
jobsList := jobs.New(jobs.WarmupMode, warmup, concurrency)
if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil {
logger.Error("warmup encountered an error", zap.Error(err))
stopFlag.SetHard(true)
}
}

if !stopFlag.IsHardOrSoft() {
jobsList := jobs.New(mode, duration, concurrency)
if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil {
jobsList := jobs.New(mode, duration, concurrency, stopFlag, logger, schema, st, globalStatus, &schemaConfig, intSeed, gens, failFast, warmup)
if err = jobsList.Run(ctx); err != nil {
logger.Debug("error detected", zap.Error(err))
}
}
Expand All @@ -313,14 +306,16 @@ func run(_ *cobra.Command, _ []string) error {
return nil
}

func createFile(fname string, def *os.File) (*os.File, error) {
if fname != "" {
f, err := os.Create(fname)
func createFile(name string, def *os.File) (*os.File, error) {
if name != "" {
f, err := os.Create(name)
if err != nil {
return nil, errors.Wrapf(err, "Unable to open output file %s", fname)
return nil, errors.Wrapf(err, "Unable to open output file %s", name)
}

return f, nil
}

return def, nil
}

Expand Down Expand Up @@ -473,7 +468,7 @@ func init() {
rootCmd.Flags().BoolVarP(&dropSchema, "drop-schema", "d", false, "Drop schema before starting tests run")
rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output during test run")
rootCmd.Flags().BoolVarP(&failFast, "fail-fast", "f", false, "Stop on the first failure")
rootCmd.Flags().BoolVarP(&nonInteractive, "non-interactive", "", false, "Statement in non-interactive mode (disable progress indicator)")
rootCmd.Flags().BoolVarP(&nonInteractive, "non-interactive", "", false, "Run in non-interactive mode (disable progress indicator)")
rootCmd.Flags().DurationVarP(&duration, "duration", "", 30*time.Second, "")
rootCmd.Flags().StringVarP(&outFileArg, "outfile", "", "", "Specify the name of the file where the results should go")
rootCmd.Flags().StringVarP(&bind, "bind", "b", ":2112", "Specify the interface and port which to bind prometheus metrics on. Default is ':2112'")
Expand All @@ -497,7 +492,7 @@ func init() {
rootCmd.Flags().IntVarP(&minColumns, "min-columns", "", 8, "Minimum number of generated columns")
rootCmd.Flags().StringVarP(&datasetSize, "dataset-size", "", "large", "Specify the type of dataset size to use, small|large")
rootCmd.Flags().StringVarP(&cqlFeatures, "cql-features", "", "basic", "Specify the type of cql features to use, basic|normal|all")
rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Statement gemini with materialized views support")
rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Run gemini with materialized views support")
rootCmd.Flags().StringVarP(&level, "level", "", "info", "Specify the logging level, debug|info|warn|error|dpanic|panic|fatal")
rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 2, "Maximum number of attempts to apply a mutation")
rootCmd.Flags().DurationVarP(
Expand Down
17 changes: 9 additions & 8 deletions pkg/jobs/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import (

"github.com/scylladb/gemini/pkg/generators/statements"
"github.com/scylladb/gemini/pkg/joberror"
"github.com/scylladb/gemini/pkg/typedef"
)

func (m *mutation) DDL(ctx context.Context) error {
m.table.RLock()
func (m *mutation) DDL(ctx context.Context, table *typedef.Table) error {
table.RLock()
// Scylla does not allow changing the DDL of a table with materialized views.
if len(m.table.MaterializedViews) > 0 {
m.table.RUnlock()
if len(table.MaterializedViews) > 0 {
table.RUnlock()
return nil
}
m.table.RUnlock()
table.RUnlock()

m.table.Lock()
defer m.table.Unlock()
ddlStmts, err := statements.GenDDLStmt(m.schema, m.table, m.random, m.partitionRangeConfig, m.schemaCfg)
table.Lock()
defer table.Unlock()
ddlStmts, err := statements.GenDDLStmt(m.schema, table, m.random, m.partitionRangeConfig, m.schemaCfg)
if err != nil {
m.logger.Error("Failed! DDL Mutation statement generation failed", zap.Error(err))
m.globalStatus.WriteErrors.Add(1)
Expand Down
194 changes: 112 additions & 82 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,26 @@ import (
"github.com/scylladb/gemini/pkg/typedef"
)

type Mode []string

const (
WriteMode = "write"
ReadMode = "read"
MixedMode = "mixed"
WarmupMode = "warmup"
)

func ModeFromString(m string) Mode {
switch m {
case WriteMode:
return Mode{WriteMode}
case ReadMode:
return Mode{ReadMode}
case MixedMode:
return Mode{WriteMode, ReadMode}
case WarmupMode:
return Mode{WarmupMode}
default:
return Mode{}
}
}

type List struct {
name string
duration time.Duration
logger *zap.Logger
random *rand.Rand
stopFlag *stop.Flag
workers uint64
jobs []Job
generators []*generators.Generator
schema *typedef.Schema
verbose bool
failFast bool
type Runner struct {
duration time.Duration
logger *zap.Logger
random *rand.Rand
stopFlag *stop.Flag
workers uint64
generators []*generators.Generator
schema *typedef.Schema
failFast bool
schemaCfg *typedef.SchemaConfig
warmup time.Duration
globalStatus *status.GlobalStatus
pump <-chan time.Duration
store store.Store
mode Mode
}

type Job interface {
Name() string
Do(context.Context, generators.Interface) error
Do(context.Context, generators.Interface, *typedef.Table) error
}

func New(
Expand All @@ -79,70 +58,121 @@ func New(
stopFlag *stop.Flag,
logger *zap.Logger,
schema *typedef.Schema,
table *typedef.Table,
store store.Store,
globalStatus *status.GlobalStatus,
schemaCfg *typedef.SchemaConfig,
seed uint64,
gens []*generators.Generator,
pump <-chan time.Duration,
failFast bool,
verbose bool,
) List {
partitionRangeConfig := schemaCfg.GetPartitionRangeConfig()
rnd := rand.New(rand.NewSource(seed))

jobs := make([]Job, 0, 2)
name := "work cycle"
for _, m := range ModeFromString(mode) {
switch m {
case WriteMode:
jobs = append(jobs, NewMutation(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, schemaCfg, pump, failFast, verbose))
case ReadMode:
jobs = append(jobs, NewValidation(logger, pump, schema, schemaCfg, table, store, rnd, &partitionRangeConfig, globalStatus, stopFlag, failFast))
case WarmupMode:
jobs = append(jobs, NewWarmup(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, failFast, verbose))
name = "warmup cycle"
}
}

return List{
name: name,
jobs: jobs,
duration: duration,
workers: workers,
stopFlag: stopFlag,
failFast: failFast,
verbose: verbose,
random: rnd,
generators: gens,
schema: schema,
warmup time.Duration,
) *Runner {
return &Runner{
warmup: warmup,
globalStatus: globalStatus,
pump: NewPump(stopFlag, logger.Named("Pump")),
store: store,
mode: ModeFromString(mode),
logger: logger,
schemaCfg: schemaCfg,
duration: duration,
workers: workers,
stopFlag: stopFlag,
failFast: failFast,
random: rand.New(rand.NewSource(seed)),
generators: gens,
schema: schema,
}
}

func (l List) Name() string {
return l.name
func (l *Runner) Name() string {
return "Runner"
}

func (l List) Do(ctx context.Context) error {
func (l *Runner) Run(ctx context.Context) error {
ctx = l.stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop)
g, gCtx := errgroup.WithContext(ctx)
partitionRangeConfig := l.schemaCfg.GetPartitionRangeConfig()

l.logger.Info("start jobs")

if l.warmup > 0 {
l.logger.Info("Warmup Job Started",
zap.Int("duration", int(l.warmup.Seconds())),
zap.Int("workers", int(l.workers)),
)
time.AfterFunc(l.warmup, func() {
l.logger.Info("jobs time is up, begins jobs completion")
l.stopFlag.SetSoft(true)
})

warmup := func(_ <-chan time.Duration, rnd *rand.Rand) Job {
return NewWarmup(l.logger, l.schema, l.store, &partitionRangeConfig, l.globalStatus, l.schemaCfg, l.stopFlag, rnd, l.failFast)
}

if err := l.start(ctx, warmup); err != nil {
return err
}
}

time.AfterFunc(l.duration, func() {
l.logger.Info("jobs time is up, begins jobs completion")
l.stopFlag.SetSoft(true)
})

l.logger.Info("start jobs")
if l.mode.IsWrite() {
return l.start(ctx, func(pump <-chan time.Duration, rnd *rand.Rand) Job {
return NewMutation(
l.logger.Named("Mutation"),
l.schema,
l.store,
&partitionRangeConfig,
l.globalStatus,
l.stopFlag,
rnd,
l.schemaCfg,
pump,
l.failFast,
)
})
}

return l.start(ctx, func(pump <-chan time.Duration, rnd *rand.Rand) Job {
return NewValidation(
l.logger,
pump,
l.schema, l.schemaCfg,
l.store,
rnd,
&partitionRangeConfig,
l.globalStatus,
l.stopFlag,
l.failFast,
)
})
}

for j := range l.schema.Tables {
func (l *Runner) start(ctx context.Context, job func(<-chan time.Duration, *rand.Rand) Job) error {
g, gCtx := errgroup.WithContext(ctx)

g.SetLimit(int(l.workers))

partitionRangeConfig := l.schemaCfg.GetPartitionRangeConfig()

for j, table := range l.schema.Tables {
gen := l.generators[j]
for i := 0; i < int(l.workers); i++ {
for idx := range l.jobs {
jobF := l.jobs[idx]
g.Go(func() error {
return jobF.Do(gCtx, gen)
})
}
pump := NewPump(l.stopFlag, l.logger.Named("Pump-"+table.Name))
rnd := rand.New(rand.NewSource(l.random.Uint64()))

v := NewValidation(l.logger, pump, l.schema, l.schemaCfg, l.store, rnd, &partitionRangeConfig, l.globalStatus, l.stopFlag, l.failFast)
j := job(pump, rnd)

g.TryGo(func() error {
return v.Do(gCtx, gen, table)
})

for i := 0; i < int(l.workers)-1; i++ {
g.TryGo(func() error {
return j.Do(gCtx, gen, table)
})
}
}

Expand Down
26 changes: 26 additions & 0 deletions pkg/jobs/mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package jobs

type Mode []string

const (
WriteMode = "write"
ReadMode = "read"
MixedMode = "mixed"
)

func (m Mode) IsWrite() bool {
return m[0] == WriteMode
}

func ModeFromString(m string) Mode {
switch m {
case WriteMode:
return Mode{WriteMode}
case ReadMode:
return Mode{ReadMode}
case MixedMode:
return Mode{WriteMode, ReadMode}
default:
return Mode{}
}
}
Loading

0 comments on commit 905daff

Please sign in to comment.