Skip to content

Commit

Permalink
feature(compression): add zstd log compression to statement log files
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Malusev <[email protected]>
  • Loading branch information
CodeLieutenant committed Jan 21, 2025
1 parent 1157b2c commit 9797681
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 21 deletions.
7 changes: 5 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ DOCKER_VERSION ?= latest
GOLANGCI_VERSION ?= 1.62.0

CQL_FEATURES ?= normal
CONCURRENCY ?= 1
CONCURRENCY ?= 4
DURATION ?= 10m
WARMUP ?= 0
MODE ?= mixed
Expand Down Expand Up @@ -42,7 +42,10 @@ GEMINI_FLAGS ?= --fail-fast \
--duration=$(DURATION) \
--warmup=$(WARMUP) \
--profiling-port=6060 \
--drop-schema=true
--drop-schema=true \
--oracle-statement-log-file=$(PWD)/results/oracle-statements.log.zst \
--test-statement-log-file=$(PWD)/results/test-statements.log.zst \
--statement-log-file-compression=zstd


ifndef GOBIN
Expand Down
22 changes: 17 additions & 5 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"encoding/json"
"fmt"
"github.com/scylladb/gemini/pkg/stmtlogger"
"log"
"math"
"net/http"
Expand Down Expand Up @@ -108,6 +109,7 @@ var (
profilingPort int
testStatementLogFile string
oracleStatementLogFile string
statementLogFileCompression string
)

func interactive() bool {
Expand Down Expand Up @@ -214,11 +216,12 @@ func run(_ *cobra.Command, _ []string) error {

testCluster, oracleCluster := createClusters(cons, testHostSelectionPolicy, oracleHostSelectionPolicy, logger)
storeConfig := store.Config{
MaxRetriesMutate: maxRetriesMutate,
MaxRetriesMutateSleep: maxRetriesMutateSleep,
UseServerSideTimestamps: useServerSideTimestamps,
TestLogStatementsFile: testStatementLogFile,
OracleLogStatementsFile: oracleStatementLogFile,
MaxRetriesMutate: maxRetriesMutate,
MaxRetriesMutateSleep: maxRetriesMutateSleep,
UseServerSideTimestamps: useServerSideTimestamps,
TestLogStatementsFile: testStatementLogFile,
OracleLogStatementsFile: oracleStatementLogFile,
LogStatementFileCompression: getLogStatementFileCompression(statementLogFileCompression),
}
var tracingFile *os.File
if tracingOutFile != "" {
Expand Down Expand Up @@ -411,6 +414,14 @@ func createClusters(
return testCluster, oracleCluster
}

func getLogStatementFileCompression(input string) stmtlogger.Compression {
switch input {

Check failure on line 418 in cmd/gemini/root.go

View workflow job for this annotation

GitHub Actions / Lint Test and Build

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case "zstd":
return stmtlogger.ZSTD
}
return stmtlogger.NoCompression
}

func getReplicationStrategy(rs string, fallback *replication.Replication, logger *zap.Logger) *replication.Replication {
switch rs {
case "network":
Expand Down Expand Up @@ -538,6 +549,7 @@ func init() {
rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end")
rootCmd.Flags().StringVarP(&testStatementLogFile, "test-statement-log-file", "", "", "File to write statements flow to")
rootCmd.Flags().StringVarP(&oracleStatementLogFile, "oracle-statement-log-file", "", "", "File to write statements flow to")
rootCmd.Flags().StringVarP(&statementLogFileCompression, "statement-log-file-compression", "", "zstd", "Compression algorithm to use for statement log files")
}

func printSetup(seed, schemaSeed uint64) {
Expand Down
33 changes: 29 additions & 4 deletions pkg/stmtlogger/filelogger.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bufio"
"bytes"
"context"
"github.com/klauspost/compress/zstd"
"io"
"log"
"os"
Expand Down Expand Up @@ -53,9 +54,16 @@ type (
wg sync.WaitGroup
active atomic.Bool
}

Compression int
)

const (
NoCompression Compression = iota
ZSTD
)

func NewFileLogger(filename string) (StmtToFile, error) {
func NewFileLogger(filename string, compression Compression) (StmtToFile, error) {
if filename == "" {
return &nopFileLogger{}, nil
}
Expand All @@ -65,14 +73,31 @@ func NewFileLogger(filename string) (StmtToFile, error) {
return nil, err
}

return NewLogger(fd)
return NewLogger(fd, compression)
}

func NewLogger(w io.Writer) (StmtToFile, error) {
func NewLogger(w io.Writer, compression Compression) (StmtToFile, error) {
ctx, cancel := context.WithCancel(context.Background())

var writer *bufio.Writer
switch compression {
case ZSTD:
zstdWriter, err := zstd.NewWriter(w,
zstd.WithEncoderLevel(zstd.SpeedFastest),
zstd.WithAllLitEntropyCompression(true),
)
if err != nil {
cancel()
return nil, err
}

writer = bufio.NewWriterSize(zstdWriter, 8192)
default:
writer = bufio.NewWriterSize(w, 8192)
}

out := &logger{
writer: bufio.NewWriterSize(w, 8192),
writer: writer,
fd: w,
channel: make(chan *bytes.Buffer, defaultChanSize),
cancel: cancel,
Expand Down
22 changes: 12 additions & 10 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ type Store interface {
}

type Config struct {
TestLogStatementsFile string
OracleLogStatementsFile string
MaxRetriesMutate int
MaxRetriesMutateSleep time.Duration
UseServerSideTimestamps bool
TestLogStatementsFile string
OracleLogStatementsFile string
LogStatementFileCompression stmtlogger.Compression
MaxRetriesMutate int
MaxRetriesMutateSleep time.Duration
UseServerSideTimestamps bool
}

func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig, cfg Config, traceOut io.Writer, logger *zap.Logger) (Store, error) {
Expand All @@ -78,15 +79,15 @@ func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig
}, []string{"system", "method"},
)

oracleStore, err := getStore("oracle", schema, oracleCluster, cfg, cfg.OracleLogStatementsFile, traceOut, logger, ops)
oracleStore, err := getStore("oracle", schema, oracleCluster, cfg, cfg.OracleLogStatementsFile, cfg.LogStatementFileCompression, traceOut, logger, ops)
if err != nil {
return nil, err
}

if testCluster == nil {
return nil, errors.New("test cluster is empty")
}
testStore, err := getStore("test", schema, testCluster, cfg, cfg.TestLogStatementsFile, traceOut, logger, ops)
testStore, err := getStore("test", schema, testCluster, cfg, cfg.TestLogStatementsFile, cfg.LogStatementFileCompression, traceOut, logger, ops)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -263,6 +264,7 @@ func getStore(
clusterConfig *gocql.ClusterConfig,
cfg Config,
stmtLogFile string,
compression stmtlogger.Compression,
traceOut io.Writer,
logger *zap.Logger,
ops *prometheus.CounterVec,
Expand All @@ -272,17 +274,17 @@ func getStore(
system: name,
}, nil
}
oracleSession, err := newSession(clusterConfig, traceOut)
session, err := newSession(clusterConfig, traceOut)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect to %s cluster", name)
}
oracleFileLogger, err := stmtlogger.NewFileLogger(stmtLogFile)
oracleFileLogger, err := stmtlogger.NewFileLogger(stmtLogFile, compression)
if err != nil {
return nil, err
}

return &cqlStore{
session: oracleSession,
session: session,
schema: schema,
system: name,
ops: ops,
Expand Down

0 comments on commit 9797681

Please sign in to comment.