diff --git a/.gitignore b/.gitignore
index 97f5c95..af30c10 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,4 @@ cmd/gemini/dist/
bin/
coverage.txt
dist/
+results/*.log
diff --git a/.run/Run Gemini.run.xml b/.run/Run Gemini.run.xml
index 3e4f4ec..ea9ee6a 100644
--- a/.run/Run Gemini.run.xml
+++ b/.run/Run Gemini.run.xml
@@ -8,7 +8,6 @@
-
\ No newline at end of file
diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go
index fbfbceb..5fd82f5 100644
--- a/cmd/gemini/root.go
+++ b/cmd/gemini/root.go
@@ -267,7 +267,6 @@ func run(_ *cobra.Command, _ []string) error {
stopFlag := stop.NewFlag("main")
warmupStopFlag := stop.NewFlag("warmup")
stop.StartOsSignalsTransmitter(logger, stopFlag, warmupStopFlag)
- pump := jobs.NewPump(stopFlag, logger)
gens, err := createGenerators(schema, schemaConfig, intSeed, partitionCount, logger)
if err != nil {
@@ -278,8 +277,10 @@ func run(_ *cobra.Command, _ []string) error {
if !nonInteractive {
sp := createSpinner(interactive())
ticker := time.NewTicker(time.Second)
+
go func() {
defer done()
+ defer ticker.Stop()
for {
select {
case <-stopFlag.SignalChannel():
@@ -291,17 +292,9 @@ func run(_ *cobra.Command, _ []string) error {
}()
}
- if warmup > 0 && !stopFlag.IsHardOrSoft() {
- jobsList := jobs.New(jobs.WarmupMode, warmup, concurrency)
- if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil {
- logger.Error("warmup encountered an error", zap.Error(err))
- stopFlag.SetHard(true)
- }
- }
-
if !stopFlag.IsHardOrSoft() {
- jobsList := jobs.New(mode, duration, concurrency)
- if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil {
+ jobsList := jobs.New(mode, duration, concurrency, stopFlag, logger, schema, st, globalStatus, &schemaConfig, intSeed, gens, failFast, warmup)
+ if err = jobsList.Run(ctx); err != nil {
logger.Debug("error detected", zap.Error(err))
}
}
@@ -313,14 +306,16 @@ func run(_ *cobra.Command, _ []string) error {
return nil
}
-func createFile(fname string, def *os.File) (*os.File, error) {
- if fname != "" {
- f, err := os.Create(fname)
+func createFile(name string, def *os.File) (*os.File, error) {
+ if name != "" {
+ f, err := os.Create(name)
if err != nil {
- return nil, errors.Wrapf(err, "Unable to open output file %s", fname)
+ return nil, errors.Wrapf(err, "Unable to open output file %s", name)
}
+
return f, nil
}
+
return def, nil
}
diff --git a/pkg/jobs/ddl.go b/pkg/jobs/ddl.go
index 9cd98d6..8f2de1b 100644
--- a/pkg/jobs/ddl.go
+++ b/pkg/jobs/ddl.go
@@ -11,20 +11,21 @@ import (
"github.com/scylladb/gemini/pkg/generators/statements"
"github.com/scylladb/gemini/pkg/joberror"
+ "github.com/scylladb/gemini/pkg/typedef"
)
-func (m *mutation) DDL(ctx context.Context) error {
- m.table.RLock()
+func (m *mutation) DDL(ctx context.Context, table *typedef.Table) error {
+ table.RLock()
// Scylla does not allow changing the DDL of a table with materialized views.
- if len(m.table.MaterializedViews) > 0 {
- m.table.RUnlock()
+ if len(table.MaterializedViews) > 0 {
+ table.RUnlock()
return nil
}
- m.table.RUnlock()
+ table.RUnlock()
- m.table.Lock()
- defer m.table.Unlock()
- ddlStmts, err := statements.GenDDLStmt(m.schema, m.table, m.random, m.partitionRangeConfig, m.schemaCfg)
+ table.Lock()
+ defer table.Unlock()
+ ddlStmts, err := statements.GenDDLStmt(m.schema, table, m.random, m.partitionRangeConfig, m.schemaCfg)
if err != nil {
m.logger.Error("Failed! DDL Mutation statement generation failed", zap.Error(err))
m.globalStatus.WriteErrors.Add(1)
diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go
index 2389464..bfe2ca1 100644
--- a/pkg/jobs/jobs.go
+++ b/pkg/jobs/jobs.go
@@ -29,47 +29,26 @@ import (
"github.com/scylladb/gemini/pkg/typedef"
)
-type Mode []string
-
-const (
- WriteMode = "write"
- ReadMode = "read"
- MixedMode = "mixed"
- WarmupMode = "warmup"
-)
-
-func ModeFromString(m string) Mode {
- switch m {
- case WriteMode:
- return Mode{WriteMode}
- case ReadMode:
- return Mode{ReadMode}
- case MixedMode:
- return Mode{WriteMode, ReadMode}
- case WarmupMode:
- return Mode{WarmupMode}
- default:
- return Mode{}
- }
-}
-
-type List struct {
- name string
- duration time.Duration
- logger *zap.Logger
- random *rand.Rand
- stopFlag *stop.Flag
- workers uint64
- jobs []Job
- generators []*generators.Generator
- schema *typedef.Schema
- verbose bool
- failFast bool
+type Runner struct {
+ duration time.Duration
+ logger *zap.Logger
+ random *rand.Rand
+ stopFlag *stop.Flag
+ workers uint64
+ generators []*generators.Generator
+ schema *typedef.Schema
+ failFast bool
+ schemaCfg *typedef.SchemaConfig
+ warmup time.Duration
+ globalStatus *status.GlobalStatus
+ pump <-chan time.Duration
+ store store.Store
+ mode Mode
}
type Job interface {
Name() string
- Do(context.Context, generators.Interface) error
+ Do(context.Context, generators.Interface, *typedef.Table) error
}
func New(
@@ -79,70 +58,121 @@ func New(
stopFlag *stop.Flag,
logger *zap.Logger,
schema *typedef.Schema,
- table *typedef.Table,
store store.Store,
globalStatus *status.GlobalStatus,
schemaCfg *typedef.SchemaConfig,
seed uint64,
gens []*generators.Generator,
- pump <-chan time.Duration,
failFast bool,
- verbose bool,
-) List {
- partitionRangeConfig := schemaCfg.GetPartitionRangeConfig()
- rnd := rand.New(rand.NewSource(seed))
-
- jobs := make([]Job, 0, 2)
- name := "work cycle"
- for _, m := range ModeFromString(mode) {
- switch m {
- case WriteMode:
- jobs = append(jobs, NewMutation(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, schemaCfg, pump, failFast, verbose))
- case ReadMode:
- jobs = append(jobs, NewValidation(logger, pump, schema, schemaCfg, table, store, rnd, &partitionRangeConfig, globalStatus, stopFlag, failFast))
- case WarmupMode:
- jobs = append(jobs, NewWarmup(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, failFast, verbose))
- name = "warmup cycle"
- }
- }
-
- return List{
- name: name,
- jobs: jobs,
- duration: duration,
- workers: workers,
- stopFlag: stopFlag,
- failFast: failFast,
- verbose: verbose,
- random: rnd,
- generators: gens,
- schema: schema,
+ warmup time.Duration,
+) *Runner {
+ return &Runner{
+ warmup: warmup,
+ globalStatus: globalStatus,
+ pump: NewPump(stopFlag, logger.Named("Pump")),
+ store: store,
+ mode: ModeFromString(mode),
+ logger: logger,
+ schemaCfg: schemaCfg,
+ duration: duration,
+ workers: workers,
+ stopFlag: stopFlag,
+ failFast: failFast,
+ random: rand.New(rand.NewSource(seed)),
+ generators: gens,
+ schema: schema,
}
}
-func (l List) Name() string {
- return l.name
+func (l *Runner) Name() string {
+ return "Runner"
}
-func (l List) Do(ctx context.Context) error {
+func (l *Runner) Run(ctx context.Context) error {
ctx = l.stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop)
- g, gCtx := errgroup.WithContext(ctx)
+ partitionRangeConfig := l.schemaCfg.GetPartitionRangeConfig()
+
+ l.logger.Info("start jobs")
+
+ if l.warmup > 0 {
+ l.logger.Info("Warmup Job Started",
+ zap.Int("duration", int(l.warmup.Seconds())),
+ zap.Int("workers", int(l.workers)),
+ )
+ time.AfterFunc(l.warmup, func() {
+ l.logger.Info("jobs time is up, begins jobs completion")
+ l.stopFlag.SetSoft(true)
+ })
+
+ warmup := func(_ <-chan time.Duration, rnd *rand.Rand) Job {
+ return NewWarmup(l.logger, l.schema, l.store, &partitionRangeConfig, l.globalStatus, l.schemaCfg, l.stopFlag, rnd, l.failFast)
+ }
+
+ if err := l.start(ctx, warmup); err != nil {
+ return err
+ }
+ }
+
time.AfterFunc(l.duration, func() {
l.logger.Info("jobs time is up, begins jobs completion")
l.stopFlag.SetSoft(true)
})
- l.logger.Info("start jobs")
+ if l.mode.IsWrite() {
+ return l.start(ctx, func(pump <-chan time.Duration, rnd *rand.Rand) Job {
+ return NewMutation(
+ l.logger.Named("Mutation"),
+ l.schema,
+ l.store,
+ &partitionRangeConfig,
+ l.globalStatus,
+ l.stopFlag,
+ rnd,
+ l.schemaCfg,
+ pump,
+ l.failFast,
+ )
+ })
+ }
+
+ return l.start(ctx, func(pump <-chan time.Duration, rnd *rand.Rand) Job {
+ return NewValidation(
+ l.logger,
+ pump,
+ l.schema, l.schemaCfg,
+ l.store,
+ rnd,
+ &partitionRangeConfig,
+ l.globalStatus,
+ l.stopFlag,
+ l.failFast,
+ )
+ })
+}
- for j := range l.schema.Tables {
+func (l *Runner) start(ctx context.Context, job func(<-chan time.Duration, *rand.Rand) Job) error {
+ g, gCtx := errgroup.WithContext(ctx)
+
+ g.SetLimit(int(l.workers))
+
+ partitionRangeConfig := l.schemaCfg.GetPartitionRangeConfig()
+
+ for j, table := range l.schema.Tables {
gen := l.generators[j]
- for i := 0; i < int(l.workers); i++ {
- for idx := range l.jobs {
- jobF := l.jobs[idx]
- g.Go(func() error {
- return jobF.Do(gCtx, gen)
- })
- }
+ pump := NewPump(l.stopFlag, l.logger.Named("Pump-"+table.Name))
+ rnd := rand.New(rand.NewSource(l.random.Uint64()))
+
+ v := NewValidation(l.logger, pump, l.schema, l.schemaCfg, l.store, rnd, &partitionRangeConfig, l.globalStatus, l.stopFlag, l.failFast)
+ j := job(pump, rnd)
+
+ g.TryGo(func() error {
+ return v.Do(gCtx, gen, table)
+ })
+
+ for i := 0; i < int(l.workers)-1; i++ {
+ g.TryGo(func() error {
+ return j.Do(gCtx, gen, table)
+ })
}
}
diff --git a/pkg/jobs/mode.go b/pkg/jobs/mode.go
new file mode 100644
index 0000000..e11ae57
--- /dev/null
+++ b/pkg/jobs/mode.go
@@ -0,0 +1,26 @@
+package jobs
+
+type Mode []string
+
+const (
+ WriteMode = "write"
+ ReadMode = "read"
+ MixedMode = "mixed"
+)
+
+func (m Mode) IsWrite() bool {
+ return m[0] == WriteMode
+}
+
+func ModeFromString(m string) Mode {
+ switch m {
+ case WriteMode:
+ return Mode{WriteMode}
+ case ReadMode:
+ return Mode{ReadMode}
+ case MixedMode:
+ return Mode{WriteMode, ReadMode}
+ default:
+ return Mode{}
+ }
+}
diff --git a/pkg/jobs/mutation.go b/pkg/jobs/mutation.go
index 0ba2d6a..252a52d 100644
--- a/pkg/jobs/mutation.go
+++ b/pkg/jobs/mutation.go
@@ -24,13 +24,11 @@ type (
stopFlag *stop.Flag
pump <-chan time.Duration
failFast bool
- verbose bool
}
mutation struct {
logger *zap.Logger
schema *typedef.Schema
- table *typedef.Table
store store.Store
partitionRangeConfig *typedef.PartitionRangeConfig
schemaCfg *typedef.SchemaConfig
@@ -43,32 +41,30 @@ type (
func NewMutation(
logger *zap.Logger,
schema *typedef.Schema,
- table *typedef.Table,
store store.Store,
partitionRangeConfig *typedef.PartitionRangeConfig,
globalStatus *status.GlobalStatus,
stopFlag *stop.Flag,
+ rnd *rand.Rand,
schemaCfg *typedef.SchemaConfig,
pump <-chan time.Duration,
failFast bool,
- verbose bool,
) *Mutation {
return &Mutation{
- logger: logger.Named("mutation"),
+ logger: logger,
mutation: mutation{
logger: logger.Named("mutation-with-deletes"),
schema: schema,
- table: table,
store: store,
partitionRangeConfig: partitionRangeConfig,
globalStatus: globalStatus,
deletes: true,
schemaCfg: schemaCfg,
+ random: rnd,
},
stopFlag: stopFlag,
pump: pump,
failFast: failFast,
- verbose: verbose,
}
}
@@ -76,7 +72,7 @@ func (m *Mutation) Name() string {
return "Mutation"
}
-func (m *Mutation) Do(ctx context.Context, generator generators.Interface) error {
+func (m *Mutation) Do(ctx context.Context, generator generators.Interface, table *typedef.Table) error {
m.logger.Info("starting mutation loop")
defer m.logger.Info("ending mutation loop")
@@ -84,6 +80,7 @@ func (m *Mutation) Do(ctx context.Context, generator generators.Interface) error
if m.stopFlag.IsHardOrSoft() {
return nil
}
+
select {
case <-m.stopFlag.SignalChannel():
m.logger.Debug("mutation job terminated")
@@ -95,9 +92,9 @@ func (m *Mutation) Do(ctx context.Context, generator generators.Interface) error
var err error
if m.mutation.ShouldDoDDL() {
- err = m.mutation.DDL(ctx)
+ err = m.mutation.DDL(ctx, table)
} else {
- err = m.mutation.Statement(ctx, generator)
+ err = m.mutation.Statement(ctx, generator, table)
}
if err != nil {
@@ -111,13 +108,14 @@ func (m *Mutation) Do(ctx context.Context, generator generators.Interface) error
}
}
-func (m *mutation) Statement(ctx context.Context, generator generators.Interface) error {
- mutateStmt, err := statements.GenMutateStmt(m.schema, m.table, generator, m.random, m.partitionRangeConfig, m.deletes)
+func (m *mutation) Statement(ctx context.Context, generator generators.Interface, table *typedef.Table) error {
+ mutateStmt, err := statements.GenMutateStmt(m.schema, table, generator, m.random, m.partitionRangeConfig, m.deletes)
if err != nil {
m.logger.Error("Failed! Mutation statement generation failed", zap.Error(err))
m.globalStatus.WriteErrors.Add(1)
return err
}
+
if mutateStmt == nil {
if w := m.logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
w.Write(zap.String("job", "mutation"))
@@ -157,6 +155,7 @@ func (m *mutation) ShouldDoDDL() bool {
return false
}
- ind := m.random.Intn(1000000)
- return ind%100000 == 0
+ // 2% Change of DDL Happening
+ ind := m.random.Intn(100)
+ return ind < 2
}
diff --git a/pkg/jobs/validation.go b/pkg/jobs/validation.go
index 3b31f8d..41c9f2b 100644
--- a/pkg/jobs/validation.go
+++ b/pkg/jobs/validation.go
@@ -24,7 +24,6 @@ type Validation struct {
pump <-chan time.Duration
schema *typedef.Schema
schemaConfig *typedef.SchemaConfig
- table *typedef.Table
store store.Store
random *rand.Rand
partitionRangeConfig *typedef.PartitionRangeConfig
@@ -38,7 +37,6 @@ func NewValidation(
pump <-chan time.Duration,
schema *typedef.Schema,
schemaConfig *typedef.SchemaConfig,
- table *typedef.Table,
store store.Store,
random *rand.Rand,
partitionRangeConfig *typedef.PartitionRangeConfig,
@@ -51,7 +49,6 @@ func NewValidation(
pump: pump,
schema: schema,
schemaConfig: schemaConfig,
- table: table,
store: store,
random: random,
partitionRangeConfig: partitionRangeConfig,
@@ -65,11 +62,11 @@ func (v *Validation) Name() string {
return "Validation"
}
-func (v *Validation) validate(ctx context.Context, generator generators.Interface) error {
- stmt, cleanup := statements.GenCheckStmt(v.schema, v.table, generator, v.random, v.partitionRangeConfig)
+func (v *Validation) validate(ctx context.Context, generator generators.Interface, table *typedef.Table) error {
+ stmt, cleanup := statements.GenCheckStmt(v.schema, table, generator, v.random, v.partitionRangeConfig)
defer cleanup()
- err := validation(ctx, v.schemaConfig, v.table, v.store, stmt, v.logger)
+ err := validation(ctx, v.schemaConfig, table, v.store, stmt, v.logger)
switch {
case err == nil:
@@ -88,7 +85,7 @@ func (v *Validation) validate(ctx context.Context, generator generators.Interfac
return nil
}
-func (v *Validation) Do(ctx context.Context, generator generators.Interface) error {
+func (v *Validation) Do(ctx context.Context, generator generators.Interface, table *typedef.Table) error {
v.logger.Info("starting validation loop")
defer v.logger.Info("ending validation loop")
@@ -103,7 +100,7 @@ func (v *Validation) Do(ctx context.Context, generator generators.Interface) err
time.Sleep(hb)
}
- if err := v.validate(ctx, generator); errors.Is(err, context.Canceled) {
+ if err := v.validate(ctx, generator, table); errors.Is(err, context.Canceled) {
return nil
}
@@ -139,6 +136,13 @@ func validation(
var lastErr, err error
attempt := 1
for ; ; attempt++ {
+ select {
+ case <-time.After(delay):
+ case <-ctx.Done():
+ logger.Info(fmt.Sprintf("Retring failed validation stoped by done context. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err))
+ return nil
+ }
+
lastErr = err
err = s.Check(ctx, table, stmt, attempt == maxAttempts)
@@ -161,13 +165,6 @@ func validation(
} else {
logger.Info(fmt.Sprintf("Retring failed validation. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err))
}
-
- select {
- case <-time.After(delay):
- case <-ctx.Done():
- logger.Info(fmt.Sprintf("Retring failed validation stoped by done context. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err))
- return nil
- }
}
if attempt > 1 {
diff --git a/pkg/jobs/warmup.go b/pkg/jobs/warmup.go
index 6ef1477..8e4106e 100644
--- a/pkg/jobs/warmup.go
+++ b/pkg/jobs/warmup.go
@@ -2,6 +2,7 @@ package jobs
import (
"context"
+ "golang.org/x/exp/rand"
"go.uber.org/zap"
@@ -17,34 +18,33 @@ type Warmup struct {
logger *zap.Logger
stopFlag *stop.Flag
failFast bool
- verbose bool
}
func NewWarmup(
logger *zap.Logger,
schema *typedef.Schema,
- table *typedef.Table,
store store.Store,
partitionRangeConfig *typedef.PartitionRangeConfig,
globalStatus *status.GlobalStatus,
+ schemaCfg *typedef.SchemaConfig,
stopFlag *stop.Flag,
+ rnd *rand.Rand,
failFast bool,
- verbose bool,
) *Warmup {
return &Warmup{
logger: logger.Named("mutation"),
mutation: mutation{
logger: logger.Named("mutation-without-deletes"),
schema: schema,
- table: table,
store: store,
partitionRangeConfig: partitionRangeConfig,
+ schemaCfg: schemaCfg,
globalStatus: globalStatus,
+ random: rnd,
deletes: false,
},
stopFlag: stopFlag,
failFast: failFast,
- verbose: verbose,
}
}
@@ -52,7 +52,7 @@ func (w *Warmup) Name() string {
return "Warmup"
}
-func (w *Warmup) Do(ctx context.Context, generator generators.Interface) error {
+func (w *Warmup) Do(ctx context.Context, generator generators.Interface, table *typedef.Table) error {
w.logger.Info("starting warmup loop")
defer w.logger.Info("ending warmup loop")
@@ -62,7 +62,7 @@ func (w *Warmup) Do(ctx context.Context, generator generators.Interface) error {
return nil
}
- _ = w.mutation.Statement(ctx, generator)
+ _ = w.mutation.Statement(ctx, generator, table)
if w.failFast && w.mutation.globalStatus.HasErrors() {
w.stopFlag.SetSoft(true)
return nil
diff --git a/pkg/typedef/simple_type.go b/pkg/typedef/simple_type.go
index f2ad0c0..93d0433 100644
--- a/pkg/typedef/simple_type.go
+++ b/pkg/typedef/simple_type.go
@@ -206,7 +206,7 @@ func (st SimpleType) genValue(r *rand.Rand, p *PartitionRangeConfig) any {
case TYPE_FLOAT:
return r.Float32()
case TYPE_INET:
- return net.ParseIP(utils.RandIPV4Address(r, r.Intn(255), 2)).String()
+ return net.ParseIP(utils.RandIPV4Address(r)).String()
case TYPE_INT:
return r.Int31()
case TYPE_SMALLINT:
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 4f3ab17..0cb04b6 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -18,6 +18,7 @@ import (
"encoding/hex"
"fmt"
"github.com/pkg/errors"
+ "golang.org/x/exp/constraints"
"strconv"
"strings"
"time"
@@ -55,22 +56,34 @@ func RandTime(rnd *rand.Rand) int64 {
return rnd.Int63n(86400000000000)
}
-func RandIPV4Address(rnd *rand.Rand, v, pos int) string {
+func ipV4Builder[T constraints.Integer](bytes [4]T) string {
+ var builder strings.Builder
+ builder.Grow(16) // Maximum length of an IPv4 address
+
+ for _, b := range bytes {
+ builder.WriteString(strconv.FormatUint(uint64(b), 10))
+ builder.WriteRune('.')
+ }
+
+ return builder.String()[:builder.Len()-1]
+}
+
+func RandIPV4Address(rnd *rand.Rand) string {
+ return ipV4Builder([4]int{rnd.Intn(256), rnd.Intn(256), rnd.Intn(256), rnd.Intn(256)})
+}
+
+func RandIPV4AddressPositional(rnd *rand.Rand, v, pos int) string {
if pos < 0 || pos > 4 {
panic(fmt.Sprintf("invalid position for the desired value of the IP part %d, 0-3 supported", pos))
}
if v < 0 || v > 255 {
panic(fmt.Sprintf("invalid value for the desired position %d of the IP, 0-255 suppoerted", v))
}
- var blocks []string
- for i := 0; i < 4; i++ {
- if i == pos {
- blocks = append(blocks, strconv.Itoa(v))
- } else {
- blocks = append(blocks, strconv.Itoa(rnd.Intn(255)))
- }
- }
- return strings.Join(blocks, ".")
+
+ data := [4]int{rnd.Intn(255), rnd.Intn(255), rnd.Intn(255), rnd.Intn(255)}
+ data[pos] = v
+
+ return ipV4Builder(data)
}
func RandInt2(rnd *rand.Rand, min, max int) int {
diff --git a/results/.gitkeep b/results/.gitkeep
new file mode 100644
index 0000000..e69de29