Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): add statement-log-file option #413

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ var (
requestTimeout time.Duration
connectTimeout time.Duration
profilingPort int
testStatementLogFile string
oracleStatementLogFile string
)

func interactive() bool {
Expand Down Expand Up @@ -133,14 +135,6 @@ func readSchema(confFile string, schemaConfig typedef.SchemaConfig) (*typedef.Sc
return schemaBuilder.Build(), nil
}

type createBuilder struct {
stmt string
}

func (cb createBuilder) ToCql() (stmt string, names []string) {
return cb.stmt, nil
}

func run(_ *cobra.Command, _ []string) error {
logger := createLogger(level)
globalStatus := status.NewGlobalStatus(1000)
Expand Down Expand Up @@ -219,6 +213,8 @@ func run(_ *cobra.Command, _ []string) error {
MaxRetriesMutate: maxRetriesMutate,
MaxRetriesMutateSleep: maxRetriesMutateSleep,
UseServerSideTimestamps: useServerSideTimestamps,
TestLogStatementsFile: testStatementLogFile,
OracleLogStatementsFile: oracleStatementLogFile,
}
var tracingFile *os.File
if tracingOutFile != "" {
Expand All @@ -243,22 +239,25 @@ func run(_ *cobra.Command, _ []string) error {
defer utils.IgnoreError(st.Close)

if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropSchema(schema) {
for _, stmt := range generators.GetDropKeyspace(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to drop schema")
}
}
}

testKeyspace, oracleKeyspace := generators.GetCreateKeyspaces(schema)
if err = st.Create(context.Background(), createBuilder{stmt: testKeyspace}, createBuilder{stmt: oracleKeyspace}); err != nil {
if err = st.Create(
context.Background(),
typedef.SimpleStmt(testKeyspace, typedef.CreateKeyspaceStatementType),
typedef.SimpleStmt(oracleKeyspace, typedef.CreateKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to create keyspace")
}

for _, stmt := range generators.GetCreateSchema(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.CreateSchemaStatementType)); err != nil {
return errors.Wrap(err, "unable to create schema")
}
}
Expand Down Expand Up @@ -531,6 +530,8 @@ func init() {
rootCmd.Flags().DurationVarP(&connectTimeout, "connect-timeout", "", 30*time.Second, "Duration of waiting connection established")
rootCmd.Flags().IntVarP(&profilingPort, "profiling-port", "", 0, "If non-zero starts pprof profiler on given port at 'http://0.0.0.0:<port>/profile'")
rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end")
rootCmd.Flags().StringVarP(&testStatementLogFile, "test-statement-log-file", "", "", "File to write statements flow to")
rootCmd.Flags().StringVarP(&oracleStatementLogFile, "oracle-statement-log-file", "", "", "File to write statements flow to")
}

func printSetup(seed, schemaSeed uint64) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/generators/statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func GetCreateSchema(s *typedef.Schema) []string {
return stmts
}

func GetDropSchema(s *typedef.Schema) []string {
func GetDropKeyspace(s *typedef.Schema) []string {
return []string{
fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", s.Keyspace.Name),
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func ddl(
if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil {
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, ddlStmt.Query); err != nil {
if err = s.Mutate(ctx, ddlStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand Down Expand Up @@ -376,13 +376,11 @@ func mutation(
}
return err
}
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values

if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
if err = s.Mutate(ctx, mutateStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand Down Expand Up @@ -425,7 +423,7 @@ func validation(
attempt := 1
for {
lastErr = err
err = s.Check(ctx, table, stmt.Query, attempt == maxAttempts, stmt.Values...)
err = s.Check(ctx, table, stmt, attempt == maxAttempts)

if err == nil {
if attempt > 1 {
Expand Down
135 changes: 135 additions & 0 deletions pkg/stmtlogger/filelogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2019 ScyllaDB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtlogger

import (
"log"
"os"
"strconv"
"sync/atomic"
"time"

"github.com/pkg/errors"

"github.com/scylladb/gemini/pkg/typedef"
)

const (
defaultChanSize = 1000
errorsOnFileLimit = 5
)

type FileLogger struct {
fd *os.File
activeChannel atomic.Pointer[loggerChan]
channel loggerChan
filename string
isFileNonOperational bool
}

type loggerChan chan logRec

type logRec struct {
stmt *typedef.Stmt
ts time.Time
}

func (fl *FileLogger) LogStmt(stmt *typedef.Stmt) {
ch := fl.activeChannel.Load()
if ch != nil {
*ch <- logRec{
stmt: stmt,
}
}
}

func (fl *FileLogger) LogStmtWithTimeStamp(stmt *typedef.Stmt, ts time.Time) {
ch := fl.activeChannel.Load()
if ch != nil {
*ch <- logRec{
stmt: stmt,
ts: ts,
}
}
}

func (fl *FileLogger) Close() error {
return fl.fd.Close()
}

func (fl *FileLogger) committer() {
var err2 error

defer func() {
fl.activeChannel.Swap(nil)
close(fl.channel)
}()

errsAtRow := 0

for rec := range fl.channel {
if fl.isFileNonOperational {
continue
}

_, err1 := fl.fd.Write([]byte(rec.stmt.PrettyCQL()))
opType := rec.stmt.QueryType.OpType()
if rec.ts.IsZero() || !(opType == typedef.OpInsert || opType == typedef.OpUpdate || opType == typedef.OpDelete) {
_, err2 = fl.fd.Write([]byte(";\n"))
} else {
_, err2 = fl.fd.Write([]byte(" USING TIMESTAMP " + strconv.FormatInt(rec.ts.UnixNano()/1000, 10) + ";\n"))
}
if err2 == nil && err1 == nil {
errsAtRow = 0
continue
}

if errors.Is(err2, os.ErrClosed) || errors.Is(err1, os.ErrClosed) {
fl.isFileNonOperational = true
return
}

errsAtRow++
if errsAtRow > errorsOnFileLimit {
fl.isFileNonOperational = true
}

if err2 != nil {
err1 = err2
}
log.Printf("failed to write to file %q: %s", fl.filename, err1)
return
}
}

func NewFileLogger(filename string) (*FileLogger, error) {
if filename == "" {
return nil, nil
}
fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}

out := &FileLogger{
filename: filename,
fd: fd,
channel: make(loggerChan, defaultChanSize),
}
out.activeChannel.Store(&out.channel)

go out.committer()
return out, nil
}
31 changes: 17 additions & 14 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/scylladb/gemini/pkg/typedef"
)

type cqlStore struct {
type cqlStore struct { //nolint:govet
session *gocql.Session
schema *typedef.Schema
ops *prometheus.CounterVec
Expand All @@ -39,20 +39,21 @@ type cqlStore struct {
maxRetriesMutate int
maxRetriesMutateSleep time.Duration
useServerSideTimestamps bool
stmtLogger stmtLogger
}

func (cs *cqlStore) name() string {
return cs.system
}

func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) {
func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) {
var i int
for i = 0; i < cs.maxRetriesMutate; i++ {
// retry with new timestamp as list modification with the same ts
// will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937
err = cs.doMutate(ctx, builder, time.Now(), values...)
err = cs.doMutate(ctx, stmt, time.Now())
if err == nil {
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return nil
}
select {
Expand All @@ -67,14 +68,15 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...in
return err
}

func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error {
queryBody, _ := builder.ToCql()

query := cs.session.Query(queryBody, values...).WithContext(ctx)
func (cs *cqlStore) doMutate(ctx context.Context, stmt *typedef.Stmt, ts time.Time) error {
queryBody, _ := stmt.Query.ToCql()
query := cs.session.Query(queryBody, stmt.Values...).WithContext(ctx)
if cs.useServerSideTimestamps {
query = query.DefaultTimestamp(false)
cs.stmtLogger.LogStmt(stmt)
} else {
query = query.WithTimestamp(ts.UnixNano() / 1000)
cs.stmtLogger.LogStmtWithTimeStamp(stmt, ts)
}

if err := query.Exec(); err != nil {
Expand All @@ -90,10 +92,11 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti
return nil
}

func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) {
query, _ := builder.ToCql()
iter := cs.session.Query(query, values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]interface{}, err error) {
query, _ := stmt.Query.ToCql()
cs.stmtLogger.LogStmt(stmt)
iter := cs.session.Query(query, stmt.Values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return loadSet(iter), iter.Close()
}

Expand Down Expand Up @@ -126,8 +129,8 @@ func ignore(err error) bool {
}
}

func opType(builder qb.Builder) string {
switch builder.(type) {
func opType(stmt *typedef.Stmt) string {
switch stmt.Query.(type) {
case *qb.InsertBuilder:
return "insert"
case *qb.DeleteBuilder:
Expand Down
Loading
Loading