Skip to content

Commit

Permalink
feat(cli): add statement-log-file option
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Kropachev committed Dec 19, 2023
1 parent f60664b commit d4aed5b
Show file tree
Hide file tree
Showing 9 changed files with 329 additions and 102 deletions.
25 changes: 13 additions & 12 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ var (
requestTimeout time.Duration
connectTimeout time.Duration
profilingPort int
testStatementLogFile string
oracleStatementLogFile string
)

func interactive() bool {
Expand Down Expand Up @@ -133,14 +135,6 @@ func readSchema(confFile string, schemaConfig typedef.SchemaConfig) (*typedef.Sc
return schemaBuilder.Build(), nil
}

type createBuilder struct {
stmt string
}

func (cb createBuilder) ToCql() (stmt string, names []string) {
return cb.stmt, nil
}

func run(_ *cobra.Command, _ []string) error {
logger := createLogger(level)
globalStatus := status.NewGlobalStatus(1000)
Expand Down Expand Up @@ -219,6 +213,8 @@ func run(_ *cobra.Command, _ []string) error {
MaxRetriesMutate: maxRetriesMutate,
MaxRetriesMutateSleep: maxRetriesMutateSleep,
UseServerSideTimestamps: useServerSideTimestamps,
TestLogStatementsFile: testStatementLogFile,
OracleLogStatementsFile: oracleStatementLogFile,
}
var tracingFile *os.File
if tracingOutFile != "" {
Expand All @@ -243,22 +239,25 @@ func run(_ *cobra.Command, _ []string) error {
defer utils.IgnoreError(st.Close)

if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropSchema(schema) {
for _, stmt := range generators.GetDropKeyspace(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to drop schema")
}
}
}

testKeyspace, oracleKeyspace := generators.GetCreateKeyspaces(schema)
if err = st.Create(context.Background(), createBuilder{stmt: testKeyspace}, createBuilder{stmt: oracleKeyspace}); err != nil {
if err = st.Create(
context.Background(),
typedef.SimpleStmt(testKeyspace, typedef.CreateKeyspaceStatementType),
typedef.SimpleStmt(oracleKeyspace, typedef.CreateKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to create keyspace")
}

for _, stmt := range generators.GetCreateSchema(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.CreateSchemaStatementType)); err != nil {
return errors.Wrap(err, "unable to create schema")
}
}
Expand Down Expand Up @@ -531,6 +530,8 @@ func init() {
rootCmd.Flags().DurationVarP(&connectTimeout, "connect-timeout", "", 30*time.Second, "Duration of waiting connection established")
rootCmd.Flags().IntVarP(&profilingPort, "profiling-port", "", 0, "If non-zero starts pprof profiler on given port at 'http://0.0.0.0:<port>/profile'")
rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end")
rootCmd.Flags().StringVarP(&testStatementLogFile, "test-statement-log-file", "", "", "File to write statements flow to")
rootCmd.Flags().StringVarP(&oracleStatementLogFile, "oracle-statement-log-file", "", "", "File to write statements flow to")
}

func printSetup(seed, schemaSeed uint64) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/generators/statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func GetCreateSchema(s *typedef.Schema) []string {
return stmts
}

func GetDropSchema(s *typedef.Schema) []string {
func GetDropKeyspace(s *typedef.Schema) []string {
return []string{
fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", s.Keyspace.Name),
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func ddl(
if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil {
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, ddlStmt.Query); err != nil {
if err = s.Mutate(ctx, ddlStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand Down Expand Up @@ -376,13 +376,11 @@ func mutation(
}
return err
}
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values

if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
if err = s.Mutate(ctx, mutateStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand Down Expand Up @@ -425,7 +423,7 @@ func validation(
attempt := 1
for {
lastErr = err
err = s.Check(ctx, table, stmt.Query, attempt == maxAttempts, stmt.Values...)
err = s.Check(ctx, table, stmt, attempt == maxAttempts)

if err == nil {
if attempt > 1 {
Expand Down
135 changes: 135 additions & 0 deletions pkg/stmtlogger/filelogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2019 ScyllaDB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtlogger

import (
"log"
"os"
"strconv"
"sync/atomic"
"time"

"github.com/pkg/errors"

"github.com/scylladb/gemini/pkg/typedef"
)

const (
// defaultChanSize is the buffer capacity of the channel that NewFileLogger
// creates to queue records for the committer goroutine.
defaultChanSize = 1000
// errorsOnFileLimit is the threshold the committer compares its count of
// consecutive failed writes against before flagging the file as
// non-operational.
errorsOnFileLimit = 5
)

// FileLogger writes statements to a file. Producers enqueue records through
// LogStmt / LogStmtWithTimeStamp, and a goroutine started by NewFileLogger
// (committer) performs the actual writes.
type FileLogger struct {
// fd is the destination file opened by NewFileLogger.
fd *os.File
// activeChannel is what producers load before sending; NewFileLogger points
// it at channel and committer swaps it to nil when it exits, after which
// LogStmt calls become no-ops.
activeChannel atomic.Pointer[loggerChan]
// channel is the buffered queue consumed by committer.
channel loggerChan
// filename is kept for error messages.
filename string
// isFileNonOperational is set by committer once the file is considered
// unusable; records received while it is true are discarded.
isFileNonOperational bool
}

// loggerChan is the queue type carrying records from producers to committer.
type loggerChan chan logRec

// logRec is a single queued log entry.
type logRec struct {
// stmt is the statement to be written.
stmt *typedef.Stmt
// ts is the optional client-side timestamp; the zero value means the
// statement was logged without one (see LogStmt).
ts time.Time
}

// LogStmt queues stmt for writing without a timestamp. It is a no-op once the
// active channel has been cleared (i.e. after the committer has exited). The
// send blocks while the queue is full.
func (fl *FileLogger) LogStmt(stmt *typedef.Stmt) {
	queue := fl.activeChannel.Load()
	if queue == nil {
		return
	}
	*queue <- logRec{stmt: stmt}
}

// LogStmtWithTimeStamp queues stmt together with the timestamp ts that was
// attached to the query. It is a no-op once the active channel has been
// cleared (i.e. after the committer has exited). The send blocks while the
// queue is full.
func (fl *FileLogger) LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) {
	queue := fl.activeChannel.Load()
	if queue == nil {
		return
	}
	rec := logRec{
		stmt: stmt,
		ts:   ts,
	}
	*queue <- rec
}

// Close closes the underlying file and returns the error from doing so.
// It does not itself stop the committer goroutine or wait for queued records;
// committer exits when a later write reports os.ErrClosed.
func (fl *FileLogger) Close() error {
return fl.fd.Close()
}

// committer is the single consumer of fl.channel: it renders every queued
// record as one CQL line and appends it to the log file.
//
// Each record is written as `<pretty CQL>;` and, for insert/update/delete
// statements that carry a non-zero timestamp, as
// `<pretty CQL> USING TIMESTAMP <microseconds>;`.
//
// Error handling:
//   - a write failing with os.ErrClosed (the file was closed) ends the
//     goroutine;
//   - any other failure is logged and counted; after more than
//     errorsOnFileLimit consecutive failures the file is marked
//     non-operational and all further records are drained and discarded so
//     producers never block on a full queue.
//
// On exit the active channel pointer is cleared so later LogStmt calls become
// no-ops, and the channel is closed.
//
// NOTE(review): closing the channel from the receiving side can panic a
// producer that loaded the pointer just before the swap; confirm producers
// are stopped before Close is called.
func (fl *FileLogger) committer() {
	defer func() {
		fl.activeChannel.Swap(nil)
		close(fl.channel)
	}()

	errsAtRow := 0

	for rec := range fl.channel {
		if fl.isFileNonOperational {
			// Keep draining so that senders are not blocked by a full buffer.
			continue
		}

		opType := rec.stmt.QueryType.OpType()
		isMutation := opType == typedef.OpInsert || opType == typedef.OpUpdate || opType == typedef.OpDelete

		// Build the whole line first and emit it with a single write, so a
		// failure cannot leave a dangling terminator or a half record behind.
		line := rec.stmt.PrettyCQL()
		if isMutation && !rec.ts.IsZero() {
			line += " USING TIMESTAMP " + strconv.FormatInt(rec.ts.UnixNano()/1000, 10)
		}
		line += ";\n"

		_, err := fl.fd.WriteString(line)
		if err == nil {
			errsAtRow = 0
			continue
		}

		if errors.Is(err, os.ErrClosed) {
			fl.isFileNonOperational = true
			return
		}

		// Tolerate transient failures; give up on the file only after too many
		// failures in a row. The loop keeps running either way.
		errsAtRow++
		if errsAtRow > errorsOnFileLimit {
			fl.isFileNonOperational = true
		}
		log.Printf("failed to write to file %q: %s", fl.filename, err)
	}
}

// NewFileLogger opens (creating or truncating) filename for writing, starts
// the committer goroutine and returns the ready-to-use logger.
//
// An empty filename means "logging disabled": the function returns
// (nil, nil), so callers must check the returned pointer before using it.
// Any error from opening the file is returned unchanged.
func NewFileLogger(filename string) (*FileLogger, error) {
	if len(filename) == 0 {
		return nil, nil
	}

	file, openErr := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
	if openErr != nil {
		return nil, openErr
	}

	logger := new(FileLogger)
	logger.filename = filename
	logger.fd = file
	logger.channel = make(loggerChan, defaultChanSize)
	// Publish the channel to producers before the consumer starts.
	logger.activeChannel.Store(&logger.channel)

	go logger.committer()
	return logger, nil
}
31 changes: 17 additions & 14 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/scylladb/gemini/pkg/typedef"
)

type cqlStore struct {
type cqlStore struct { //nolint:govet
session *gocql.Session
schema *typedef.Schema
ops *prometheus.CounterVec
Expand All @@ -39,20 +39,21 @@ type cqlStore struct {
maxRetriesMutate int
maxRetriesMutateSleep time.Duration
useServerSideTimestamps bool
stmtLogger stmtLogger
}

func (cs *cqlStore) name() string {
return cs.system
}

func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) {
func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) {
var i int
for i = 0; i < cs.maxRetriesMutate; i++ {
// retry with new timestamp as list modification with the same ts
// will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937
err = cs.doMutate(ctx, builder, time.Now(), values...)
err = cs.doMutate(ctx, stmt, time.Now())
if err == nil {
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return nil
}
select {
Expand All @@ -67,14 +68,15 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...in
return err
}

func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error {
queryBody, _ := builder.ToCql()

query := cs.session.Query(queryBody, values...).WithContext(ctx)
func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt, ts time.Time) error {
queryBody, _ := stmt.Query.ToCql()
query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx)
if cs.useServerSideTimestamps {
query = query.DefaultTimestamp(false)
cs.stmtLogger.LogStmt(stmt)
} else {
query = query.WithTimestamp(ts.UnixNano() / 1000)
cs.stmtLogger.LogStmtWithTimeStamp(stmt, ts)
}

if err := query.Exec(); err != nil {
Expand All @@ -90,10 +92,11 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti
return nil
}

func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) {
query, _ := builder.ToCql()
iter := cs.session.Query(query, values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]interface{}, err error) {
query, _ := stmt.Query.ToCql()
cs.stmtLogger.LogStmt(stmt)
iter := cs.session.Query(query, stmt.Values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return loadSet(iter), iter.Close()
}

Expand Down Expand Up @@ -126,8 +129,8 @@ func ignore(err error) bool {
}
}

func opType(builder qb.Builder) string {
switch builder.(type) {
func opType(stmt *typedef.Stmt) string {
switch stmt.Query.(type) {
case *qb.InsertBuilder:
return "insert"
case *qb.DeleteBuilder:
Expand Down
Loading

0 comments on commit d4aed5b

Please sign in to comment.