diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index 3893c68..5ed2b95 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -105,6 +105,8 @@ var ( requestTimeout time.Duration connectTimeout time.Duration profilingPort int + testStatementLogFile string + oracleStatementLogFile string ) func interactive() bool { @@ -133,14 +135,6 @@ func readSchema(confFile string, schemaConfig typedef.SchemaConfig) (*typedef.Sc return schemaBuilder.Build(), nil } -type createBuilder struct { - stmt string -} - -func (cb createBuilder) ToCql() (stmt string, names []string) { - return cb.stmt, nil -} - func run(_ *cobra.Command, _ []string) error { logger := createLogger(level) globalStatus := status.NewGlobalStatus(1000) @@ -219,6 +213,8 @@ func run(_ *cobra.Command, _ []string) error { MaxRetriesMutate: maxRetriesMutate, MaxRetriesMutateSleep: maxRetriesMutateSleep, UseServerSideTimestamps: useServerSideTimestamps, + TestLogStatementsFile: testStatementLogFile, + OracleLogStatementsFile: oracleStatementLogFile, } var tracingFile *os.File if tracingOutFile != "" { @@ -243,22 +239,25 @@ func run(_ *cobra.Command, _ []string) error { defer utils.IgnoreError(st.Close) if dropSchema && mode != jobs.ReadMode { - for _, stmt := range generators.GetDropSchema(schema) { + for _, stmt := range generators.GetDropKeyspace(schema) { logger.Debug(stmt) - if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil { + if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil { return errors.Wrap(err, "unable to drop schema") } } } testKeyspace, oracleKeyspace := generators.GetCreateKeyspaces(schema) - if err = st.Create(context.Background(), createBuilder{stmt: testKeyspace}, createBuilder{stmt: oracleKeyspace}); err != nil { + if err = st.Create( + context.Background(), + typedef.SimpleStmt(testKeyspace, typedef.CreateKeyspaceStatementType), + typedef.SimpleStmt(oracleKeyspace, typedef.CreateKeyspaceStatementType)); err != nil { return errors.Wrap(err, "unable to create keyspace") } for _, stmt := range generators.GetCreateSchema(schema) { logger.Debug(stmt) - if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil { + if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.CreateSchemaStatementType)); err != nil { return errors.Wrap(err, "unable to create schema") } } @@ -531,6 +530,8 @@ func init() { rootCmd.Flags().DurationVarP(&connectTimeout, "connect-timeout", "", 30*time.Second, "Duration of waiting connection established") rootCmd.Flags().IntVarP(&profilingPort, "profiling-port", "", 0, "If non-zero starts pprof profiler on given port at 'http://0.0.0.0:/profile'") rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end") + rootCmd.Flags().StringVarP(&testStatementLogFile, "test-statement-log-file", "", "", "File to write statements flow to") + rootCmd.Flags().StringVarP(&oracleStatementLogFile, "oracle-statement-log-file", "", "", "File to write statements flow to") } func printSetup(seed, schemaSeed uint64) { diff --git a/pkg/generators/statement_generator.go b/pkg/generators/statement_generator.go index 6d2cb84..9aca0e7 100644 --- a/pkg/generators/statement_generator.go +++ b/pkg/generators/statement_generator.go @@ -140,7 +140,7 @@ func GetCreateSchema(s *typedef.Schema) []string { return stmts } -func GetDropSchema(s *typedef.Schema) []string { +func GetDropKeyspace(s *typedef.Schema) []string { return []string{ fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", s.Keyspace.Name), } diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index 3de54e9..ac21c09 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -329,7 +329,7 @@ func ddl( if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil { w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL())) } - if err = s.Mutate(ctx, ddlStmt.Query); err != nil { + if err = s.Mutate(ctx, ddlStmt); err != nil { if errors.Is(err, context.Canceled) { return nil } @@ -376,13 +376,11 @@ func mutation( } return err } - mutateQuery := mutateStmt.Query - mutateValues := mutateStmt.Values if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil { w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) } - if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil { + if err = s.Mutate(ctx, mutateStmt); err != nil { if errors.Is(err, context.Canceled) { return nil } @@ -425,7 +423,7 @@ func validation( attempt := 1 for { lastErr = err - err = s.Check(ctx, table, stmt.Query, attempt == maxAttempts, stmt.Values...) + err = s.Check(ctx, table, stmt, attempt == maxAttempts) if err == nil { if attempt > 1 { diff --git a/pkg/stmtlogger/filelogger.go b/pkg/stmtlogger/filelogger.go new file mode 100644 index 0000000..5087556 --- /dev/null +++ b/pkg/stmtlogger/filelogger.go @@ -0,0 +1,135 @@ +// Copyright 2019 ScyllaDB +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package stmtlogger + +import ( + "log" + "os" + "strconv" + "sync/atomic" + "time" + + "github.com/pkg/errors" + + "github.com/scylladb/gemini/pkg/typedef" +) + +const ( + defaultChanSize = 1000 + errorsOnFileLimit = 5 +) + +type FileLogger struct { + fd *os.File + activeChannel atomic.Pointer[loggerChan] + channel loggerChan + filename string + isFileNonOperational bool +} + +type loggerChan chan logRec + +type logRec struct { + stmt *typedef.Stmt + ts time.Time +} + +func (fl *FileLogger) LogStmt(stmt *typedef.Stmt) { + ch := fl.activeChannel.Load() + if ch != nil { + *ch <- logRec{ + stmt: stmt, + } + } +} + +func (fl *FileLogger) LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) { + ch := fl.activeChannel.Load() + if ch != nil { + *ch <- logRec{ + stmt: stmt, + ts: ts, + } + } +} + +func (fl *FileLogger) Close() error { + return fl.fd.Close() +} + +func (fl *FileLogger) committer() { + var err2 error + + defer func() { + fl.activeChannel.Swap(nil) + close(fl.channel) + }() + + errsAtRow := 0 + + for rec := range fl.channel { + if fl.isFileNonOperational { + continue + } + + _, err1 := fl.fd.Write([]byte(rec.stmt.PrettyCQL())) + opType := rec.stmt.QueryType.OpType() + if rec.ts.IsZero() || !(opType == typedef.OpInsert || opType == typedef.OpUpdate || opType == typedef.OpDelete) { + _, err2 = fl.fd.Write([]byte(";\n")) + } else { + _, err2 = fl.fd.Write([]byte(" USING TIMESTAMP " + strconv.FormatInt(rec.ts.UnixNano()/1000, 10) + ";\n")) + } + if err2 == nil && err1 == nil { + errsAtRow = 0 + continue + } + + if errors.Is(err2, os.ErrClosed) || errors.Is(err1, os.ErrClosed) { + fl.isFileNonOperational = true + return + } + + errsAtRow++ + if errsAtRow > errorsOnFileLimit { + fl.isFileNonOperational = true + } + + if err2 != nil { + err1 = err2 + } + log.Printf("failed to write to file %q: %s", fl.filename, err1) + return + } +} + +func NewFileLogger(filename string) (*FileLogger, error) { + if filename == "" { + return nil, nil + } + fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + return nil, err + } + + out := &FileLogger{ + filename: filename, + fd: fd, + channel: make(loggerChan, defaultChanSize), + } + out.activeChannel.Store(&out.channel) + + go out.committer() + return out, nil +} diff --git a/pkg/store/cqlstore.go b/pkg/store/cqlstore.go index 82a9986..c756e07 100644 --- a/pkg/store/cqlstore.go +++ b/pkg/store/cqlstore.go @@ -30,7 +30,7 @@ import ( "github.com/scylladb/gemini/pkg/typedef" ) -type cqlStore struct { +type cqlStore struct { //nolint:govet session *gocql.Session schema *typedef.Schema ops *prometheus.CounterVec @@ -39,20 +39,21 @@ type cqlStore struct { maxRetriesMutate int maxRetriesMutateSleep time.Duration useServerSideTimestamps bool + stmtLogger stmtLogger } func (cs *cqlStore) name() string { return cs.system } -func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) { +func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) { var i int for i = 0; i < cs.maxRetriesMutate; i++ { // retry with new timestamp as list modification with the same ts // will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937 - err = cs.doMutate(ctx, builder, time.Now(), values...) + err = cs.doMutate(ctx, stmt, time.Now()) if err == nil { - cs.ops.WithLabelValues(cs.system, opType(builder)).Inc() + cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc() return nil } select { @@ -67,14 +68,15 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...in return err } -func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error { - queryBody, _ := builder.ToCql() - - query := cs.session.Query(queryBody, values...).WithContext(ctx) +func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt, ts time.Time) error { + queryBody, _ := stmt.Query.ToCql() + query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx) if cs.useServerSideTimestamps { query = query.DefaultTimestamp(false) + cs.stmtLogger.LogStmt(stmt) } else { query = query.WithTimestamp(ts.UnixNano() / 1000) + cs.stmtLogger.LogStmtWithTimeStamp(stmt, ts) } if err := query.Exec(); err != nil { @@ -90,10 +92,11 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti return nil } -func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) { - query, _ := builder.ToCql() - iter := cs.session.Query(query, values...).WithContext(ctx).Iter() - cs.ops.WithLabelValues(cs.system, opType(builder)).Inc() +func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]interface{}, err error) { + query, _ := stmt.Query.ToCql() + cs.stmtLogger.LogStmt(stmt) + iter := cs.session.Query(query, stmt.Values...).WithContext(ctx).Iter() + cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc() return loadSet(iter), iter.Close() } @@ -126,8 +129,8 @@ func ignore(err error) bool { } } -func opType(builder qb.Builder) string { - switch builder.(type) { +func opType(stmt *typedef.Stmt) string { + switch stmt.Query.(type) { case *qb.InsertBuilder: return "insert" case *qb.DeleteBuilder: diff --git a/pkg/store/store.go b/pkg/store/store.go index ed5fd98..d9890ad 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -32,20 +32,21 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - "github.com/scylladb/go-set/strset" - "github.com/scylladb/gocqlx/v2/qb" "go.uber.org/multierr" "gopkg.in/inf.v0" + "github.com/scylladb/go-set/strset" + + "github.com/scylladb/gemini/pkg/stmtlogger" "github.com/scylladb/gemini/pkg/typedef" ) type loader interface { - load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error) + load(context.Context, *typedef.Stmt) ([]map[string]interface{}, error) } type storer interface { - mutate(context.Context, qb.Builder, ...interface{}) error + mutate(context.Context, *typedef.Stmt) error } type storeLoader interface { @@ -55,14 +56,22 @@ type storeLoader interface { name() string } +type stmtLogger interface { + LogStmt(*typedef.Stmt) + LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) + Close() error +} + type Store interface { - Create(context.Context, qb.Builder, qb.Builder) error - Mutate(context.Context, qb.Builder, ...interface{}) error - Check(context.Context, *typedef.Table, qb.Builder, bool, ...interface{}) error + Create(context.Context, *typedef.Stmt, *typedef.Stmt) error + Mutate(context.Context, *typedef.Stmt) error + Check(context.Context, *typedef.Table, *typedef.Stmt, bool) error Close() error } type Config struct { + TestLogStatementsFile string + OracleLogStatementsFile string MaxRetriesMutate int MaxRetriesMutateSleep time.Duration UseServerSideTimestamps bool @@ -75,48 +84,23 @@ func New(schema *typedef.Schema, testCluster, oracleCluster *gocql.ClusterConfig }, []string{"system", "method"}, ) - var oracleStore storeLoader - var validations bool - if oracleCluster != nil { - oracleSession, err := newSession(oracleCluster, traceOut) - if err != nil { - return nil, errors.Wrapf(err, "failed to connect to oracle cluster") - } - oracleStore = &cqlStore{ - session: oracleSession, - schema: schema, - system: "oracle", - ops: ops, - maxRetriesMutate: cfg.MaxRetriesMutate + 10, - maxRetriesMutateSleep: cfg.MaxRetriesMutateSleep, - useServerSideTimestamps: cfg.UseServerSideTimestamps, - logger: logger, - } - validations = true - } else { - oracleStore = &noOpStore{ - system: "oracle", - } + oracleStore, err := getStore("oracle", schema, oracleCluster, cfg, cfg.OracleLogStatementsFile, traceOut, logger, ops) + if err != nil { + return nil, err } - testSession, err := newSession(testCluster, traceOut) + if testCluster == nil { + return nil, errors.New("test cluster is empty") + } + testStore, err := getStore("test", schema, testCluster, cfg, cfg.TestLogStatementsFile, traceOut, logger, ops) if err != nil { - return nil, errors.Wrapf(err, "failed to connect to oracle cluster") + return nil, err } return &delegatingStore{ - testStore: &cqlStore{ - session: testSession, - schema: schema, - system: "test", - ops: ops, - maxRetriesMutate: cfg.MaxRetriesMutate, - maxRetriesMutateSleep: cfg.MaxRetriesMutateSleep, - useServerSideTimestamps: cfg.UseServerSideTimestamps, - logger: logger, - }, + testStore: testStore, oracleStore: oracleStore, - validations: validations, + validations: oracleStore != nil, logger: logger.Named("delegating_store"), }, nil } @@ -125,11 +109,11 @@ type noOpStore struct { system string } -func (n *noOpStore) mutate(context.Context, qb.Builder, ...interface{}) error { +func (n *noOpStore) mutate(context.Context, *typedef.Stmt) error { return nil } -func (n *noOpStore) load(context.Context, qb.Builder, []interface{}) ([]map[string]interface{}, error) { +func (n *noOpStore) load(context.Context, *typedef.Stmt) ([]map[string]interface{}, error) { return nil, nil } @@ -146,31 +130,39 @@ func (n *noOpStore) close() error { } type delegatingStore struct { - oracleStore storeLoader - testStore storeLoader - logger *zap.Logger - validations bool + oracleStore storeLoader + testStore storeLoader + statementLogger stmtLogger + logger *zap.Logger + validations bool } -func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder qb.Builder) error { - if err := mutate(ctx, ds.oracleStore, oracleBuilder, []interface{}{}); err != nil { +func (ds delegatingStore) Create(ctx context.Context, testBuilder, oracleBuilder *typedef.Stmt) error { + if ds.statementLogger != nil { + ds.statementLogger.LogStmt(testBuilder) + } + if err := mutate(ctx, ds.oracleStore, oracleBuilder); err != nil { return errors.Wrap(err, "oracle failed store creation") } - if err := mutate(ctx, ds.testStore, testBuilder, []interface{}{}); err != nil { + if err := mutate(ctx, ds.testStore, testBuilder); err != nil { return errors.Wrap(err, "test failed store creation") } return nil } -func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values ...interface{}) error { +func (ds delegatingStore) Mutate(ctx context.Context, stmt *typedef.Stmt) error { var testErr error var wg sync.WaitGroup wg.Add(1) + go func() { - testErr = mutate(ctx, ds.testStore, builder, values...) - wg.Done() + defer wg.Done() + testErr = errors.Wrapf( + ds.testStore.mutate(ctx, stmt), + "unable to apply mutations to the %s store", ds.testStore.name()) }() - if oracleErr := mutate(ctx, ds.oracleStore, builder, values...); oracleErr != nil { + + if oracleErr := ds.oracleStore.mutate(ctx, stmt); oracleErr != nil { // Oracle failed, transition cannot take place ds.logger.Info("oracle store failed mutation, transition to next state impossible so continuing with next mutation", zap.Error(oracleErr)) return oracleErr @@ -184,23 +176,24 @@ func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values return nil } -func mutate(ctx context.Context, s storeLoader, builder qb.Builder, values ...interface{}) error { - if err := s.mutate(ctx, builder, values...); err != nil { +func mutate(ctx context.Context, s storeLoader, stmt *typedef.Stmt) error { + if err := s.mutate(ctx, stmt); err != nil { return errors.Wrapf(err, "unable to apply mutations to the %s store", s.name()) } return nil } -func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, builder qb.Builder, detailedDiff bool, values ...interface{}) error { +func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, stmt *typedef.Stmt, detailedDiff bool) error { var testRows, oracleRows []map[string]interface{} var testErr, oracleErr error var wg sync.WaitGroup wg.Add(1) + go func() { - testRows, testErr = ds.testStore.load(ctx, builder, values) + testRows, testErr = ds.testStore.load(ctx, stmt) wg.Done() }() - oracleRows, oracleErr = ds.oracleStore.load(ctx, builder, values) + oracleRows, oracleErr = ds.oracleStore.load(ctx, stmt) if oracleErr != nil { return errors.Wrapf(oracleErr, "unable to load check data from the oracle store") } @@ -257,7 +250,47 @@ func (ds delegatingStore) Check(ctx context.Context, table *typedef.Table, build } func (ds delegatingStore) Close() (err error) { + if ds.statementLogger != nil { + err = multierr.Append(err, ds.statementLogger.Close()) + } err = multierr.Append(err, ds.testStore.close()) err = multierr.Append(err, ds.oracleStore.close()) return } + +func getStore( + name string, + schema *typedef.Schema, + clusterConfig *gocql.ClusterConfig, + cfg Config, + stmtLogFile string, + traceOut *os.File, + logger *zap.Logger, + ops *prometheus.CounterVec, +) (out storeLoader, err error) { + if clusterConfig == nil { + return &noOpStore{ + system: name, + }, nil + } + oracleSession, err := newSession(clusterConfig, traceOut) + if err != nil { + return nil, errors.Wrapf(err, "failed to connect to %s cluster", name) + } + oracleFileLogger, err := stmtlogger.NewFileLogger(stmtLogFile) + if err != nil { + return nil, err + } + + return &cqlStore{ + session: oracleSession, + schema: schema, + system: name, + ops: ops, + maxRetriesMutate: cfg.MaxRetriesMutate + 10, + maxRetriesMutateSleep: cfg.MaxRetriesMutateSleep, + useServerSideTimestamps: cfg.UseServerSideTimestamps, + logger: logger.Named(name), + stmtLogger: oracleFileLogger, + }, nil +} diff --git a/pkg/typedef/const.go b/pkg/typedef/const.go index b61befe..93e7e1e 100644 --- a/pkg/typedef/const.go +++ b/pkg/typedef/const.go @@ -36,6 +36,19 @@ const ( AlterColumnStatementType DropColumnStatementType AddColumnStatementType + DropKeyspaceStatementType + CreateKeyspaceStatementType + CreateSchemaStatementType +) + +const ( + OpSelect OpType = iota + OpInsert + OpUpdate + OpDelete + OpSchemaAlter + OpSchemaDrop + OpSchemaCreate ) //nolint:revive diff --git a/pkg/typedef/interfaces.go b/pkg/typedef/interfaces.go index 5e79e45..f12d26f 100644 --- a/pkg/typedef/interfaces.go +++ b/pkg/typedef/interfaces.go @@ -33,6 +33,11 @@ type Type interface { CQLType() gocql.TypeInfo } +type Statement interface { + ToCql() (stmt string, names []string) + PrettyCQL() string +} + type Types []Type func (l Types) LenValue() int { diff --git a/pkg/typedef/typedef.go b/pkg/typedef/typedef.go index 44afd03..9194e98 100644 --- a/pkg/typedef/typedef.go +++ b/pkg/typedef/typedef.go @@ -24,6 +24,9 @@ import ( ) type ( + CQLFeature int + OpType uint8 + ValueWithToken struct { Value Values Token uint64 @@ -48,20 +51,26 @@ type ( UseLWT bool } - CQLFeature int + Stmts struct { + PostStmtHook func() + List []*Stmt + QueryType StatementType + } + + StmtCache struct { + Query qb.Builder + Types Types + QueryType StatementType + LenValue int + } ) -type Stmts struct { - PostStmtHook func() - List []*Stmt - QueryType StatementType +type SimpleQuery struct { + query string } -type StmtCache struct { - Query qb.Builder - Types Types - QueryType StatementType - LenValue int +func (q SimpleQuery) ToCql() (stmt string, names []string) { + return q.query, nil } type Stmt struct { @@ -70,6 +79,15 @@ type Stmt struct { Values Values } +func SimpleStmt(query string, queryType StatementType) *Stmt { + return &Stmt{ + StmtCache: &StmtCache{ + Query: SimpleQuery{query}, + QueryType: queryType, + }, + } +} + func (s *Stmt) PrettyCQL() string { query, _ := s.Query.ToCql() values := s.Values.Copy() @@ -110,6 +128,27 @@ func (st StatementType) ToString() string { } } +func (st StatementType) OpType() OpType { + switch st { + case SelectStatementType, SelectRangeStatementType, SelectByIndexStatementType, SelectFromMaterializedViewStatementType: + return OpSelect + case InsertStatementType, InsertJSONStatementType: + return OpInsert + case UpdateStatementType: + return OpUpdate + case DeleteStatementType: + return OpDelete + case AlterColumnStatementType, DropColumnStatementType, AddColumnStatementType: + return OpSchemaAlter + case DropKeyspaceStatementType: + return OpSchemaDrop + case CreateKeyspaceStatementType, CreateSchemaStatementType: + return OpSchemaCreate + default: + panic(fmt.Sprintf("unknown statement type %d", st)) + } +} + func (st StatementType) PossibleAsyncOperation() bool { switch st { case SelectByIndexStatementType, SelectFromMaterializedViewStatementType: