diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index c60fafe..814bc55 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -31,8 +31,8 @@ jobs:
matrix:
gemini-features: ["basic", "normal"]
gemini-concurrency: [4]
- duration: ["5m"]
- dataset-size: [large]
+ duration: ["1m"]
+ dataset-size: [large, small]
oracle-scylla-version: ["6.1"]
test-scylla-version: ["6.2"]
fail-fast: false
@@ -54,6 +54,7 @@ jobs:
CONCURRENCY=${{ matrix.gemini-concurrency }} \
CQL_FEATURES=${{ matrix.gemini-features }} \
DURATION=${{ matrix.duration }} \
+ WARMUP=30s \
DATASET_SIZE=${{ matrix.dataset-size }} \
- name: Shutdown ScyllaDB
shell: bash
diff --git a/.run/Run Gemini Mixed.run.xml b/.run/Run Gemini Mixed.run.xml
index c967827..c6d7c6c 100644
--- a/.run/Run Gemini Mixed.run.xml
+++ b/.run/Run Gemini Mixed.run.xml
@@ -3,7 +3,7 @@
-
+
diff --git a/Makefile b/Makefile
index e6af7ac..ad905b6 100644
--- a/Makefile
+++ b/Makefile
@@ -91,12 +91,9 @@ GEMINI_FLAGS =--fail-fast \
--use-server-timestamps=false \
--async-objects-stabilization-attempts=10 \
--max-mutation-retries=10 \
- --async-objects-stabilization-backoff=1000ms \
- --max-mutation-retries-backoff=1000ms \
--replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \
--oracle-replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \
--concurrency=$(CONCURRENCY) \
- --use-lwt=true \
--dataset-size=$(DATASET_SIZE) \
--seed=$(SEED) \
--schema-seed=$(SEED) \
@@ -108,7 +105,7 @@ GEMINI_FLAGS =--fail-fast \
.PHONY: pprof-profile
pprof-profile:
- go tool pprof -http=:8080 -seconds 60 http://localhost:6060/debug/pprof/profile
+ go tool pprof -http=:8080 http://localhost:6060/debug/pprof/profile
.PHONY: pprof-heap
pprof-heap:
@@ -126,10 +123,6 @@ pprof-block:
pprof-mutex:
go tool pprof -http=:8084 http://localhost:6060/debug/pprof/mutex
-.PHONY: pprof-trace
-pprof-trace:
- go tool pprof -http=:8085 -seconds 60 http://localhost:6060/debug/pprof/trace
-
.PHONY: docker-integration-test
docker-integration-test:
@mkdir -p $(PWD)/results
diff --git a/cmd/gemini/flags.go b/cmd/gemini/flags.go
index cb2f8d8..9b6abfa 100644
--- a/cmd/gemini/flags.go
+++ b/cmd/gemini/flags.go
@@ -121,15 +121,15 @@ func init() {
rootCmd.Flags().IntVarP(&maxColumns, "max-columns", "", 16, "Maximum number of generated columns")
rootCmd.Flags().IntVarP(&minColumns, "min-columns", "", 8, "Minimum number of generated columns")
rootCmd.Flags().StringVarP(&datasetSize, "dataset-size", "", "large", "Specify the type of dataset size to use, small|large")
- rootCmd.Flags().StringVarP(&cqlFeatures, "cql-features", "", "basic", "Specify the type of cql features to use, basic|normal|all")
+ rootCmd.Flags().StringVarP(&cqlFeatures, "cql-features", "", "normal", "Specify the type of cql features to use, basic|normal|all")
rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Run gemini with materialized views support")
rootCmd.Flags().StringVarP(&level, "level", "", "info", "Specify the logging level, debug|info|warn|error|dpanic|panic|fatal")
rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 10, "Maximum number of attempts to apply a mutation")
rootCmd.Flags().DurationVarP(
&maxRetriesMutateSleep, "max-mutation-retries-backoff", "", 10*time.Millisecond,
"Duration between attempts to apply a mutation for example 10ms or 1s")
- rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 30000, "Number of reused buffered partition keys")
- rootCmd.Flags().Uint64VarP(&partitionCount, "token-range-slices", "", 10000, "Number of slices to divide the token space into")
+ rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 1000, "Number of reused buffered partition keys")
+ rootCmd.Flags().Uint64VarP(&partitionCount, "token-range-slices", "", 1000, "Number of slices to divide the token space into")
rootCmd.Flags().StringVarP(
&partitionKeyDistribution, "partition-key-distribution", "", "uniform",
"Specify the distribution from which to draw partition keys, supported values are currently uniform|normal|zipf")
diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go
index 38115b6..61dff00 100644
--- a/cmd/gemini/root.go
+++ b/cmd/gemini/root.go
@@ -16,6 +16,10 @@ package main
import (
"fmt"
+ "github.com/prometheus/client_golang/prometheus/promhttp"
+ "log"
+ "net/http"
+ "net/http/pprof"
"os"
"os/signal"
"strconv"
@@ -49,6 +53,8 @@ func interactive() bool {
}
func run(_ *cobra.Command, _ []string) error {
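+	// Record the start time so the final report can include the total run duration.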
+ start := time.Now()
+
logger := createLogger(level)
globalStatus := status.NewGlobalStatus(1000)
defer utils.IgnoreError(logger.Sync)
@@ -79,23 +85,23 @@ func run(_ *cobra.Command, _ []string) error {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM)
defer cancel()
- //go func() {
- // mux := http.NewServeMux()
- // mux.Handle("GET /metrics", promhttp.Handler())
- // log.Fatal(http.ListenAndServe(bind, mux))
- //}()
- //
- //if profilingPort != 0 {
- // go func() {
- // mux := http.NewServeMux()
- // mux.HandleFunc("GET /debug/pprof/", pprof.Index)
- // mux.HandleFunc("GET /debug/pprof/cmdline", pprof.Cmdline)
- // mux.HandleFunc("GET /debug/pprof/profile", pprof.Profile)
- // mux.HandleFunc("GET /debug/pprof/symbol", pprof.Symbol)
- // mux.HandleFunc("GET /debug/pprof/trace", pprof.Trace)
- // log.Fatal(http.ListenAndServe(":"+strconv.Itoa(profilingPort), mux))
- // }()
- //}
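+	// Serve Prometheus metrics on the configured bind address.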
+ go func() {
+ mux := http.NewServeMux()
+ mux.Handle("GET /metrics", promhttp.Handler())
+ log.Fatal(http.ListenAndServe(bind, mux))
+ }()
+
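+	// Optionally expose the standard net/http/pprof handlers for live profiling.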
+ if profilingPort != 0 {
+ go func() {
+ mux := http.NewServeMux()
+ mux.HandleFunc("GET /debug/pprof/", pprof.Index)
+ mux.HandleFunc("GET /debug/pprof/cmdline", pprof.Cmdline)
+ mux.HandleFunc("GET /debug/pprof/profile", pprof.Profile)
+ mux.HandleFunc("GET /debug/pprof/symbol", pprof.Symbol)
+ mux.HandleFunc("GET /debug/pprof/trace", pprof.Trace)
+ log.Fatal(http.ListenAndServe(":"+strconv.Itoa(profilingPort), mux))
+ }()
+ }
outFile, err := createFile(outFileArg, os.Stdout)
if err != nil {
@@ -153,7 +159,7 @@ func run(_ *cobra.Command, _ []string) error {
if err != nil {
return err
}
- //defer utils.IgnoreError(st.Close)
+ defer utils.IgnoreError(st.Close)
if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropKeyspace(schema) {
@@ -218,7 +224,7 @@ func run(_ *cobra.Command, _ []string) error {
}
logger.Info("test finished")
- globalStatus.PrintResult(outFile, schema, version)
+ globalStatus.PrintResult(outFile, schema, version, start)
if globalStatus.HasErrors() {
return errors.Errorf("gemini encountered errors, exiting with non zero status")
diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go
index 0a8d69d..f3c1e8f 100644
--- a/pkg/generators/generator.go
+++ b/pkg/generators/generator.go
@@ -16,6 +16,7 @@ package generators
import (
"context"
+ "sync/atomic"
"github.com/pkg/errors"
"go.uber.org/zap"
@@ -58,8 +59,8 @@ type Generator struct {
partitionsConfig typedef.PartitionRangeConfig
partitionCount uint64
- cntCreated uint64
- cntEmitted uint64
+ cntCreated atomic.Uint64
+ cntEmitted atomic.Uint64
}
func (g *Generator) PartitionCount() uint64 {
@@ -74,10 +75,10 @@ type Config struct {
PkUsedBufferSize uint64
}
-func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) Generator {
+func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) *Generator {
wakeUpSignal := make(chan struct{}, int(config.PartitionsCount))
- return Generator{
+ return &Generator{
partitions: NewPartitions(int(config.PartitionsCount), int(config.PkUsedBufferSize), wakeUpSignal),
partitionCount: config.PartitionsCount,
table: table,
@@ -91,12 +92,18 @@ func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) Gener
}
func (g *Generator) Get() *typedef.ValueWithToken {
- targetPart := g.GetPartitionForToken(g.idxFunc())
- for targetPart.Stale() {
- targetPart = g.GetPartitionForToken(g.idxFunc())
+ var out *typedef.ValueWithToken
+
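+	// Retry until a non-stale partition yields a value; get() returns nil when
+	// nothing is buffered or the token is already in flight.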
+ for out == nil {
+ targetPart := g.GetPartitionForToken(g.idxFunc())
+ for targetPart.Stale() {
+ targetPart = g.GetPartitionForToken(g.idxFunc())
+ }
+ out = targetPart.get()
}
- out := targetPart.get()
+
return out
}
func (g *Generator) GetPartitionForToken(token TokenIndex) *Partition {
@@ -106,11 +113,17 @@ func (g *Generator) GetPartitionForToken(token TokenIndex) *Partition {
// GetOld returns a previously used value and token, or a new one if
// the old queue is empty.
func (g *Generator) GetOld() *typedef.ValueWithToken {
- targetPart := g.GetPartitionForToken(g.idxFunc())
- for targetPart.Stale() {
- targetPart = g.GetPartitionForToken(g.idxFunc())
+ var out *typedef.ValueWithToken
+
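+	// Same retry pattern as Get, but drawing from the partition's old-value buffer.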
+ for out == nil {
+ targetPart := g.GetPartitionForToken(g.idxFunc())
+ for targetPart.Stale() {
+ targetPart = g.GetPartitionForToken(g.idxFunc())
+ }
+ out = targetPart.getOld()
}
- return targetPart.getOld()
+
+ return out
}
// GiveOld returns the supplied value for later reuse
@@ -125,16 +138,27 @@ func (g *Generator) ReleaseToken(token uint64) {
g.GetPartitionForToken(TokenIndex(token)).releaseToken(token)
}
-func (g *Generator) Start(ctx context.Context) {
+func (g *Generator) start(ctx context.Context) {
g.logger.Info("starting partition key generation loop")
defer func() {
- g.partitions.Close()
- g.logger.Debug("stopping partition key generation loop",
- zap.Uint64("keys_created", g.cntCreated),
- zap.Uint64("keys_emitted", g.cntEmitted))
+ g.logger.Info("stopping partition key generation loop",
+ zap.Uint64("keys_created", g.cntCreated.Load()),
+ zap.Uint64("keys_emitted", g.cntEmitted.Load()),
+ )
+
+ if err := g.partitions.Close(); err != nil {
+ g.logger.Error("failed to close partitions", zap.Error(err))
+ }
}()
- g.fillAllPartitions(ctx)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-g.wakeUpSignal:
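+			// A partition reported that its buffer is running low; top up the partitions.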
+ g.fillAllPartitions(ctx)
+ }
+ }
}
func (g *Generator) FindAndMarkStalePartitions() {
@@ -156,57 +180,41 @@ func (g *Generator) FindAndMarkStalePartitions() {
}
}
+func (g *Generator) fillPartition() {
+	// TODO: fill partitions more deliberately instead of hashing a randomly
+	// generated key and hoping it lands on a partition that needs values.
+
+ values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig)
+ token, err := g.routingKeyCreator.GetHash(g.table, values)
+ if err != nil {
+ g.logger.Panic("failed to get primary key hash", zap.Error(err))
+ }
+ g.cntCreated.Add(1)
+ idx := token % g.partitionCount
+ partition := g.partitions[idx]
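+	// Skip partitions that are stale or whose token is already in flight.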
+ if partition.Stale() || partition.inFlight.Has(token) {
+ return
+ }
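+	// Non-blocking send: if the partition's buffer is full, drop the value.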
+ select {
+ case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}:
+ g.cntEmitted.Add(1)
+ default:
+ }
+}
+
-// fillAllPartitions guarantees that each partition was tested to be full
-// at least once since the function started and before it ended.
-// In other words no partition will be starved.
+// fillAllPartitions keeps generating fresh partition keys and topping up
+// the corresponding partitions until the context is cancelled.
func (g *Generator) fillAllPartitions(ctx context.Context) {
- pFilled := make([]bool, len(g.partitions))
- allFilled := func() bool {
- for idx, filled := range pFilled {
- if !filled {
- if g.partitions[idx].Stale() {
- continue
- }
- return false
- }
- }
- return true
- }
-
for {
select {
case <-ctx.Done():
return
- case <-g.wakeUpSignal:
- }
-
- for !allFilled() {
- select {
- case <-ctx.Done():
- return
- default:
- }
- values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig)
- token, err := g.routingKeyCreator.GetHash(g.table, values)
- if err != nil {
- g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error())
- }
- g.cntCreated++
- idx := token % g.partitionCount
- partition := g.partitions[idx]
- if partition.Stale() || partition.inFlight.Has(token) {
- continue
- }
- select {
- case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}:
- g.cntEmitted++
- default:
- if !pFilled[idx] {
- pFilled[idx] = true
- }
- }
+ default:
}
+ g.fillPartition()
}
}
diff --git a/pkg/generators/generator_test.go b/pkg/generators/generator_test.go
index 7461749..26240ae 100644
--- a/pkg/generators/generator_test.go
+++ b/pkg/generators/generator_test.go
@@ -51,7 +51,7 @@ func TestGenerator(t *testing.T) {
}
logger, _ := zap.NewDevelopment()
generator := generators.NewGenerator(table, cfg, logger)
- generator.Start(ctx)
+ generator.start(ctx)
for i := uint64(0); i < cfg.PartitionsCount; i++ {
atomic.StoreUint64(¤t, i)
v := generator.Get()
diff --git a/pkg/generators/generators.go b/pkg/generators/generators.go
index 95d3f14..f6e6323 100644
--- a/pkg/generators/generators.go
+++ b/pkg/generators/generators.go
@@ -25,13 +25,13 @@ import (
type Generators struct {
wg sync.WaitGroup
- generators []Generator
+ generators []*Generator
cancel context.CancelFunc
idx int
}
func (g *Generators) Get() *Generator {
- gen := &g.generators[g.idx%len(g.generators)]
+ gen := g.generators[g.idx%len(g.generators)]
g.idx++
return gen
}
@@ -56,11 +56,11 @@ func New(
ctx, cancel := context.WithCancel(ctx)
gens := &Generators{
- generators: make([]Generator, 0, len(schema.Tables)),
+ generators: make([]*Generator, 0, len(schema.Tables)),
cancel: cancel,
}
-
gens.wg.Add(len(schema.Tables))
+
for _, table := range schema.Tables {
pkVariations := table.PartitionKeys.ValueVariationsNumber(&partitionRangeConfig)
@@ -72,6 +72,10 @@ func New(
PkUsedBufferSize: pkBufferReuseSize,
}
g := NewGenerator(table, tablePartConfig, logger.Named("generators"))
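+		// Start each table's generator loop in its own goroutine; the WaitGroup
+		// tracks them for shutdown.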
+ go func() {
+ defer gens.wg.Done()
+ g.start(ctx)
+ }()
if pkVariations < 2^32 {
			// Low partition key variation can lead to stale partitions,
			// so detect and mark them before running the test
@@ -79,12 +83,6 @@ func New(
}
gens.generators = append(gens.generators, g)
-
- go func(g *Generator) {
- defer gens.wg.Done()
-
- g.Start(ctx)
- }(&g)
}
return gens, nil
diff --git a/pkg/generators/partition.go b/pkg/generators/partition.go
index 2880d97..dd6c381 100644
--- a/pkg/generators/partition.go
+++ b/pkg/generators/partition.go
@@ -51,7 +51,11 @@ func (s *Partition) Stale() bool {
// get returns a new value and ensures that it's corresponding token
// is not already in-flight.
func (s *Partition) get() *typedef.ValueWithToken {
- return s.pick()
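+	// Hand out a value only if its token is not already in flight.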
+ if v := s.pick(); v != nil && !s.inFlight.Has(v.Token) {
+ return v
+ }
+
+ return nil
}
// getOld returns a previously used value and token, or a new one if
@@ -69,12 +73,12 @@ func (s *Partition) getOld() *typedef.ValueWithToken {
// is empty in which case it removes the corresponding token from the
// in-flight tracking.
func (s *Partition) giveOld(v *typedef.ValueWithToken) {
- ch := s.safelyGetOldValuesChannel()
- if ch == nil {
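+	// Only giveOld can be reached after the partition is closed, so bail out
+	// instead of writing to a closed channel.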
+ if s.closed.Load() {
return
}
+
select {
- case ch <- v:
+ case s.oldValues <- v:
default:
// Old partition buffer is full, just drop the value
}
@@ -94,28 +98,15 @@ func (s *Partition) wakeUp() {
func (s *Partition) pick() *typedef.ValueWithToken {
select {
- case val, more := <-s.values:
- if !more {
- return nil
- }
+ case val := <-s.values:
if len(s.values) <= cap(s.values)/4 {
s.wakeUp() // channel at 25% capacity, trigger generator
}
return val
default:
-		s.wakeUp() // channel empty, need to wait for new values
+		s.wakeUp() // channel empty, wake the generator; the caller gets nil and retries
- return <-s.values
- }
-}
-
-func (s *Partition) safelyGetOldValuesChannel() chan *typedef.ValueWithToken {
- if s.closed.Load() {
- // Since only giveOld could have been potentially called after partition is closed
- // we need to protect it against writing to closed channel
return nil
}
-
- return s.oldValues
}
func (s *Partition) Close() error {
diff --git a/pkg/generators/partitions.go b/pkg/generators/partitions.go
index 237579f..b0eec36 100644
--- a/pkg/generators/partitions.go
+++ b/pkg/generators/partitions.go
@@ -14,7 +14,7 @@ func (p Partitions) Close() error {
return err
}
-func NewPartitions(count, pkBufferSize int, wakeUpSignal chan struct{}) Partitions {
+func NewPartitions(count, pkBufferSize int, wakeUpSignal chan<- struct{}) Partitions {
partitions := make(Partitions, 0, count)
for i := 0; i < count; i++ {
diff --git a/pkg/status/status.go b/pkg/status/status.go
index 9c9ece6..2568f06 100644
--- a/pkg/status/status.go
+++ b/pkg/status/status.go
@@ -19,6 +19,7 @@ import (
"fmt"
"io"
"sync/atomic"
+ "time"
"github.com/pkg/errors"
@@ -52,11 +53,12 @@ func (gs *GlobalStatus) AddReadError(err *joberror.JobError) {
gs.ReadErrors.Add(1)
}
-func (gs *GlobalStatus) PrintResultAsJSON(w io.Writer, schema *typedef.Schema, version string) error {
+func (gs *GlobalStatus) PrintResultAsJSON(w io.Writer, schema *typedef.Schema, version string, start time.Time) error {
result := map[string]any{
"result": gs,
"gemini_version": version,
"schemaHash": schema.GetHash(),
+ "Time": time.Since(start).String(),
}
encoder := json.NewEncoder(w)
encoder.SetEscapeHTML(false)
@@ -76,12 +78,13 @@ func (gs *GlobalStatus) HasErrors() bool {
return gs.WriteErrors.Load() > 0 || gs.ReadErrors.Load() > 0
}
-func (gs *GlobalStatus) PrintResult(w io.Writer, schema *typedef.Schema, version string) {
- if err := gs.PrintResultAsJSON(w, schema, version); err != nil {
+func (gs *GlobalStatus) PrintResult(w io.Writer, schema *typedef.Schema, version string, start time.Time) {
+ if err := gs.PrintResultAsJSON(w, schema, version, start); err != nil {
		// Even if it has been a long run, we still want to display the results...
fmt.Printf("Unable to print result as json, using plain text to stdout, error=%s\n", err)
fmt.Printf("Gemini version: %s\n", version)
fmt.Printf("Results:\n")
+ fmt.Printf("\ttime: %v\n", time.Since(start).String())
fmt.Printf("\twrite ops: %v\n", gs.WriteOps.Load())
fmt.Printf("\tread ops: %v\n", gs.ReadOps.Load())
fmt.Printf("\twrite errors: %v\n", gs.WriteErrors.Load())