diff --git a/.run/Run Gemini Mixed.run.xml b/.run/Run Gemini Mixed.run.xml index c6d7c6c..1021a5e 100644 --- a/.run/Run Gemini Mixed.run.xml +++ b/.run/Run Gemini Mixed.run.xml @@ -3,7 +3,7 @@ - + diff --git a/Makefile b/Makefile index 489c78e..6c78e71 100644 --- a/Makefile +++ b/Makefile @@ -20,30 +20,39 @@ GEMINI_TEST_CLUSTER ?= $(shell docker inspect --format='{{ .NetworkSettings.Netw GEMINI_ORACLE_CLUSTER ?= $(shell docker inspect --format='{{ .NetworkSettings.Networks.gemini.IPAddress }}' gemini-oracle) GEMINI_DOCKER_NETWORK ?= gemini GEMINI_FLAGS ?= --fail-fast \ - --level=info \ - --non-interactive \ - --consistency=LOCAL_QUORUM \ - --test-host-selection-policy=token-aware \ - --oracle-host-selection-policy=token-aware \ - --mode=$(MODE) \ - --non-interactive \ - --request-timeout=5s \ - --connect-timeout=15s \ - --use-server-timestamps=false \ --async-objects-stabilization-attempts=10 \ - --max-mutation-retries=10 \ - --replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \ - --oracle-replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \ + --async-objects-stabilization-backoff=1s \ --concurrency=$(CONCURRENCY) \ - --dataset-size=$(DATASET_SIZE) \ + --cql-features=$(CQL_FEATURES) \ + --dataset-size=$(DATASET_SIZE) \ + --mode=$(MODE) \ + --duration=$(DURATION) \ --seed=$(SEED) \ --schema-seed=$(SEED) \ - --cql-features=$(CQL_FEATURES) \ - --duration=$(DURATION) \ --warmup=$(WARMUP) \ + --drop-schema=true \ + --level=info \ + --materialized-views=false \ + --async-objects-stabilization-attempts=10 \ --profiling-port=6060 \ - --drop-schema=true - + --token-range-slices=10000 \ + --partition-key-buffer-reuse-size=100 \ + --oracle-connect-timeout=15s \ + --oracle-request-timeout=5s \ + --oracle-consistency=LOCAL_QUORUM \ + --oracle-max-mutation-retries=10 \ + --oracle-max-mutation-retries-backoff=1s \ + --oracle-use-server-timestamps=false \ + --oracle-replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \ + --oracle-host-selection-policy=token-aware \ + --test-consistency=LOCAL_QUORUM \ + --test-request-timeout=5s \ + --test-connect-timeout=15s \ + --test-use-server-timestamps=false \ + --test-host-selection-policy=token-aware \ + --test-replication-strategy="{'class': 'NetworkTopologyStrategy', 'replication_factor': '1'}" \ + --test-max-mutation-retries=10 \ + --test-dc=datacenter1 ifndef GOBIN export GOBIN := $(MAKEFILE_PATH)/bin diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index b841ce7..ad6cdbc 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -17,7 +17,6 @@ package main import ( "encoding/json" "fmt" - "github.com/scylladb/gemini/pkg/store/drivers" "io" "log" "math" @@ -46,6 +45,7 @@ import ( "github.com/scylladb/gemini/pkg/replication" "github.com/scylladb/gemini/pkg/status" "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/store/drivers" "github.com/scylladb/gemini/pkg/typedef" "github.com/scylladb/gemini/pkg/utils" ) @@ -146,13 +146,7 @@ func run(_ *cobra.Command, _ []string) error { var oracle store.Driver if len(oracleConfig.Hosts) > 0 { - oracle, err = drivers.NewCQL( - context.Background(), - "oracle", - schema, - oracleConfig, - logger.Named("oracle_store"), - ) + oracle, err = drivers.NewCQL(ctx, "oracle", schema, oracleConfig, logger.Named("oracle_store")) if err != nil { return errors.Wrap(err, "failed to create oracle store") } @@ -167,7 +161,7 @@ func run(_ *cobra.Command, _ []string) error { oracle = drivers.NewNop() } - test, err := drivers.NewCQL(context.Background(), "test", schema, testConfig, logger.Named("test_store")) + test, err := drivers.NewCQL(ctx, "test", schema, testConfig, logger.Named("test_store")) if err != nil { return errors.Wrap(err, "failed to create oracle store") } @@ -282,8 +276,14 @@ func createLogger(level string) *zap.Logger { } encoderCfg := zap.NewProductionEncoderConfig() - encoderCfg.EncodeLevel = zapcore.CapitalColorLevelEncoder + encoderCfg.EncodeName = zapcore.FullNameEncoder + encoderCfg.EncodeLevel = zapcore.LowercaseLevelEncoder encoderCfg.EncodeTime = zapcore.RFC3339TimeEncoder + encoderCfg.EncodeDuration = zapcore.MillisDurationEncoder + encoderCfg.LevelKey = "L" + encoderCfg.MessageKey = "M" + encoderCfg.TimeKey = "T" + encoderCfg.CallerKey = "C" logger := zap.New(zapcore.NewCore( zapcore.NewJSONEncoder(encoderCfg), @@ -352,12 +352,12 @@ func init() { rootCmd.Flags().StringVarP(&testConfig.Username, "test-username", "", "", "Username for the test cluster") rootCmd.Flags().StringVarP(&testConfig.Password, "test-password", "", "", "Password for the test cluster") rootCmd.Flags().StringVarP(&testConfig.StatementLog, "test-statement-log-file", "", "", "File to write statements flow to") - rootCmd.Flags().DurationVarP(&testConfig.RequestTimeout, "request-timeout", "", 30*time.Second, "Duration of waiting request execution") - rootCmd.Flags().DurationVarP(&testConfig.ConnectTimeout, "connect-timeout", "", 30*time.Second, "Duration of waiting connection established") - rootCmd.Flags().DurationVarP(&testConfig.MaxRetriesMutateSleep, "max-mutation-retries-backoff", "", 10*time.Millisecond, "Duration between attempts to apply a mutation for example 10ms or 1s") + rootCmd.Flags().DurationVarP(&testConfig.RequestTimeout, "test-request-timeout", "", 30*time.Second, "Duration of waiting request execution") + rootCmd.Flags().DurationVarP(&testConfig.ConnectTimeout, "test-connect-timeout", "", 30*time.Second, "Duration of waiting connection established") + rootCmd.Flags().DurationVarP(&testConfig.MaxRetriesMutateSleep, "test-max-mutation-retries-backoff", "", 10*time.Millisecond, "Duration between attempts to apply a mutation for example 10ms or 1s") rootCmd.Flags().IntVarP(&testConfig.MaxRetriesMutate, "test-max-mutation-retries", "", 2, "Maximum number of attempts to apply a mutation") - rootCmd.Flags().BoolVarP(&testConfig.UseServerSideTimestamps, "use-server-timestamps", "", false, "Use server-side generated timestamps for writes") - rootCmd.Flags().StringVarP(&testReplicationStrategy, "replication-strategy", "", "simple", "Specify the desired replication strategy as either the coded short hand simple|network to get the default for each type or provide the entire specification in the form {'class':'....'}") + rootCmd.Flags().BoolVarP(&testConfig.UseServerSideTimestamps, "test-use-server-timestamps", "", false, "Use server-side generated timestamps for writes") + rootCmd.Flags().StringVarP(&testReplicationStrategy, "test-replication-strategy", "", "simple", "Specify the desired replication strategy as either the coded short hand simple|network to get the default for each type or provide the entire specification in the form {'class':'....'}") rootCmd.Flags().StringSliceVarP(&oracleConfig.Hosts, "oracle-cluster", "o", []string{}, "Host names or IPs of the oracle cluster that provides correct answers. If omitted no oracle will be used") rootCmd.Flags().StringVarP(&oracleConfig.Trace, "oracle-tracing-outfile", "", "", "Specify the file to which tracing information gets written. Two magic names are available, 'stdout' and 'stderr'. By default tracing is disabled.") @@ -409,21 +409,23 @@ func printSetup(seed, schemaSeed uint64, schema *typedef.Schema) { tw := new(tabwriter.Writer) tw.Init(os.Stdout, 0, 8, 2, '\t', tabwriter.AlignRight) - fmt.Fprintf(tw, "Seed:\t%d\n", seed) - fmt.Fprintf(tw, "Schema seed:\t%d\n", schemaSeed) - fmt.Fprintf(tw, "Maximum duration:\t%s\n", duration) - fmt.Fprintf(tw, "Warmup duration:\t%s\n", warmup) - fmt.Fprintf(tw, "Concurrency:\t%d\n", concurrency) - fmt.Fprintf(tw, "Test cluster:\t%s\n", testConfig.Hosts) - fmt.Fprintf(tw, "Oracle cluster:\t%s\n", oracleConfig.Hosts) - fmt.Printf("Schema: \t%s\n", string(jsonSchema)) + _, _ = fmt.Fprintf(tw, "Seed:\t%d\n", seed) + _, _ = fmt.Fprintf(tw, "Schema seed:\t%d\n", schemaSeed) + _, _ = fmt.Fprintf(tw, "Maximum duration:\t%s\n", duration) + _, _ = fmt.Fprintf(tw, "Warmup duration:\t%s\n", warmup) + _, _ = fmt.Fprintf(tw, "Concurrency:\t%d\n", concurrency) + _, _ = fmt.Fprintf(tw, "Test cluster:\t%s\n", testConfig.Hosts) + _, _ = fmt.Fprintf(tw, "Oracle cluster:\t%s\n", oracleConfig.Hosts) + _, _ = fmt.Printf("Schema: \t%s\n", string(jsonSchema)) if outFileArg == "" { - fmt.Fprintf(tw, "Output file:\t%s\n", "") + _, _ = fmt.Fprintf(tw, "Output file:\t%s\n", "") } else { - fmt.Fprintf(tw, "Output file:\t%s\n", outFileArg) + _, _ = fmt.Fprintf(tw, "Output file:\t%s\n", outFileArg) + } + if err := tw.Flush(); err != nil { + log.Printf("Failed to print setup: %v", err) } - tw.Flush() } func RealRandom() uint64 { @@ -450,10 +452,11 @@ func seedFromString(seed string) uint64 { func generateSchema(logger *zap.Logger, sc typedef.SchemaConfig, schemaSeed string) (schema *typedef.Schema, intSchemaSeed uint64, err error) { intSchemaSeed = seedFromString(schemaSeed) schema = generators.GenSchema(sc, intSchemaSeed) - err = schema.Validate(partitionCount) - if err == nil { + + if err = schema.Validate(partitionCount); err == nil { return schema, intSchemaSeed, nil } + if schemaSeed != "random" { // If user provided schema, allow to run it, but log warning logger.Warn(errors.Wrap(err, "validation failed, running this test could end up in error or stale gemini").Error())