Commit 8a0dd27
improvement(write-rate): gemini is now able to write 10k req/s with 16 threads on a small cluster

Signed-off-by: Dusan Malusev <[email protected]>
CodeLieutenant committed Nov 19, 2024
1 parent 45960da commit 8a0dd27
Showing 11 changed files with 125 additions and 125 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/integration-tests.yml
@@ -31,8 +31,8 @@ jobs:
matrix:
gemini-features: ["basic", "normal"]
gemini-concurrency: [4]
duration: ["5m"]
dataset-size: [large]
duration: ["1m"]
dataset-size: [large, small]
oracle-scylla-version: ["6.1"]
test-scylla-version: ["6.2"]
fail-fast: false
@@ -54,6 +54,7 @@ jobs:
CONCURRENCY=${{ matrix.gemini-concurrency }} \
CQL_FEATURES=${{ matrix.gemini-features }} \
DURATION=${{ matrix.duration }} \
WARMUP=30s \
DATASET_SIZE=${{ matrix.dataset-size }} \
- name: Shutdown ScyllaDB
shell: bash
2 changes: 1 addition & 1 deletion .run/Run Gemini Mixed.run.xml
@@ -3,7 +3,7 @@
<module name="gemini" />
<working_directory value="$PROJECT_DIR$" />
<go_parameters value="-gcflags &quot;all=-N -l&quot;" />
<parameters value="--mode mixed --dataset-size=large --cql-features normal --warmup 0 --duration 2m --drop-schema true --fail-fast --level info --test-host-selection-policy token-aware --oracle-host-selection-policy token-aware --test-cluster=192.168.100.2 --oracle-cluster=192.168.100.3 --non-interactive --request-timeout 180s --connect-timeout 120s --use-server-timestamps false --async-objects-stabilization-attempts 10 --async-objects-stabilization-backoff 100ms --replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --oracle-replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --max-mutation-retries 5 --max-mutation-retries-backoff 1000ms --concurrency 1 --profiling-port 6060" />
<parameters value="--mode mixed --seed=150 --schema-seed=150 --dataset-size=large --cql-features normal --warmup 0 --duration 2m --drop-schema true --fail-fast --level info --test-host-selection-policy token-aware --oracle-host-selection-policy token-aware --test-cluster=192.168.100.2 --oracle-cluster=192.168.100.3 --non-interactive --request-timeout 180s --connect-timeout 120s --use-server-timestamps false --async-objects-stabilization-attempts 10 --async-objects-stabilization-backoff 100ms --replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --oracle-replication-strategy &quot;{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}&quot; --max-mutation-retries 5 --max-mutation-retries-backoff 1000ms --concurrency 16 --profiling-port 6060" />
<kind value="PACKAGE" />
<package value="github.com/scylladb/gemini/cmd/gemini" />
<directory value="$PROJECT_DIR$" />
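Pinning --seed and --schema-seed to 150 makes the generated schema and workload reproducible between runs, which matters when comparing throughput before and after a change like the concurrency bump from 1 to 16. The effect is the same as seeding any deterministic PRNG; a generic Go illustration follows (not Gemini's own generator code, and the seed value only echoes the run configuration above):

package main

import (
	"fmt"
	"math/rand/v2"
)

func main() {
	// Two generators seeded identically produce identical sequences,
	// so a run can be replayed exactly by reusing the seed.
	a := rand.New(rand.NewPCG(150, 0))
	b := rand.New(rand.NewPCG(150, 0))
	fmt.Println(a.Uint64() == b.Uint64(), a.Uint64() == b.Uint64())
}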
9 changes: 1 addition & 8 deletions Makefile
@@ -91,12 +91,9 @@ GEMINI_FLAGS =--fail-fast \
--use-server-timestamps=false \
--async-objects-stabilization-attempts=10 \
--max-mutation-retries=10 \
--async-objects-stabilization-backoff=1000ms \
--max-mutation-retries-backoff=1000ms \
--replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \
--oracle-replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \
--concurrency=$(CONCURRENCY) \
--use-lwt=true \
--dataset-size=$(DATASET_SIZE) \
--seed=$(SEED) \
--schema-seed=$(SEED) \
@@ -108,7 +105,7 @@ GEMINI_FLAGS =--fail-fast \

.PHONY: pprof-profile
pprof-profile:
go tool pprof -http=:8080 -seconds 60 http://localhost:6060/debug/pprof/profile
go tool pprof -http=:8080 http://localhost:6060/debug/pprof/profile

.PHONY: pprof-heap
pprof-heap:
@@ -126,10 +123,6 @@ pprof-block:
pprof-mutex:
go tool pprof -http=:8084 http://localhost:6060/debug/pprof/mutex

.PHONY: pprof-trace
pprof-trace:
go tool pprof -http=:8085 -seconds 60 http://localhost:6060/debug/pprof/trace

.PHONY: docker-integration-test
docker-integration-test:
@mkdir -p $(PWD)/results
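For context on the pprof-profile change: dropping -seconds 60 means go tool pprof no longer requests a 60-second capture, so /debug/pprof/profile falls back to its server-side default of 30 seconds. The removed pprof-trace target pointed go tool pprof at the execution-trace endpoint, whose output is meant for go tool trace rather than pprof, which may be why it was dropped. A minimal Go sketch of requesting a CPU profile with an explicit duration (an illustration, not part of this commit; the host, port, and output filename are assumptions based on the --profiling-port 6060 flag used elsewhere in this diff):

package main

import (
	"io"
	"log"
	"net/http"
	"os"
)

func main() {
	// Ask the pprof endpoint for a 60-second CPU profile explicitly.
	resp, err := http.Get("http://localhost:6060/debug/pprof/profile?seconds=60")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	out, err := os.Create("cpu.pprof")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	// Save the raw profile so it can be opened later with `go tool pprof cpu.pprof`.
	if _, err := io.Copy(out, resp.Body); err != nil {
		log.Fatal(err)
	}
}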
6 changes: 3 additions & 3 deletions cmd/gemini/flags.go
@@ -121,15 +121,15 @@ func init() {
rootCmd.Flags().IntVarP(&maxColumns, "max-columns", "", 16, "Maximum number of generated columns")
rootCmd.Flags().IntVarP(&minColumns, "min-columns", "", 8, "Minimum number of generated columns")
rootCmd.Flags().StringVarP(&datasetSize, "dataset-size", "", "large", "Specify the type of dataset size to use, small|large")
rootCmd.Flags().StringVarP(&cqlFeatures, "cql-features", "", "basic", "Specify the type of cql features to use, basic|normal|all")
rootCmd.Flags().StringVarP(&cqlFeatures, "cql-features", "", "normal", "Specify the type of cql features to use, basic|normal|all")
rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Run gemini with materialized views support")
rootCmd.Flags().StringVarP(&level, "level", "", "info", "Specify the logging level, debug|info|warn|error|dpanic|panic|fatal")
rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 10, "Maximum number of attempts to apply a mutation")
rootCmd.Flags().DurationVarP(
&maxRetriesMutateSleep, "max-mutation-retries-backoff", "", 10*time.Millisecond,
"Duration between attempts to apply a mutation for example 10ms or 1s")
rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 30000, "Number of reused buffered partition keys")
rootCmd.Flags().Uint64VarP(&partitionCount, "token-range-slices", "", 10000, "Number of slices to divide the token space into")
rootCmd.Flags().Uint64VarP(&pkBufferReuseSize, "partition-key-buffer-reuse-size", "", 1000, "Number of reused buffered partition keys")
rootCmd.Flags().Uint64VarP(&partitionCount, "token-range-slices", "", 1000, "Number of slices to divide the token space into")
rootCmd.Flags().StringVarP(
&partitionKeyDistribution, "partition-key-distribution", "", "uniform",
"Specify the distribution from which to draw partition keys, supported values are currently uniform|normal|zipf")
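The lower defaults for --partition-key-buffer-reuse-size (30000 to 1000) and --token-range-slices (10000 to 1000) shrink the amount of buffered partition-key state. Assuming each token-range slice keeps a values buffer of up to partition-key-buffer-reuse-size entries (the NewPartitions call in pkg/generators suggests this, but its implementation is not shown in this diff), the worst-case number of buffered keys drops by roughly 300x; a back-of-the-envelope sketch under that assumption:

package main

import "fmt"

func main() {
	// Assumption: buffered slots ≈ token-range slices × per-slice buffer size.
	old := uint64(10000) * 30000 // previous defaults
	cur := uint64(1000) * 1000   // new defaults
	fmt.Printf("old=%d new=%d reduction=%dx\n", old, cur, old/cur)
}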
44 changes: 25 additions & 19 deletions cmd/gemini/root.go
@@ -16,6 +16,10 @@ package main

import (
"fmt"
"github.com/prometheus/client_golang/prometheus/promhttp"
"log"
"net/http"
"net/http/pprof"
"os"
"os/signal"
"strconv"
@@ -49,6 +53,8 @@ func interactive() bool {
}

func run(_ *cobra.Command, _ []string) error {
start := time.Now()

logger := createLogger(level)
globalStatus := status.NewGlobalStatus(1000)
defer utils.IgnoreError(logger.Sync)
@@ -79,23 +85,23 @@ func run(_ *cobra.Command, _ []string) error {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGABRT, syscall.SIGTERM)
defer cancel()

//go func() {
// mux := http.NewServeMux()
// mux.Handle("GET /metrics", promhttp.Handler())
// log.Fatal(http.ListenAndServe(bind, mux))
//}()
//
//if profilingPort != 0 {
// go func() {
// mux := http.NewServeMux()
// mux.HandleFunc("GET /debug/pprof/", pprof.Index)
// mux.HandleFunc("GET /debug/pprof/cmdline", pprof.Cmdline)
// mux.HandleFunc("GET /debug/pprof/profile", pprof.Profile)
// mux.HandleFunc("GET /debug/pprof/symbol", pprof.Symbol)
// mux.HandleFunc("GET /debug/pprof/trace", pprof.Trace)
// log.Fatal(http.ListenAndServe(":"+strconv.Itoa(profilingPort), mux))
// }()
//}
go func() {
mux := http.NewServeMux()
mux.Handle("GET /metrics", promhttp.Handler())
log.Fatal(http.ListenAndServe(bind, mux))
}()

if profilingPort != 0 {
go func() {
mux := http.NewServeMux()
mux.HandleFunc("GET /debug/pprof/", pprof.Index)
mux.HandleFunc("GET /debug/pprof/cmdline", pprof.Cmdline)
mux.HandleFunc("GET /debug/pprof/profile", pprof.Profile)
mux.HandleFunc("GET /debug/pprof/symbol", pprof.Symbol)
mux.HandleFunc("GET /debug/pprof/trace", pprof.Trace)
log.Fatal(http.ListenAndServe(":"+strconv.Itoa(profilingPort), mux))
}()
}

outFile, err := createFile(outFileArg, os.Stdout)
if err != nil {
@@ -153,7 +159,7 @@ func run(_ *cobra.Command, _ []string) error {
if err != nil {
return err
}
//defer utils.IgnoreError(st.Close)
defer utils.IgnoreError(st.Close)

if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropKeyspace(schema) {
@@ -218,7 +224,7 @@ func run(_ *cobra.Command, _ []string) error {
}

logger.Info("test finished")
globalStatus.PrintResult(outFile, schema, version)
globalStatus.PrintResult(outFile, schema, version, start)

if globalStatus.HasErrors() {
return errors.Errorf("gemini encountered errors, exiting with non zero status")
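With the previously commented-out servers now enabled, the process serves Prometheus metrics on the address held in bind and, when --profiling-port is non-zero, the standard net/http/pprof handlers. The method-qualified patterns such as "GET /metrics" rely on the Go 1.22+ ServeMux routing syntax, and because both listeners call log.Fatal from a goroutine, a failed bind terminates the whole run; that appears to be a deliberate simplification rather than something this commit addresses. A quick external check of the profiling endpoint (an illustration, not part of this commit; the port echoes the --profiling-port 6060 value from the run configurations above):

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// Fetch the pprof index page to confirm the profiling server is up.
	resp, err := http.Get("http://localhost:6060/debug/pprof/")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("pprof index: status=%d, %d bytes\n", resp.StatusCode, len(body))
}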
126 changes: 67 additions & 59 deletions pkg/generators/generator.go
@@ -16,6 +16,7 @@ package generators

import (
"context"
"sync/atomic"

"github.com/pkg/errors"
"go.uber.org/zap"
@@ -58,8 +59,8 @@ type Generator struct {
partitionsConfig typedef.PartitionRangeConfig
partitionCount uint64

cntCreated uint64
cntEmitted uint64
cntCreated atomic.Uint64
cntEmitted atomic.Uint64
}

func (g *Generator) PartitionCount() uint64 {
@@ -74,10 +75,10 @@ type Config struct {
PkUsedBufferSize uint64
}

func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) Generator {
func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) *Generator {
wakeUpSignal := make(chan struct{}, int(config.PartitionsCount))

return Generator{
return &Generator{
partitions: NewPartitions(int(config.PartitionsCount), int(config.PkUsedBufferSize), wakeUpSignal),
partitionCount: config.PartitionsCount,
table: table,
@@ -91,12 +92,18 @@ func NewGenerator(table *typedef.Table, config Config, logger *zap.Logger) Gener
}

func (g *Generator) Get() *typedef.ValueWithToken {
targetPart := g.GetPartitionForToken(g.idxFunc())
for targetPart.Stale() {
targetPart = g.GetPartitionForToken(g.idxFunc())
var out *typedef.ValueWithToken

for out == nil {
targetPart := g.GetPartitionForToken(g.idxFunc())
for targetPart.Stale() {
targetPart = g.GetPartitionForToken(g.idxFunc())
}
out = targetPart.get()
}
out := targetPart.get()

return out

}

func (g *Generator) GetPartitionForToken(token TokenIndex) *Partition {
@@ -106,11 +113,17 @@ func (g *Generator) GetPartitionForToken(token TokenIndex) *Partition {
// GetOld returns a previously used value and token or a new if
// the old queue is empty.
func (g *Generator) GetOld() *typedef.ValueWithToken {
targetPart := g.GetPartitionForToken(g.idxFunc())
for targetPart.Stale() {
targetPart = g.GetPartitionForToken(g.idxFunc())
var out *typedef.ValueWithToken

for out == nil {
targetPart := g.GetPartitionForToken(g.idxFunc())
for targetPart.Stale() {
targetPart = g.GetPartitionForToken(g.idxFunc())
}
out = targetPart.getOld()
}
return targetPart.getOld()

return out
}

// GiveOld returns the supplied value for later reuse
@@ -125,16 +138,27 @@ func (g *Generator) ReleaseToken(token uint64) {
g.GetPartitionForToken(TokenIndex(token)).releaseToken(token)
}

func (g *Generator) Start(ctx context.Context) {
func (g *Generator) start(ctx context.Context) {
g.logger.Info("starting partition key generation loop")
defer func() {
g.partitions.Close()
g.logger.Debug("stopping partition key generation loop",
zap.Uint64("keys_created", g.cntCreated),
zap.Uint64("keys_emitted", g.cntEmitted))
g.logger.Info("stopping partition key generation loop",
zap.Uint64("keys_created", g.cntCreated.Load()),
zap.Uint64("keys_emitted", g.cntEmitted.Load()),
)

if err := g.partitions.Close(); err != nil {
g.logger.Error("failed to close partitions", zap.Error(err))
}
}()

g.fillAllPartitions(ctx)
for {
select {
case <-ctx.Done():
return
case <-g.wakeUpSignal:
g.fillAllPartitions(ctx)
}
}
}

func (g *Generator) FindAndMarkStalePartitions() {
@@ -156,57 +180,41 @@ func (g *Generator) FindAndMarkStalePartitions() {
}
}

func (g *Generator) fillPartition() {
// Be a bit smarter on how to fill partitions

values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig)
token, err := g.routingKeyCreator.GetHash(g.table, values)
if err != nil {
g.logger.Panic("failed to get primary key hash", zap.Error(err))
}
g.cntCreated.Add(1)
idx := token % g.partitionCount
partition := g.partitions[idx]
if partition.Stale() || partition.inFlight.Has(token) {
return
}
select {
case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}:
g.cntEmitted.Add(1)
default:
}

return
}

// fillAllPartitions guarantees that each partition was tested to be full
// at least once since the function started and before it ended.
// In other words no partition will be starved.
func (g *Generator) fillAllPartitions(ctx context.Context) {
pFilled := make([]bool, len(g.partitions))
allFilled := func() bool {
for idx, filled := range pFilled {
if !filled {
if g.partitions[idx].Stale() {
continue
}
return false
}
}
return true
}

for {
select {
case <-ctx.Done():
return
case <-g.wakeUpSignal:
}

for !allFilled() {
select {
case <-ctx.Done():
return
default:
}
values := CreatePartitionKeyValues(g.table, g.r, &g.partitionsConfig)
token, err := g.routingKeyCreator.GetHash(g.table, values)
if err != nil {
g.logger.Panic(errors.Wrap(err, "failed to get primary key hash").Error())
}
g.cntCreated++
idx := token % g.partitionCount
partition := g.partitions[idx]
if partition.Stale() || partition.inFlight.Has(token) {
continue
}
select {
case partition.values <- &typedef.ValueWithToken{Token: token, Value: values}:
g.cntEmitted++
default:
if !pFilled[idx] {
pFilled[idx] = true
}
}
default:
}

g.fillPartition()
}
}

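The generator's refill logic now follows a wake-up-signal pattern: the loop in start blocks until a consumer signals on wakeUpSignal, the refill pass then appears to keep calling the new fillPartition helper until every non-stale partition has been seen full (judging by the retained fillAllPartitions comment), and fillPartition silently drops a generated key when its target buffer has no room or the token is already in flight. Get and GetOld, in turn, retry until a partition hands back a non-nil value, and cntCreated/cntEmitted become atomic.Uint64 so they can be updated and read from different goroutines safely. A stripped-down sketch of the same refill-on-demand pattern (illustrative only; the names, types, and buffer sizes are not taken from the Gemini code):

package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()

	values := make(chan int, 4)      // stand-in for one partition's value buffer
	wakeUp := make(chan struct{}, 1) // consumers nudge the producer here
	next := 0

	fill := func() {
		for {
			select {
			case values <- next: // room available: keep producing
				next++
			default: // buffer full: stop until the next wake-up
				return
			}
		}
	}

	go func() {
		fill() // prime the buffer once, then refill only on demand
		for {
			select {
			case <-ctx.Done():
				return
			case <-wakeUp:
				fill()
			}
		}
	}()

	for i := 0; i < 10; i++ {
		v := <-values
		select {
		case wakeUp <- struct{}{}: // non-blocking nudge after each consume
		default:
		}
		fmt.Println("consumed", v)
	}
}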
2 changes: 1 addition & 1 deletion pkg/generators/generator_test.go
@@ -51,7 +51,7 @@ func TestGenerator(t *testing.T) {
}
logger, _ := zap.NewDevelopment()
generator := generators.NewGenerator(table, cfg, logger)
generator.Start(ctx)
generator.start(ctx)

Check failure on line 54 in pkg/generators/generator_test.go (GitHub Actions / Lint Test and Build): generator.start undefined (cannot refer to unexported method start) (typecheck)
for i := uint64(0); i < cfg.PartitionsCount; i++ {
atomic.StoreUint64(&current, i)
v := generator.Get()
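The CI annotation above follows directly from renaming Start to start: the test file appears to live in the external generators_test package (it calls generators.NewGenerator with a package qualifier), and an external test package cannot reference unexported methods. One conventional way to bridge that, sketched here as the usual export_test.go idiom rather than anything this commit contains, would be:

// pkg/generators/export_test.go (hypothetical)
package generators

import "context"

// Start re-exports the unexported start method for the external test package.
func (g *Generator) Start(ctx context.Context) {
	g.start(ctx)
}

Alternatively, the test could move into package generators itself; either way, the typecheck failure shown above will persist until the test can reach the method one way or another.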
