diff --git a/Makefile b/Makefile
index 73c5e95..3986c23 100644
--- a/Makefile
+++ b/Makefile
@@ -9,7 +9,7 @@ DOCKER_VERSION ?= latest
 
 GOLANGCI_VERSION ?= 1.62.0
 CQL_FEATURES ?= normal
-CONCURRENCY ?= 1
+CONCURRENCY ?= 4
 DURATION ?= 10m
 WARMUP ?= 0
 MODE ?= mixed
@@ -42,7 +42,10 @@ GEMINI_FLAGS ?= --fail-fast \
 	--duration=$(DURATION) \
 	--warmup=$(WARMUP) \
 	--profiling-port=6060 \
-	--drop-schema=true
+	--drop-schema=true \
+	--oracle-statement-log-file=$(PWD)/results/oracle-statements.log.zst \
+	--test-statement-log-file=$(PWD)/results/test-statements.log.zst \
+	--statement-log-file-compression=zstd
 
 
 ifndef GOBIN
diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go
index 6d2a22d..70ecaa2 100644
--- a/cmd/gemini/root.go
+++ b/cmd/gemini/root.go
@@ -17,6 +17,7 @@ package main
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/scylladb/gemini/pkg/stmtlogger"
 	"log"
 	"math"
 	"net/http"
@@ -108,6 +109,7 @@ var (
 	profilingPort               int
 	testStatementLogFile        string
 	oracleStatementLogFile      string
+	statementLogFileCompression string
 )
 
 func interactive() bool {
@@ -214,11 +216,12 @@ func run(_ *cobra.Command, _ []string) error {
 
 	testCluster, oracleCluster := createClusters(cons, testHostSelectionPolicy, oracleHostSelectionPolicy, logger)
 	storeConfig := store.Config{
-		MaxRetriesMutate:        maxRetriesMutate,
-		MaxRetriesMutateSleep:   maxRetriesMutateSleep,
-		UseServerSideTimestamps: useServerSideTimestamps,
-		TestLogStatementsFile:   testStatementLogFile,
-		OracleLogStatementsFile: oracleStatementLogFile,
+		MaxRetriesMutate:            maxRetriesMutate,
+		MaxRetriesMutateSleep:       maxRetriesMutateSleep,
+		UseServerSideTimestamps:     useServerSideTimestamps,
+		TestLogStatementsFile:       testStatementLogFile,
+		OracleLogStatementsFile:     oracleStatementLogFile,
+		LogStatementFileCompression: getLogStatementFileCompression(statementLogFileCompression),
 	}
 	var tracingFile *os.File
 	if tracingOutFile != "" {
@@ -411,6 +414,14 @@ func createClusters(
 	return testCluster, oracleCluster
 }
 
+func getLogStatementFileCompression(input string) stmtlogger.Compression {
+	switch input {
+	case "zstd":
+		return stmtlogger.ZSTD
+	}
+	return stmtlogger.NoCompression
+}
+
 func getReplicationStrategy(rs string, fallback *replication.Replication, logger *zap.Logger) *replication.Replication {
 	switch rs {
 	case "network":
@@ -538,6 +549,7 @@ func init() {
 	rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end")
 	rootCmd.Flags().StringVarP(&testStatementLogFile, "test-statement-log-file", "", "", "File to write statements flow to")
 	rootCmd.Flags().StringVarP(&oracleStatementLogFile, "oracle-statement-log-file", "", "", "File to write statements flow to")
+	rootCmd.Flags().StringVarP(&statementLogFileCompression, "statement-log-file-compression", "", "zstd", "Compression algorithm to use for statement log files")
 }
 
 func printSetup(seed, schemaSeed uint64) {
diff --git a/pkg/stmtlogger/filelogger.go b/pkg/stmtlogger/filelogger.go
index d367bb8..42c70bf 100644
--- a/pkg/stmtlogger/filelogger.go
+++ b/pkg/stmtlogger/filelogger.go
@@ -18,6 +18,7 @@ import (
 	"bufio"
 	"bytes"
 	"context"
+	"github.com/klauspost/compress/zstd"
 	"io"
 	"log"
 	"os"
@@ -53,9 +54,16 @@ type (
 		wg      sync.WaitGroup
 		active  atomic.Bool
 	}
+
+	Compression int
+)
+
+const (
+	NoCompression Compression = iota
+	ZSTD
 )
 
-func NewFileLogger(filename string) (StmtToFile, error) {
+func NewFileLogger(filename string, compression Compression) (StmtToFile, error) {
 	if filename == "" {
 		return &nopFileLogger{}, nil
 	}
@@ -65,14 +73,31 @@ func NewFileLogger(filename string) (StmtToFile, error) {
 		return nil, err
 	}
 
-	return NewLogger(fd)
+	return NewLogger(fd, compression)
 }
 
-func NewLogger(w io.Writer) (StmtToFile, error) {
+func NewLogger(w io.Writer, compression Compression) (StmtToFile, error) {
 	ctx, cancel := context.WithCancel(context.Background())
 
+	var writer *bufio.Writer
+	switch compression {
+	case ZSTD:
+		zstdWriter, err := zstd.NewWriter(w,
+			zstd.WithEncoderLevel(zstd.SpeedFastest),
+			zstd.WithAllLitEntropyCompression(true),
+		)
+		if err != nil {
+			cancel()
+			return nil, err
+		}
+
+		writer = bufio.NewWriterSize(zstdWriter, 8192)
+	default:
+		writer = bufio.NewWriterSize(w, 8192)
+	}
+
 	out := &logger{
-		writer:  bufio.NewWriterSize(w, 8192),
+		writer:  writer,
 		fd:      w,
 		channel: make(chan *bytes.Buffer, defaultChanSize),
 		cancel:  cancel,
diff --git a/pkg/store/store.go b/pkg/store/store.go
index 8eeb3c3..92713c6 100644
--- a/pkg/store/store.go
+++ b/pkg/store/store.go
@@ -64,11 +64,12 @@ type Store interface {
 }
 
 type Config struct {
-	TestLogStatementsFile   string
-	OracleLogStatementsFile string
-	MaxRetriesMutate        int
-	MaxRetriesMutateSleep   time.Duration
-	UseServerSideTimestamps bool
+	TestLogStatementsFile       string
+	OracleLogStatementsFile     string
+	LogStatementFileCompression stmtlogger.Compression
+	MaxRetriesMutate            int
+	MaxRetriesMutateSleep       time.Duration
+	UseServerSideTimestamps     bool
 }
 
 func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut io.Writer, logger *zap.Logger) (Store, error) {
@@ -78,7 +79,7 @@ func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig
 		}, []string{"system", "method"},
 	)
 
-	oracleStore, err := getStore("oracle", schema, oracleCluster, cfg, cfg.OracleLogStatementsFile, traceOut, logger, ops)
+	oracleStore, err := getStore("oracle", schema, oracleCluster, cfg, cfg.OracleLogStatementsFile, cfg.LogStatementFileCompression, traceOut, logger, ops)
 	if err != nil {
 		return nil, err
 	}
@@ -86,7 +87,7 @@ func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig
 	if testCluster == nil {
 		return nil, errors.New("test cluster is empty")
 	}
-	testStore, err := getStore("test", schema, testCluster, cfg, cfg.TestLogStatementsFile, traceOut, logger, ops)
+	testStore, err := getStore("test", schema, testCluster, cfg, cfg.TestLogStatementsFile, cfg.LogStatementFileCompression, traceOut, logger, ops)
 	if err != nil {
 		return nil, err
 	}
@@ -263,6 +264,7 @@ func getStore(
 	clusterConfig *gocql.ClusterConfig,
 	cfg Config,
 	stmtLogFile string,
+	compression stmtlogger.Compression,
 	traceOut io.Writer,
 	logger *zap.Logger,
 	ops *prometheus.CounterVec,
@@ -272,17 +274,17 @@ func getStore(
 			system: name,
 		}, nil
 	}
-	oracleSession, err := newSession(clusterConfig, traceOut)
+	session, err := newSession(clusterConfig, traceOut)
 	if err != nil {
 		return nil, errors.Wrapf(err, "failed to connect to %s cluster", name)
 	}
-	oracleFileLogger, err := stmtlogger.NewFileLogger(stmtLogFile)
+	oracleFileLogger, err := stmtlogger.NewFileLogger(stmtLogFile, compression)
 	if err != nil {
 		return nil, err
 	}
 
 	return &cqlStore{
-		session: oracleSession,
+		session: session,
 		schema:  schema,
 		system:  name,
 		ops:     ops,
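
Not part of the diff above: a minimal reader-side sketch of how the compressed statement logs written with the new flags could be consumed, using the same `github.com/klauspost/compress/zstd` package the logger now depends on. The path below is only an assumption taken from the Makefile defaults; adjust it to wherever `--oracle-statement-log-file`/`--test-statement-log-file` point.

```go
// Sketch: decompress and print a zstd-compressed statement log line by line.
// The file path is an assumption based on the Makefile defaults in this diff.
package main

import (
	"bufio"
	"fmt"
	"log"
	"os"

	"github.com/klauspost/compress/zstd"
)

func main() {
	f, err := os.Open("results/oracle-statements.log.zst")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	// zstd.Decoder implements io.Reader, so the decompressed stream
	// can be scanned as plain text.
	dec, err := zstd.NewReader(f)
	if err != nil {
		log.Fatal(err)
	}
	defer dec.Close()

	scanner := bufio.NewScanner(dec)
	// Logged statements can be long; raise the scanner's max token size to 1 MiB.
	scanner.Buffer(make([]byte, 0, 1<<20), 1<<20)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
}
```

The same files can also be inspected with the standard zstd CLI, e.g. `zstd -dc results/test-statements.log.zst`. On the write side, `NewLogger` keeps the bufio buffer on top of the zstd writer and pairs `zstd.SpeedFastest` with `WithAllLitEntropyCompression(true)`, trading a small amount of CPU for better compression of the literal-heavy CQL text.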