diff --git a/.run/Run Gemini.run.xml b/.run/Run Gemini.run.xml new file mode 100644 index 00000000..3e4f4ec2 --- /dev/null +++ b/.run/Run Gemini.run.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index 0b13d7ad..fbfbcebb 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -292,16 +292,16 @@ func run(_ *cobra.Command, _ []string) error { } if warmup > 0 && !stopFlag.IsHardOrSoft() { - jobsList := jobs.ListFromMode(jobs.WarmupMode, warmup, concurrency) - if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil { + jobsList := jobs.New(jobs.WarmupMode, warmup, concurrency) + if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil { logger.Error("warmup encountered an error", zap.Error(err)) stopFlag.SetHard(true) } } if !stopFlag.IsHardOrSoft() { - jobsList := jobs.ListFromMode(mode, duration, concurrency) - if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil { + jobsList := jobs.New(mode, duration, concurrency) + if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil { logger.Debug("error detected", zap.Error(err)) } } @@ -473,7 +473,7 @@ func init() { rootCmd.Flags().BoolVarP(&dropSchema, "drop-schema", "d", false, "Drop schema before starting tests run") rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output during test run") rootCmd.Flags().BoolVarP(&failFast, "fail-fast", "f", false, "Stop on the first failure") - rootCmd.Flags().BoolVarP(&nonInteractive, "non-interactive", "", false, "Run in non-interactive mode (disable progress indicator)") + rootCmd.Flags().BoolVarP(&nonInteractive, "non-interactive", "", false, "Statement in non-interactive mode (disable progress indicator)") rootCmd.Flags().DurationVarP(&duration, "duration", "", 30*time.Second, "") rootCmd.Flags().StringVarP(&outFileArg, "outfile", "", "", "Specify the name of the file where the results should go") rootCmd.Flags().StringVarP(&bind, "bind", "b", ":2112", "Specify the interface and port which to bind prometheus metrics on. Default is ':2112'") @@ -497,7 +497,7 @@ func init() { rootCmd.Flags().IntVarP(&minColumns, "min-columns", "", 8, "Minimum number of generated columns") rootCmd.Flags().StringVarP(&datasetSize, "dataset-size", "", "large", "Specify the type of dataset size to use, small|large") rootCmd.Flags().StringVarP(&cqlFeatures, "cql-features", "", "basic", "Specify the type of cql features to use, basic|normal|all") - rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Run gemini with materialized views support") + rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Statement gemini with materialized views support") rootCmd.Flags().StringVarP(&level, "level", "", "info", "Specify the logging level, debug|info|warn|error|dpanic|panic|fatal") rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 2, "Maximum number of attempts to apply a mutation") rootCmd.Flags().DurationVarP( diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index 500fc759..699dcd8b 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -37,12 +37,11 @@ type TokenIndex uint64 type DistributionFunc func() TokenIndex -type GeneratorInterface interface { +type Interface interface { Get() *typedef.ValueWithToken GetOld() *typedef.ValueWithToken - GiveOld(_ *typedef.ValueWithToken) - GiveOlds(_ []*typedef.ValueWithToken) - ReleaseToken(_ uint64) + GiveOld(...*typedef.ValueWithToken) + ReleaseToken(uint64) } type Generator struct { @@ -118,15 +117,10 @@ func (g *Generator) GetOld() *typedef.ValueWithToken { return targetPart.getOld() } -// GiveOld returns the supplied value for later reuse unless -func (g *Generator) GiveOld(v *typedef.ValueWithToken) { - g.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v) -} - -// GiveOlds returns the supplied values for later reuse unless -func (g *Generator) GiveOlds(tokens []*typedef.ValueWithToken) { +// GiveOld returns the supplied value for later reuse +func (g *Generator) GiveOld(tokens ...*typedef.ValueWithToken) { for _, token := range tokens { - g.GiveOld(token) + g.GetPartitionForToken(TokenIndex(token.Token)).giveOld(token) } } diff --git a/pkg/generators/statement_generator.go b/pkg/generators/statement_generator.go index 25503a89..1374dec9 100644 --- a/pkg/generators/statement_generator.go +++ b/pkg/generators/statement_generator.go @@ -87,7 +87,7 @@ func genTable(sc typedef.SchemaConfig, tableName string, r *rand.Rand) *typedef. table.Indexes = indexes var mvs []typedef.MaterializedView - if sc.CQLFeature > typedef.CQL_FEATURE_BASIC && sc.UseMaterializedViews && len(clusteringKeys) > 0 && columns.ValidColumnsForPrimaryKey().Len() != 0 { + if sc.UseMaterializedViews && sc.CQLFeature > typedef.CQL_FEATURE_BASIC && len(clusteringKeys) > 0 && columns.ValidColumnsForPrimaryKey().Len() != 0 { mvs = CreateMaterializedViews(columns, table.Name, partitionKeys, clusteringKeys, r) } diff --git a/pkg/jobs/gen_check_stmt.go b/pkg/generators/statements/gen_check_stmt.go similarity index 77% rename from pkg/jobs/gen_check_stmt.go rename to pkg/generators/statements/gen_check_stmt.go index 3822b409..bd406ce2 100644 --- a/pkg/jobs/gen_check_stmt.go +++ b/pkg/generators/statements/gen_check_stmt.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jobs +package statements import ( "math" @@ -28,94 +28,105 @@ import ( func GenCheckStmt( s *typedef.Schema, table *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, rnd *rand.Rand, p *typedef.PartitionRangeConfig, -) *typedef.Stmt { - n := 0 - mvNum := -1 - maxClusteringRels := 0 - numQueryPKs := 0 - if len(table.MaterializedViews) > 0 && rnd.Int()%2 == 0 { - mvNum = utils.RandInt2(rnd, 0, len(table.MaterializedViews)) - } - - switch mvNum { - case -1: - if len(table.Indexes) > 0 { - n = rnd.Intn(5) - } else { - n = rnd.Intn(4) - } - switch n { - case 0: - return genSinglePartitionQuery(s, table, g) - case 1: - numQueryPKs = utils.RandInt2(rnd, 1, table.PartitionKeys.Len()) - multiplier := int(math.Pow(float64(numQueryPKs), float64(table.PartitionKeys.Len()))) - if multiplier > 100 { - numQueryPKs = 1 - } - return genMultiplePartitionQuery(s, table, g, numQueryPKs) - case 2: - maxClusteringRels = utils.RandInt2(rnd, 0, table.ClusteringKeys.Len()) - return genClusteringRangeQuery(s, table, g, rnd, p, maxClusteringRels) - case 3: - numQueryPKs = utils.RandInt2(rnd, 1, table.PartitionKeys.Len()) - multiplier := int(math.Pow(float64(numQueryPKs), float64(table.PartitionKeys.Len()))) - if multiplier > 100 { - numQueryPKs = 1 - } - maxClusteringRels = utils.RandInt2(rnd, 0, table.ClusteringKeys.Len()) - return genMultiplePartitionClusteringRangeQuery(s, table, g, rnd, p, numQueryPKs, maxClusteringRels) - case 4: - // Reducing the probability to hit these since they often take a long time to run - switch rnd.Intn(5) { - case 0: - idxCount := utils.RandInt2(rnd, 1, len(table.Indexes)) - return genSingleIndexQuery(s, table, g, rnd, p, idxCount) - default: - return genSinglePartitionQuery(s, table, g) - } - } - default: - n = rnd.Intn(4) - switch n { - case 0: - return genSinglePartitionQueryMv(s, table, g, rnd, p, mvNum) - case 1: - lenPartitionKeys := table.MaterializedViews[mvNum].PartitionKeys.Len() - numQueryPKs = utils.RandInt2(rnd, 1, lenPartitionKeys) - multiplier := int(math.Pow(float64(numQueryPKs), float64(lenPartitionKeys))) - if multiplier > 100 { - numQueryPKs = 1 - } - return genMultiplePartitionQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs) - case 2: - lenClusteringKeys := table.MaterializedViews[mvNum].ClusteringKeys.Len() - maxClusteringRels = utils.RandInt2(rnd, 0, lenClusteringKeys) - return genClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, maxClusteringRels) - case 3: - lenPartitionKeys := table.MaterializedViews[mvNum].PartitionKeys.Len() - numQueryPKs = utils.RandInt2(rnd, 1, lenPartitionKeys) - multiplier := int(math.Pow(float64(numQueryPKs), float64(lenPartitionKeys))) - if multiplier > 100 { - numQueryPKs = 1 +) (*typedef.Stmt, func()) { + var stmt *typedef.Stmt + + if shouldGenerateCheckStatementForMV(table, rnd) { + stmt = genCheckStmtMV(s, table, g, rnd, p) + } else { + stmt = genCheckTableStmt(s, table, g, rnd, p) + } + + return stmt, func() { + if stmt.ValuesWithToken != nil { + for _, v := range stmt.ValuesWithToken { + g.ReleaseToken(v.Token) } - lenClusteringKeys := table.MaterializedViews[mvNum].ClusteringKeys.Len() - maxClusteringRels = utils.RandInt2(rnd, 0, lenClusteringKeys) - return genMultiplePartitionClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs, maxClusteringRels) } } +} + +// shouldGenerateCheckStatementForMV should be true if we have materialized views +// and the random number is even. So this means that we have a 50% chance of +// checking materialized views. +func shouldGenerateCheckStatementForMV(table *typedef.Table, rnd *rand.Rand) bool { + return len(table.MaterializedViews) > 0 && rnd.Int()%2 == 0 +} - return nil +func genCheckStmtMV(s *typedef.Schema, table *typedef.Table, g generators.Interface, rnd *rand.Rand, p *typedef.PartitionRangeConfig) *typedef.Stmt { + mvNum := utils.RandInt2(rnd, 0, len(table.MaterializedViews)) + lenClusteringKeys := table.MaterializedViews[mvNum].ClusteringKeys.Len() + lenPartitionKeys := table.MaterializedViews[mvNum].PartitionKeys.Len() + + maxClusteringRels := utils.RandInt2(rnd, 0, lenClusteringKeys) + numQueryPKs := utils.RandInt2(rnd, 1, lenPartitionKeys) + if int(math.Pow(float64(numQueryPKs), float64(lenPartitionKeys))) > 100 { + numQueryPKs = 1 + } + + switch rnd.Intn(4) { + case 0: + return genSinglePartitionQueryMv(s, table, g, rnd, p, mvNum) + case 1: + return genMultiplePartitionQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs) + case 2: + return genClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, maxClusteringRels) + case 3: + return genMultiplePartitionClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs, maxClusteringRels) + default: + panic("random number generator does not work correctly, unreachable statement") + } } -func genSinglePartitionQuery( +func genCheckTableStmt( s *typedef.Schema, - t *typedef.Table, - g generators.GeneratorInterface, + table *typedef.Table, + g generators.Interface, + rnd *rand.Rand, + p *typedef.PartitionRangeConfig, ) *typedef.Stmt { + var n int + + if len(table.Indexes) > 0 { + n = rnd.Intn(5) + } else { + n = rnd.Intn(4) + } + + maxClusteringRels := utils.RandInt2(rnd, 0, table.ClusteringKeys.Len()) + numQueryPKs := utils.RandInt2(rnd, 1, table.PartitionKeys.Len()) + multiplier := int(math.Pow(float64(numQueryPKs), float64(table.PartitionKeys.Len()))) + if multiplier > 100 { + numQueryPKs = 1 + } + + switch n { + case 0: + return genSinglePartitionQuery(s, table, g) + case 1: + return genMultiplePartitionQuery(s, table, g, numQueryPKs) + case 2: + return genClusteringRangeQuery(s, table, g, rnd, p, maxClusteringRels) + case 3: + return genMultiplePartitionClusteringRangeQuery(s, table, g, rnd, p, numQueryPKs, maxClusteringRels) + case 4: + // Reducing the probability to hit these since they often take a long time to run + // One in five chance to hit this + if rnd.Intn(5) == 0 { + idxCount := utils.RandInt2(rnd, 1, len(table.Indexes)) + return genSingleIndexQuery(s, table, g, rnd, p, idxCount) + } + + return genSinglePartitionQuery(s, table, g) + default: + panic("random number generator does not work correctly, unreachable statement") + } +} + +func genSinglePartitionQuery(s *typedef.Schema, t *typedef.Table, g generators.Interface) *typedef.Stmt { t.RLock() defer t.RUnlock() valuesWithToken := g.GetOld() @@ -124,7 +135,8 @@ func genSinglePartitionQuery( } values := valuesWithToken.Value.Copy() builder := qb.Select(s.Keyspace.Name + "." + t.Name) - typs := make([]typedef.Type, 0, 10) + typs := make([]typedef.Type, 0, len(t.PartitionKeys)) + for _, pk := range t.PartitionKeys { builder = builder.Where(qb.Eq(pk.Name)) typs = append(typs, pk.Type) @@ -144,7 +156,7 @@ func genSinglePartitionQuery( func genSinglePartitionQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum int, @@ -183,7 +195,7 @@ func genSinglePartitionQueryMv( func genMultiplePartitionQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, numQueryPKs int, ) *typedef.Stmt { t.RLock() @@ -197,7 +209,7 @@ func genMultiplePartitionQuery( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -223,7 +235,7 @@ func genMultiplePartitionQuery( func genMultiplePartitionQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, numQueryPKs int, @@ -241,7 +253,7 @@ func genMultiplePartitionQueryMv( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -274,7 +286,7 @@ func genMultiplePartitionQueryMv( func genClusteringRangeQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, maxClusteringRels int, @@ -321,7 +333,7 @@ func genClusteringRangeQuery( func genClusteringRangeQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, maxClusteringRels int, @@ -374,7 +386,7 @@ func genClusteringRangeQueryMv( func genMultiplePartitionClusteringRangeQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, numQueryPKs, maxClusteringRels int, @@ -397,7 +409,7 @@ func genMultiplePartitionClusteringRangeQuery( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -435,7 +447,7 @@ func genMultiplePartitionClusteringRangeQuery( func genMultiplePartitionClusteringRangeQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, numQueryPKs, maxClusteringRels int, @@ -478,7 +490,7 @@ func genMultiplePartitionClusteringRangeQueryMv( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -516,7 +528,7 @@ func genMultiplePartitionClusteringRangeQueryMv( func genSingleIndexQuery( s *typedef.Schema, t *typedef.Table, - _ generators.GeneratorInterface, + _ generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, idxCount int, diff --git a/pkg/jobs/gen_check_stmt_test.go b/pkg/generators/statements/gen_check_stmt_test.go similarity index 70% rename from pkg/jobs/gen_check_stmt_test.go rename to pkg/generators/statements/gen_check_stmt_test.go index 912c9cf2..cea4d3d0 100644 --- a/pkg/jobs/gen_check_stmt_test.go +++ b/pkg/generators/statements/gen_check_stmt_test.go @@ -13,9 +13,10 @@ // limitations under the License. //nolint:thelper -package jobs +package statements import ( + "github.com/scylladb/gemini/pkg/jobs" "path" "testing" @@ -26,63 +27,63 @@ import ( var checkDataPath = "./test_expected_data/check/" func TestGenSinglePartitionQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "single_partition.json"), genSinglePartitionQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "single_partition.json"), genSinglePartitionQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, _ := testutils.GetAllForTestStmt(subT, caseName) stmt := genSinglePartitionQuery(schema, schema.Tables[0], gen) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenSinglePartitionQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "single_partition_mv.json"), genSinglePartitionQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "single_partition_mv.json"), genSinglePartitionQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt := genSinglePartitionQueryMv(schema, schema.Tables[0], gen, rnd, &prc, len(schema.Tables[0].MaterializedViews)-1) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition.json"), genMultiplePartitionQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition.json"), genMultiplePartitionQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, _ := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) - stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition_mv.json"), genMultiplePartitionQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition_mv.json"), genMultiplePartitionQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { options := testutils.GetOptionsFromCaseName(caseName) schema, gen, _ := testutils.GetAllForTestStmt(subT, caseName) - stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenClusteringRangeQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "clustering_range.json"), genClusteringRangeQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "clustering_range.json"), genClusteringRangeQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) prc := schema.Config.GetPartitionRangeConfig() - stmt := genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + stmt := genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenClusteringRangeQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "clustering_range_mv.json"), genClusteringRangeQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "clustering_range_mv.json"), genClusteringRangeQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) prc := schema.Config.GetPartitionRangeConfig() @@ -93,15 +94,15 @@ func TestGenClusteringRangeQueryMv(t *testing.T) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionClusteringRangeQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition_clustering_range.json"), genMultiplePartitionClusteringRangeQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition_clustering_range.json"), genMultiplePartitionClusteringRangeQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) prc := schema.Config.GetPartitionRangeConfig() @@ -111,16 +112,16 @@ func TestGenMultiplePartitionClusteringRangeQuery(t *testing.T) { gen, rnd, &prc, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionClusteringRangeQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition_clustering_range_mv.json"), genMultiplePartitionClusteringRangeQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition_clustering_range_mv.json"), genMultiplePartitionClusteringRangeQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { options := testutils.GetOptionsFromCaseName(caseName) schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) prc := schema.Config.GetPartitionRangeConfig() @@ -131,21 +132,21 @@ func TestGenMultiplePartitionClusteringRangeQueryMv(t *testing.T) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenSingleIndexQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "single_index.json"), genSingleIndexQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "single_index.json"), genSingleIndexQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt := genSingleIndexQuery(schema, schema.Tables[0], gen, rnd, &prc, len(schema.Tables[0].Indexes)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } @@ -190,7 +191,7 @@ func BenchmarkGenMultiplePartitionQuery(t *testing.B) { schema, gen, _ := testutils.GetAllForTestStmt(subT, caseName) subT.ResetTimer() for x := 0; x < subT.N; x++ { - _ = genMultiplePartitionQuery(schema, schema.Tables[0], gen, GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + _ = genMultiplePartitionQuery(schema, schema.Tables[0], gen, jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) } }) } @@ -214,7 +215,7 @@ func BenchmarkGenMultiplePartitionQueryMv(t *testing.B) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) } }) } @@ -231,7 +232,7 @@ func BenchmarkGenClusteringRangeQuery(t *testing.B) { prc := schema.Config.GetPartitionRangeConfig() subT.ResetTimer() for x := 0; x < subT.N; x++ { - _ = genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + _ = genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } @@ -255,7 +256,7 @@ func BenchmarkGenClusteringRangeQueryMv(t *testing.B) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } @@ -278,8 +279,8 @@ func BenchmarkGenMultiplePartitionClusteringRangeQuery(t *testing.B) { gen, rnd, &prc, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } @@ -303,8 +304,8 @@ func BenchmarkGenMultiplePartitionClusteringRangeQueryMv(t *testing.B) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } diff --git a/pkg/jobs/gen_const_test.go b/pkg/generators/statements/gen_const_test.go similarity index 99% rename from pkg/jobs/gen_const_test.go rename to pkg/generators/statements/gen_const_test.go index 1cf5922c..f602f3ef 100644 --- a/pkg/jobs/gen_const_test.go +++ b/pkg/generators/statements/gen_const_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jobs +package statements var ( genInsertStmtCases = []string{ diff --git a/pkg/jobs/gen_ddl_stmt.go b/pkg/generators/statements/gen_ddl_stmt.go similarity index 99% rename from pkg/jobs/gen_ddl_stmt.go rename to pkg/generators/statements/gen_ddl_stmt.go index a8866fd4..358f9bce 100644 --- a/pkg/jobs/gen_ddl_stmt.go +++ b/pkg/generators/statements/gen_ddl_stmt.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jobs +package statements import ( "fmt" diff --git a/pkg/jobs/gen_ddl_stmt_test.go b/pkg/generators/statements/gen_ddl_stmt_test.go similarity index 83% rename from pkg/jobs/gen_ddl_stmt_test.go rename to pkg/generators/statements/gen_ddl_stmt_test.go index e5f81a73..7b3670de 100644 --- a/pkg/jobs/gen_ddl_stmt_test.go +++ b/pkg/generators/statements/gen_ddl_stmt_test.go @@ -13,10 +13,11 @@ // limitations under the License. //nolint:thelper -package jobs +package statements import ( "fmt" + "github.com/scylladb/gemini/pkg/jobs" "path" "strconv" "testing" @@ -30,24 +31,24 @@ import ( var ddlDataPath = "./test_expected_data/ddl/" func TestGenDropColumnStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(ddlDataPath, "drop_column.json"), genDropColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(ddlDataPath, "drop_column.json"), genDropColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, _, _ := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) columnToDelete := getColumnToDeleteFromOptions(options, schema.Tables[0].Columns) stmt, err := genDropColumnStmt(schema.Tables[0], schema.Keyspace.Name, columnToDelete) - validateStmt(subT, stmt, err) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, err) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenAddColumnStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(ddlDataPath, "add_column.json"), genAddColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(ddlDataPath, "add_column.json"), genAddColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, _, _ := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) columnToAdd := getColumnToAddFromOptions(options, len(schema.Tables[0].Columns)) stmt, err := genAddColumnStmt(schema.Tables[0], schema.Keyspace.Name, columnToAdd) - validateStmt(subT, stmt, err) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, err) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } diff --git a/pkg/jobs/gen_mutate_stmt.go b/pkg/generators/statements/gen_mutate_stmt.go similarity index 97% rename from pkg/jobs/gen_mutate_stmt.go rename to pkg/generators/statements/gen_mutate_stmt.go index 6f65caab..f727da77 100644 --- a/pkg/jobs/gen_mutate_stmt.go +++ b/pkg/generators/statements/gen_mutate_stmt.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jobs +package statements import ( "encoding/json" @@ -26,7 +26,7 @@ import ( "github.com/scylladb/gemini/pkg/typedef" ) -func GenMutateStmt(s *typedef.Schema, t *typedef.Table, g generators.GeneratorInterface, r *rand.Rand, p *typedef.PartitionRangeConfig, deletes bool) (*typedef.Stmt, error) { +func GenMutateStmt(s *typedef.Schema, t *typedef.Table, g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, deletes bool) (*typedef.Stmt, error) { t.RLock() defer t.RUnlock() diff --git a/pkg/jobs/gen_mutate_stmt_test.go b/pkg/generators/statements/gen_mutate_stmt_test.go similarity index 74% rename from pkg/jobs/gen_mutate_stmt_test.go rename to pkg/generators/statements/gen_mutate_stmt_test.go index 006f5329..910174b9 100644 --- a/pkg/jobs/gen_mutate_stmt_test.go +++ b/pkg/generators/statements/gen_mutate_stmt_test.go @@ -13,9 +13,10 @@ // limitations under the License. //nolint:thelper -package jobs +package statements import ( + "github.com/scylladb/gemini/pkg/jobs" "path" "testing" @@ -26,43 +27,43 @@ import ( var mutateDataPath = "./test_expected_data/mutate/" func TestGenInsertStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "insert.json"), genInsertStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "insert.json"), genInsertStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() useLWT := testutils.GetOptionsFromCaseName(caseName).GetBool("lwt") stmt, err := genInsertStmt(schema, schema.Tables[0], gen.Get(), rnd, &prc, useLWT) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenInsertJSONStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "insert_j.json"), genInsertJSONStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "insert_j.json"), genInsertJSONStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt, err := genInsertJSONStmt(schema, schema.Tables[0], gen.Get(), rnd, &prc) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenUpdateStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "update.json"), genUpdateStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "update.json"), genUpdateStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt, err := genUpdateStmt(schema, schema.Tables[0], gen.Get(), rnd, &prc) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenDeleteRows(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "delete.json"), genDeleteStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "delete.json"), genDeleteStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt, err := genDeleteRows(schema, schema.Tables[0], gen.Get(), rnd, &prc) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } diff --git a/pkg/jobs/test_expected_data/check/clustering_range.json b/pkg/generators/statements/test_expected_data/check/clustering_range.json similarity index 100% rename from pkg/jobs/test_expected_data/check/clustering_range.json rename to pkg/generators/statements/test_expected_data/check/clustering_range.json diff --git a/pkg/jobs/test_expected_data/check/clustering_range_mv.json b/pkg/generators/statements/test_expected_data/check/clustering_range_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/clustering_range_mv.json rename to pkg/generators/statements/test_expected_data/check/clustering_range_mv.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition.json b/pkg/generators/statements/test_expected_data/check/multiple_partition.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition_clustering_range.json b/pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition_clustering_range.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition_clustering_range_mv.json b/pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition_clustering_range_mv.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range_mv.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition_mv.json b/pkg/generators/statements/test_expected_data/check/multiple_partition_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition_mv.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition_mv.json diff --git a/pkg/jobs/test_expected_data/check/single_index.json b/pkg/generators/statements/test_expected_data/check/single_index.json similarity index 100% rename from pkg/jobs/test_expected_data/check/single_index.json rename to pkg/generators/statements/test_expected_data/check/single_index.json diff --git a/pkg/jobs/test_expected_data/check/single_partition.json b/pkg/generators/statements/test_expected_data/check/single_partition.json similarity index 100% rename from pkg/jobs/test_expected_data/check/single_partition.json rename to pkg/generators/statements/test_expected_data/check/single_partition.json diff --git a/pkg/jobs/test_expected_data/check/single_partition_mv.json b/pkg/generators/statements/test_expected_data/check/single_partition_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/single_partition_mv.json rename to pkg/generators/statements/test_expected_data/check/single_partition_mv.json diff --git a/pkg/jobs/test_expected_data/ddl/add_column.json b/pkg/generators/statements/test_expected_data/ddl/add_column.json similarity index 100% rename from pkg/jobs/test_expected_data/ddl/add_column.json rename to pkg/generators/statements/test_expected_data/ddl/add_column.json diff --git a/pkg/jobs/test_expected_data/ddl/drop_column.json b/pkg/generators/statements/test_expected_data/ddl/drop_column.json similarity index 100% rename from pkg/jobs/test_expected_data/ddl/drop_column.json rename to pkg/generators/statements/test_expected_data/ddl/drop_column.json diff --git a/pkg/jobs/test_expected_data/mutate/delete.json b/pkg/generators/statements/test_expected_data/mutate/delete.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/delete.json rename to pkg/generators/statements/test_expected_data/mutate/delete.json diff --git a/pkg/jobs/test_expected_data/mutate/insert.json b/pkg/generators/statements/test_expected_data/mutate/insert.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/insert.json rename to pkg/generators/statements/test_expected_data/mutate/insert.json diff --git a/pkg/jobs/test_expected_data/mutate/insert_j.json b/pkg/generators/statements/test_expected_data/mutate/insert_j.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/insert_j.json rename to pkg/generators/statements/test_expected_data/mutate/insert_j.json diff --git a/pkg/jobs/test_expected_data/mutate/update.json b/pkg/generators/statements/test_expected_data/mutate/update.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/update.json rename to pkg/generators/statements/test_expected_data/mutate/update.json diff --git a/pkg/jobs/ddl.go b/pkg/jobs/ddl.go new file mode 100644 index 00000000..9cd98d6f --- /dev/null +++ b/pkg/jobs/ddl.go @@ -0,0 +1,63 @@ +package jobs + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/scylladb/gemini/pkg/generators/statements" + "github.com/scylladb/gemini/pkg/joberror" +) + +func (m *mutation) DDL(ctx context.Context) error { + m.table.RLock() + // Scylla does not allow changing the DDL of a table with materialized views. + if len(m.table.MaterializedViews) > 0 { + m.table.RUnlock() + return nil + } + m.table.RUnlock() + + m.table.Lock() + defer m.table.Unlock() + ddlStmts, err := statements.GenDDLStmt(m.schema, m.table, m.random, m.partitionRangeConfig, m.schemaCfg) + if err != nil { + m.logger.Error("Failed! DDL Mutation statement generation failed", zap.Error(err)) + m.globalStatus.WriteErrors.Add(1) + return err + } + if ddlStmts == nil { + if w := m.logger.Check(zap.DebugLevel, "no statement generated"); w != nil { + w.Write(zap.String("job", "ddl")) + } + return nil + } + for _, ddlStmt := range ddlStmts.List { + if w := m.logger.Check(zap.DebugLevel, "ddl statement"); w != nil { + w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL())) + } + if err = m.store.Mutate(ctx, ddlStmt); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + m.globalStatus.AddWriteError(&joberror.JobError{ + Timestamp: time.Now(), + StmtType: ddlStmts.QueryType.ToString(), + Message: "DDL failed: " + err.Error(), + Query: ddlStmt.PrettyCQL(), + }) + return err + } + m.globalStatus.WriteOps.Add(1) + } + ddlStmts.PostStmtHook() + + jsonSchema, _ := json.MarshalIndent(m.schema, "", " ") + fmt.Printf("New schema: %v\n", string(jsonSchema)) + + return nil +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index ac21c09f..2389464b 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -16,24 +16,21 @@ package jobs import ( "context" - "encoding/json" - "errors" - "fmt" "time" - "github.com/scylladb/gemini/pkg/generators" - "github.com/scylladb/gemini/pkg/store" - "github.com/scylladb/gemini/pkg/typedef" - - "github.com/scylladb/gemini/pkg/joberror" - "github.com/scylladb/gemini/pkg/status" - "github.com/scylladb/gemini/pkg/stop" - "go.uber.org/zap" "golang.org/x/exp/rand" "golang.org/x/sync/errgroup" + + "github.com/scylladb/gemini/pkg/generators" + "github.com/scylladb/gemini/pkg/status" + "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" ) +type Mode []string + const ( WriteMode = "write" ReadMode = "read" @@ -41,433 +38,113 @@ const ( WarmupMode = "warmup" ) -const ( - warmupName = "Warmup" - validateName = "Validation" - mutateName = "Mutation" -) - -var ( - warmup = job{name: warmupName, function: warmupJob} - validate = job{name: validateName, function: validationJob} - mutate = job{name: mutateName, function: mutationJob} -) - -type List struct { - name string - jobs []job - duration time.Duration - workers uint64 -} - -type job struct { - function func( - context.Context, - <-chan time.Duration, - *typedef.Schema, - typedef.SchemaConfig, - *typedef.Table, - store.Store, - *rand.Rand, - *typedef.PartitionRangeConfig, - *generators.Generator, - *status.GlobalStatus, - *zap.Logger, - *stop.Flag, - bool, - bool, - ) error - name string -} - -func ListFromMode(mode string, duration time.Duration, workers uint64) List { - jobs := make([]job, 0, 2) - name := "work cycle" - switch mode { +func ModeFromString(m string) Mode { + switch m { case WriteMode: - jobs = append(jobs, mutate) + return Mode{WriteMode} case ReadMode: - jobs = append(jobs, validate) + return Mode{ReadMode} + case MixedMode: + return Mode{WriteMode, ReadMode} case WarmupMode: - jobs = append(jobs, warmup) - name = "warmup cycle" + return Mode{WarmupMode} default: - jobs = append(jobs, mutate, validate) - } - return List{ - name: name, - jobs: jobs, - duration: duration, - workers: workers, + return Mode{} } } -func (l List) Run( - ctx context.Context, - schema *typedef.Schema, - schemaConfig typedef.SchemaConfig, - s store.Store, - pump <-chan time.Duration, - generators []*generators.Generator, - globalStatus *status.GlobalStatus, - logger *zap.Logger, - seed uint64, - stopFlag *stop.Flag, - failFast, verbose bool, -) error { - logger = logger.Named(l.name) - ctx = stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop) - g, gCtx := errgroup.WithContext(ctx) - time.AfterFunc(l.duration, func() { - logger.Info("jobs time is up, begins jobs completion") - stopFlag.SetSoft(true) - }) - - partitionRangeConfig := schemaConfig.GetPartitionRangeConfig() - logger.Info("start jobs") - for j := range schema.Tables { - gen := generators[j] - table := schema.Tables[j] - for i := 0; i < int(l.workers); i++ { - for idx := range l.jobs { - jobF := l.jobs[idx].function - r := rand.New(rand.NewSource(seed)) - g.Go(func() error { - return jobF(gCtx, pump, schema, schemaConfig, table, s, r, &partitionRangeConfig, gen, globalStatus, logger, stopFlag, failFast, verbose) - }) - } - } - } - return g.Wait() +type List struct { + name string + duration time.Duration + logger *zap.Logger + random *rand.Rand + stopFlag *stop.Flag + workers uint64 + jobs []Job + generators []*generators.Generator + schema *typedef.Schema + verbose bool + failFast bool } -// mutationJob continuously applies mutations against the database -// for as long as the pump is active. -func mutationJob( - ctx context.Context, - pump <-chan time.Duration, - schema *typedef.Schema, - schemaCfg typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, - logger *zap.Logger, - stopFlag *stop.Flag, - failFast, verbose bool, -) error { - schemaConfig := &schemaCfg - logger = logger.Named("mutation_job") - logger.Info("starting mutation loop") - defer func() { - logger.Info("ending mutation loop") - }() - for { - if stopFlag.IsHardOrSoft() { - return nil - } - select { - case <-stopFlag.SignalChannel(): - logger.Debug("mutation job terminated") - return nil - case hb := <-pump: - time.Sleep(hb) - } - ind := r.Intn(1000000) - if ind%100000 == 0 { - _ = ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, logger, verbose) - } else { - _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger) - } - if failFast && globalStatus.HasErrors() { - stopFlag.SetSoft(true) - return nil - } - } +type Job interface { + Name() string + Do(context.Context, generators.Interface) error } -// validationJob continuously applies validations against the database -// for as long as the pump is active. -func validationJob( - ctx context.Context, - pump <-chan time.Duration, - schema *typedef.Schema, - schemaCfg typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, - logger *zap.Logger, +func New( + mode string, + duration time.Duration, + workers uint64, stopFlag *stop.Flag, - failFast, _ bool, -) error { - schemaConfig := &schemaCfg - logger = logger.Named("validation_job") - logger.Info("starting validation loop") - defer func() { - logger.Info("ending validation loop") - }() - - for { - if stopFlag.IsHardOrSoft() { - return nil - } - select { - case <-stopFlag.SignalChannel(): - return nil - case hb := <-pump: - time.Sleep(hb) - } - stmt := GenCheckStmt(schema, table, g, r, p) - if stmt == nil { - logger.Info("Validation. No statement generated from GenCheckStmt.") - continue - } - err := validation(ctx, schemaConfig, table, s, stmt, logger) - if stmt.ValuesWithToken != nil { - for _, token := range stmt.ValuesWithToken { - g.ReleaseToken(token.Token) - } - } - switch { - case err == nil: - globalStatus.ReadOps.Add(1) - case errors.Is(err, context.Canceled): - return nil - default: - globalStatus.AddReadError(&joberror.JobError{ - Timestamp: time.Now(), - StmtType: stmt.QueryType.ToString(), - Message: "Validation failed: " + err.Error(), - Query: stmt.PrettyCQL(), - }) - } - - if failFast && globalStatus.HasErrors() { - stopFlag.SetSoft(true) - return nil - } - } -} - -// warmupJob continuously applies mutations against the database -// for as long as the pump is active or the supplied duration expires. -func warmupJob( - ctx context.Context, - _ <-chan time.Duration, - schema *typedef.Schema, - schemaCfg typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, logger *zap.Logger, - stopFlag *stop.Flag, - failFast, _ bool, -) error { - schemaConfig := &schemaCfg - logger = logger.Named("warmup") - logger.Info("starting warmup loop") - defer func() { - logger.Info("ending warmup loop") - }() - for { - if stopFlag.IsHardOrSoft() { - logger.Debug("warmup job terminated") - return nil - } - // Do we care about errors during warmup? - _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger) - if failFast && globalStatus.HasErrors() { - stopFlag.SetSoft(true) - return nil - } - } -} - -func ddl( - ctx context.Context, schema *typedef.Schema, - sc *typedef.SchemaConfig, table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, + store store.Store, globalStatus *status.GlobalStatus, - logger *zap.Logger, + schemaCfg *typedef.SchemaConfig, + seed uint64, + gens []*generators.Generator, + pump <-chan time.Duration, + failFast bool, verbose bool, -) error { - if sc.CQLFeature != typedef.CQL_FEATURE_ALL { - logger.Debug("ddl statements disabled") - return nil - } - if len(table.MaterializedViews) > 0 { - // Scylla does not allow changing the DDL of a table with materialized views. - return nil - } - table.Lock() - defer table.Unlock() - ddlStmts, err := GenDDLStmt(schema, table, r, p, sc) - if err != nil { - logger.Error("Failed! DDL Mutation statement generation failed", zap.Error(err)) - globalStatus.WriteErrors.Add(1) - return err - } - if ddlStmts == nil { - if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { - w.Write(zap.String("job", "ddl")) - } - return nil - } - for _, ddlStmt := range ddlStmts.List { - if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil { - w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL())) - } - if err = s.Mutate(ctx, ddlStmt); err != nil { - if errors.Is(err, context.Canceled) { - return nil - } - globalStatus.AddWriteError(&joberror.JobError{ - Timestamp: time.Now(), - StmtType: ddlStmts.QueryType.ToString(), - Message: "DDL failed: " + err.Error(), - Query: ddlStmt.PrettyCQL(), - }) - return err - } - globalStatus.WriteOps.Add(1) - } - ddlStmts.PostStmtHook() - if verbose { - jsonSchema, _ := json.MarshalIndent(schema, "", " ") - fmt.Printf("New schema: %v\n", string(jsonSchema)) - } - return nil -} +) List { + partitionRangeConfig := schemaCfg.GetPartitionRangeConfig() + rnd := rand.New(rand.NewSource(seed)) -func mutation( - ctx context.Context, - schema *typedef.Schema, - _ *typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, - deletes bool, - logger *zap.Logger, -) error { - mutateStmt, err := GenMutateStmt(schema, table, g, r, p, deletes) - if err != nil { - logger.Error("Failed! Mutation statement generation failed", zap.Error(err)) - globalStatus.WriteErrors.Add(1) - return err - } - if mutateStmt == nil { - if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { - w.Write(zap.String("job", "mutation")) + jobs := make([]Job, 0, 2) + name := "work cycle" + for _, m := range ModeFromString(mode) { + switch m { + case WriteMode: + jobs = append(jobs, NewMutation(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, schemaCfg, pump, failFast, verbose)) + case ReadMode: + jobs = append(jobs, NewValidation(logger, pump, schema, schemaCfg, table, store, rnd, &partitionRangeConfig, globalStatus, stopFlag, failFast)) + case WarmupMode: + jobs = append(jobs, NewWarmup(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, failFast, verbose)) + name = "warmup cycle" } - return err } - if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil { - w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) - } - if err = s.Mutate(ctx, mutateStmt); err != nil { - if errors.Is(err, context.Canceled) { - return nil - } - globalStatus.AddWriteError(&joberror.JobError{ - Timestamp: time.Now(), - StmtType: mutateStmt.QueryType.ToString(), - Message: "Mutation failed: " + err.Error(), - Query: mutateStmt.PrettyCQL(), - }) - } else { - globalStatus.WriteOps.Add(1) - g.GiveOlds(mutateStmt.ValuesWithToken) + return List{ + name: name, + jobs: jobs, + duration: duration, + workers: workers, + stopFlag: stopFlag, + failFast: failFast, + verbose: verbose, + random: rnd, + generators: gens, + schema: schema, } - return nil } -func validation( - ctx context.Context, - sc *typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - stmt *typedef.Stmt, - logger *zap.Logger, -) error { - if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { - w.Write(zap.String("pretty_cql", stmt.PrettyCQL())) - } +func (l List) Name() string { + return l.name +} - maxAttempts := 1 - delay := 10 * time.Millisecond - if stmt.QueryType.PossibleAsyncOperation() { - maxAttempts = sc.AsyncObjectStabilizationAttempts - if maxAttempts < 1 { - maxAttempts = 1 - } - delay = sc.AsyncObjectStabilizationDelay - } +func (l List) Do(ctx context.Context) error { + ctx = l.stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop) + g, gCtx := errgroup.WithContext(ctx) + time.AfterFunc(l.duration, func() { + l.logger.Info("jobs time is up, begins jobs completion") + l.stopFlag.SetSoft(true) + }) - var lastErr, err error - attempt := 1 - for { - lastErr = err - err = s.Check(ctx, table, stmt, attempt == maxAttempts) + l.logger.Info("start jobs") - if err == nil { - if attempt > 1 { - logger.Info(fmt.Sprintf("Validation successfully completed on %d attempt.", attempt)) + for j := range l.schema.Tables { + gen := l.generators[j] + for i := 0; i < int(l.workers); i++ { + for idx := range l.jobs { + jobF := l.jobs[idx] + g.Go(func() error { + return jobF.Do(gCtx, gen) + }) } - return nil - } - if errors.Is(err, context.Canceled) { - // When context is canceled it means that test was commanded to stop - // to skip logging part it is returned here - return err - } - if attempt == maxAttempts { - break - } - if errors.Is(err, unWrapErr(lastErr)) { - logger.Info(fmt.Sprintf("Retring failed validation. %d attempt from %d attempts. Error same as at attempt before. ", attempt, maxAttempts)) - } else { - logger.Info(fmt.Sprintf("Retring failed validation. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err)) } - - select { - case <-time.After(delay): - case <-ctx.Done(): - logger.Info(fmt.Sprintf("Retring failed validation stoped by done context. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err)) - return nil - } - attempt++ - } - - if attempt > 1 { - logger.Info(fmt.Sprintf("Retring failed validation stoped by reach of max attempts %d. Error: %s", maxAttempts, err)) - } else { - logger.Info(fmt.Sprintf("Validation failed. Error: %s", err)) } - return err -} - -func unWrapErr(err error) error { - nextErr := err - for nextErr != nil { - err = nextErr - nextErr = errors.Unwrap(err) - } - return err + return g.Wait() } diff --git a/pkg/jobs/mutation.go b/pkg/jobs/mutation.go new file mode 100644 index 00000000..0ba2d6ab --- /dev/null +++ b/pkg/jobs/mutation.go @@ -0,0 +1,162 @@ +package jobs + +import ( + "context" + "github.com/scylladb/gemini/pkg/generators" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/exp/rand" + + "github.com/scylladb/gemini/pkg/generators/statements" + "github.com/scylladb/gemini/pkg/joberror" + "github.com/scylladb/gemini/pkg/status" + "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" +) + +type ( + Mutation struct { + logger *zap.Logger + mutation mutation + stopFlag *stop.Flag + pump <-chan time.Duration + failFast bool + verbose bool + } + + mutation struct { + logger *zap.Logger + schema *typedef.Schema + table *typedef.Table + store store.Store + partitionRangeConfig *typedef.PartitionRangeConfig + schemaCfg *typedef.SchemaConfig + globalStatus *status.GlobalStatus + random *rand.Rand + deletes bool + } +) + +func NewMutation( + logger *zap.Logger, + schema *typedef.Schema, + table *typedef.Table, + store store.Store, + partitionRangeConfig *typedef.PartitionRangeConfig, + globalStatus *status.GlobalStatus, + stopFlag *stop.Flag, + schemaCfg *typedef.SchemaConfig, + pump <-chan time.Duration, + failFast bool, + verbose bool, +) *Mutation { + return &Mutation{ + logger: logger.Named("mutation"), + mutation: mutation{ + logger: logger.Named("mutation-with-deletes"), + schema: schema, + table: table, + store: store, + partitionRangeConfig: partitionRangeConfig, + globalStatus: globalStatus, + deletes: true, + schemaCfg: schemaCfg, + }, + stopFlag: stopFlag, + pump: pump, + failFast: failFast, + verbose: verbose, + } +} + +func (m *Mutation) Name() string { + return "Mutation" +} + +func (m *Mutation) Do(ctx context.Context, generator generators.Interface) error { + m.logger.Info("starting mutation loop") + defer m.logger.Info("ending mutation loop") + + for { + if m.stopFlag.IsHardOrSoft() { + return nil + } + select { + case <-m.stopFlag.SignalChannel(): + m.logger.Debug("mutation job terminated") + return nil + case hb := <-m.pump: + time.Sleep(hb) + } + + var err error + + if m.mutation.ShouldDoDDL() { + err = m.mutation.DDL(ctx) + } else { + err = m.mutation.Statement(ctx, generator) + } + + if err != nil { + // TODO: handle error + } + + if m.failFast && m.mutation.HasErrors() { + m.stopFlag.SetSoft(true) + return nil + } + } +} + +func (m *mutation) Statement(ctx context.Context, generator generators.Interface) error { + mutateStmt, err := statements.GenMutateStmt(m.schema, m.table, generator, m.random, m.partitionRangeConfig, m.deletes) + if err != nil { + m.logger.Error("Failed! Mutation statement generation failed", zap.Error(err)) + m.globalStatus.WriteErrors.Add(1) + return err + } + if mutateStmt == nil { + if w := m.logger.Check(zap.DebugLevel, "no statement generated"); w != nil { + w.Write(zap.String("job", "mutation")) + } + return err + } + + if w := m.logger.Check(zap.DebugLevel, "mutation statement"); w != nil { + w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) + } + if err = m.store.Mutate(ctx, mutateStmt); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + m.globalStatus.AddWriteError(&joberror.JobError{ + Timestamp: time.Now(), + StmtType: mutateStmt.QueryType.ToString(), + Message: "Mutation failed: " + err.Error(), + Query: mutateStmt.PrettyCQL(), + }) + + return err + } + + m.globalStatus.WriteOps.Add(1) + generator.GiveOld(mutateStmt.ValuesWithToken...) + + return nil +} + +func (m *mutation) HasErrors() bool { + return m.globalStatus.HasErrors() +} + +func (m *mutation) ShouldDoDDL() bool { + if m.schemaCfg.CQLFeature != typedef.CQL_FEATURE_ALL { + return false + } + + ind := m.random.Intn(1000000) + return ind%100000 == 0 +} diff --git a/pkg/jobs/validation.go b/pkg/jobs/validation.go new file mode 100644 index 00000000..3b31f8df --- /dev/null +++ b/pkg/jobs/validation.go @@ -0,0 +1,180 @@ +package jobs + +import ( + "context" + "fmt" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/exp/rand" + + "github.com/scylladb/gemini/pkg/generators" + "github.com/scylladb/gemini/pkg/generators/statements" + "github.com/scylladb/gemini/pkg/joberror" + "github.com/scylladb/gemini/pkg/status" + "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" + "github.com/scylladb/gemini/pkg/utils" +) + +type Validation struct { + logger *zap.Logger + pump <-chan time.Duration + schema *typedef.Schema + schemaConfig *typedef.SchemaConfig + table *typedef.Table + store store.Store + random *rand.Rand + partitionRangeConfig *typedef.PartitionRangeConfig + globalStatus *status.GlobalStatus + stopFlag *stop.Flag + failFast bool +} + +func NewValidation( + logger *zap.Logger, + pump <-chan time.Duration, + schema *typedef.Schema, + schemaConfig *typedef.SchemaConfig, + table *typedef.Table, + store store.Store, + random *rand.Rand, + partitionRangeConfig *typedef.PartitionRangeConfig, + globalStatus *status.GlobalStatus, + stopFlag *stop.Flag, + failFast bool, +) *Validation { + return &Validation{ + logger: logger.Named("validation"), + pump: pump, + schema: schema, + schemaConfig: schemaConfig, + table: table, + store: store, + random: random, + partitionRangeConfig: partitionRangeConfig, + globalStatus: globalStatus, + stopFlag: stopFlag, + failFast: failFast, + } +} + +func (v *Validation) Name() string { + return "Validation" +} + +func (v *Validation) validate(ctx context.Context, generator generators.Interface) error { + stmt, cleanup := statements.GenCheckStmt(v.schema, v.table, generator, v.random, v.partitionRangeConfig) + defer cleanup() + + err := validation(ctx, v.schemaConfig, v.table, v.store, stmt, v.logger) + + switch { + case err == nil: + v.globalStatus.ReadOps.Add(1) + case errors.Is(err, context.Canceled): + return context.Canceled + default: + v.globalStatus.AddReadError(&joberror.JobError{ + Timestamp: time.Now(), + StmtType: stmt.QueryType.ToString(), + Message: "Validation failed: " + err.Error(), + Query: stmt.PrettyCQL(), + }) + } + + return nil +} + +func (v *Validation) Do(ctx context.Context, generator generators.Interface) error { + v.logger.Info("starting validation loop") + defer v.logger.Info("ending validation loop") + + for { + if v.stopFlag.IsHardOrSoft() { + return nil + } + select { + case <-v.stopFlag.SignalChannel(): + return nil + case hb := <-v.pump: + time.Sleep(hb) + } + + if err := v.validate(ctx, generator); errors.Is(err, context.Canceled) { + return nil + } + + if v.failFast && v.globalStatus.HasErrors() { + v.stopFlag.SetSoft(true) + return nil + } + } +} + +func validation( + ctx context.Context, + sc *typedef.SchemaConfig, + table *typedef.Table, + s store.Store, + stmt *typedef.Stmt, + logger *zap.Logger, +) error { + if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { + w.Write(zap.String("pretty_cql", stmt.PrettyCQL())) + } + + maxAttempts := 1 + delay := 10 * time.Millisecond + if stmt.QueryType.PossibleAsyncOperation() { + maxAttempts = sc.AsyncObjectStabilizationAttempts + if maxAttempts < 1 { + maxAttempts = 1 + } + delay = sc.AsyncObjectStabilizationDelay + } + + var lastErr, err error + attempt := 1 + for ; ; attempt++ { + lastErr = err + err = s.Check(ctx, table, stmt, attempt == maxAttempts) + + if err == nil { + if attempt > 1 { + logger.Info(fmt.Sprintf("Validation successfully completed on %d attempt.", attempt)) + } + return nil + } + if errors.Is(err, context.Canceled) { + // When context is canceled it means that test was commanded to stop + // to skip logging part it is returned here + return err + } + if attempt == maxAttempts { + break + } + if errors.Is(err, utils.UnwrapErr(lastErr)) { + logger.Info(fmt.Sprintf("Retring failed validation. %d attempt from %d attempts. Error same as at attempt before. ", attempt, maxAttempts)) + } else { + logger.Info(fmt.Sprintf("Retring failed validation. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err)) + } + + select { + case <-time.After(delay): + case <-ctx.Done(): + logger.Info(fmt.Sprintf("Retring failed validation stoped by done context. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err)) + return nil + } + } + + if attempt > 1 { + logger.Info(fmt.Sprintf("Retring failed validation stoped by reach of max attempts %d. Error: %s", maxAttempts, err)) + } else { + logger.Info(fmt.Sprintf("Validation failed. Error: %s", err)) + } + + return err +} diff --git a/pkg/jobs/warmup.go b/pkg/jobs/warmup.go new file mode 100644 index 00000000..6ef14777 --- /dev/null +++ b/pkg/jobs/warmup.go @@ -0,0 +1,71 @@ +package jobs + +import ( + "context" + + "go.uber.org/zap" + + "github.com/scylladb/gemini/pkg/generators" + "github.com/scylladb/gemini/pkg/status" + "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" +) + +type Warmup struct { + mutation mutation + logger *zap.Logger + stopFlag *stop.Flag + failFast bool + verbose bool +} + +func NewWarmup( + logger *zap.Logger, + schema *typedef.Schema, + table *typedef.Table, + store store.Store, + partitionRangeConfig *typedef.PartitionRangeConfig, + globalStatus *status.GlobalStatus, + stopFlag *stop.Flag, + failFast bool, + verbose bool, +) *Warmup { + return &Warmup{ + logger: logger.Named("mutation"), + mutation: mutation{ + logger: logger.Named("mutation-without-deletes"), + schema: schema, + table: table, + store: store, + partitionRangeConfig: partitionRangeConfig, + globalStatus: globalStatus, + deletes: false, + }, + stopFlag: stopFlag, + failFast: failFast, + verbose: verbose, + } +} + +func (w *Warmup) Name() string { + return "Warmup" +} + +func (w *Warmup) Do(ctx context.Context, generator generators.Interface) error { + w.logger.Info("starting warmup loop") + defer w.logger.Info("ending warmup loop") + + for { + if w.stopFlag.IsHardOrSoft() { + w.logger.Debug("warmup job terminated") + return nil + } + + _ = w.mutation.Statement(ctx, generator) + if w.failFast && w.mutation.globalStatus.HasErrors() { + w.stopFlag.SetSoft(true) + return nil + } + } +} diff --git a/pkg/stop/flag.go b/pkg/stop/flag.go index 54c6e1f2..9ce08624 100644 --- a/pkg/stop/flag.go +++ b/pkg/stop/flag.go @@ -143,8 +143,8 @@ func (s *Flag) IsHardOrSoft() bool { func (s *Flag) AddHandler(handler func(signal uint32)) { s.stopHandlers.Append(handler) val := s.val.Load() - switch val { - case SignalSoftStop, SignalHardStop: + + if val == SignalSoftStop || val == SignalHardStop { handler(val) } } diff --git a/pkg/typedef/table.go b/pkg/typedef/table.go index 1af82947..2479d58b 100644 --- a/pkg/typedef/table.go +++ b/pkg/typedef/table.go @@ -105,14 +105,13 @@ func (t *Table) ValidColumnsForDelete() Columns { } } } - if len(t.MaterializedViews) != 0 { - for _, mv := range t.MaterializedViews { - if mv.HaveNonPrimaryKey() { - for j := range validCols { - if validCols[j].Name == mv.NonPrimaryKey.Name { - validCols = append(validCols[:j], validCols[j+1:]...) - break - } + + for _, mv := range t.MaterializedViews { + if mv.HaveNonPrimaryKey() { + for j := range validCols { + if validCols[j].Name == mv.NonPrimaryKey.Name { + validCols = append(validCols[:j], validCols[j+1:]...) + break } } } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 0b775d48..4f3ab17a 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -17,6 +17,7 @@ package utils import ( "encoding/hex" "fmt" + "github.com/pkg/errors" "strconv" "strings" "time" @@ -107,3 +108,12 @@ func UUIDFromTime(rnd *rand.Rand) string { } return gocql.UUIDFromTime(RandDate(rnd)).String() } + +func UnwrapErr(err error) error { + nextErr := err + for nextErr != nil { + err = nextErr + nextErr = errors.Unwrap(err) + } + return err +}