From 68d722049215c0f81dc6ef96a1f012ea235b64d6 Mon Sep 17 00:00:00 2001
From: Dusan Malusev
Date: Wed, 23 Oct 2024 17:16:20 +0200
Subject: [PATCH] improvement(stmt): refactoring statement generation

Statement generation currently does not work as intended: gemini
generates values for primary and clustering keys that are completely
invalid. They are not just off by value, but also off by type, e.g. it
generates `text` for `timeuuid`.

This is the first step in resolving the issue, by refactoring how,
when, and where statements are generated.

Signed-off-by: Dusan Malusev

---
 .run/Run Gemini.run.xml | 14 +
 cmd/gemini/root.go | 12 +-
 pkg/generators/generator.go | 18 +-
 pkg/generators/statement_generator.go | 2 +-
 .../statements}/gen_check_stmt.go | 196 +++---
 .../statements}/gen_check_stmt_test.go | 107 ++--
 .../statements}/gen_const_test.go | 2 +-
 .../statements}/gen_ddl_stmt.go | 2 +-
 .../statements}/gen_ddl_stmt_test.go | 15 +-
 .../statements}/gen_mutate_stmt.go | 4 +-
 .../statements}/gen_mutate_stmt_test.go | 27 +-
 .../check/clustering_range.json | 0
 .../check/clustering_range_mv.json | 0
 .../check/multiple_partition.json | 0
 .../multiple_partition_clustering_range.json | 0
 ...ultiple_partition_clustering_range_mv.json | 0
 .../check/multiple_partition_mv.json | 0
 .../check/single_index.json | 0
 .../check/single_partition.json | 0
 .../check/single_partition_mv.json | 0
 .../test_expected_data/ddl/add_column.json | 0
 .../test_expected_data/ddl/drop_column.json | 0
 .../test_expected_data/mutate/delete.json | 0
 .../test_expected_data/mutate/insert.json | 0
 .../test_expected_data/mutate/insert_j.json | 0
 .../test_expected_data/mutate/update.json | 0
 pkg/jobs/ddl.go | 63 +++
 pkg/jobs/jobs.go | 495 +++---------------
 pkg/jobs/mutation.go | 162 ++++++
 pkg/jobs/validation.go | 180 +++++++
 pkg/jobs/warmup.go | 71 +++
 pkg/stop/flag.go | 4 +-
 pkg/typedef/table.go | 15 +-
 pkg/utils/utils.go | 10 +
 34 files changed, 792 insertions(+), 607 deletions(-)
 create mode 100644 .run/Run Gemini.run.xml
 rename pkg/{jobs => generators/statements}/gen_check_stmt.go (77%)
 rename pkg/{jobs => generators/statements}/gen_check_stmt_test.go (70%)
 rename pkg/{jobs => generators/statements}/gen_const_test.go (99%)
 rename pkg/{jobs => generators/statements}/gen_ddl_stmt.go (99%)
 rename pkg/{jobs => generators/statements}/gen_ddl_stmt_test.go (83%)
 rename pkg/{jobs => generators/statements}/gen_mutate_stmt.go (97%)
 rename pkg/{jobs => generators/statements}/gen_mutate_stmt_test.go (74%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/clustering_range.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/clustering_range_mv.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/multiple_partition.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/multiple_partition_clustering_range.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/multiple_partition_clustering_range_mv.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/multiple_partition_mv.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/single_index.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/single_partition.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/check/single_partition_mv.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/ddl/add_column.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/ddl/drop_column.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/mutate/delete.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/mutate/insert.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/mutate/insert_j.json (100%)
 rename pkg/{jobs => generators/statements}/test_expected_data/mutate/update.json (100%)
 create mode 100644 pkg/jobs/ddl.go
 create mode 100644 pkg/jobs/mutation.go
 create mode 100644 pkg/jobs/validation.go
 create mode 100644 pkg/jobs/warmup.go
diff --git a/.run/Run Gemini.run.xml b/.run/Run Gemini.run.xml new file mode 100644 index 00000000..3e4f4ec2 --- /dev/null +++ b/.run/Run Gemini.run.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + + + \ No newline at end of file diff --git a/cmd/gemini/root.go b/cmd/gemini/root.go index 0b13d7ad..fbfbcebb 100644 --- a/cmd/gemini/root.go +++ b/cmd/gemini/root.go @@ -292,16 +292,16 @@ func run(_ *cobra.Command, _ []string) error { } if warmup > 0 && !stopFlag.IsHardOrSoft() { - jobsList := jobs.ListFromMode(jobs.WarmupMode, warmup, concurrency) - if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil { + jobsList := jobs.New(jobs.WarmupMode, warmup, concurrency) + if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, warmupStopFlag, failFast, verbose); err != nil { logger.Error("warmup encountered an error", zap.Error(err)) stopFlag.SetHard(true) } } if !stopFlag.IsHardOrSoft() { - jobsList := jobs.ListFromMode(mode, duration, concurrency) - if err = jobsList.Run(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil { + jobsList := jobs.New(mode, duration, concurrency) + if err = jobsList.Do(ctx, schema, schemaConfig, st, pump, gens, globalStatus, logger, intSeed, stopFlag.CreateChild("workload"), failFast, verbose); err != nil { logger.Debug("error detected", zap.Error(err)) } } @@ -473,7 +473,7 @@ func init() { rootCmd.Flags().BoolVarP(&dropSchema, "drop-schema", "d", false, "Drop schema before starting tests run") rootCmd.Flags().BoolVarP(&verbose, "verbose", "v", false, "Verbose output during test run") rootCmd.Flags().BoolVarP(&failFast, "fail-fast", "f", false, "Stop on the first failure") - rootCmd.Flags().BoolVarP(&nonInteractive, "non-interactive", "", false, "Run in non-interactive mode (disable progress indicator)") + rootCmd.Flags().BoolVarP(&nonInteractive, "non-interactive", "", false, "Run in non-interactive mode (disable progress indicator)") rootCmd.Flags().DurationVarP(&duration, "duration", "", 30*time.Second, "") rootCmd.Flags().StringVarP(&outFileArg, "outfile", "", "", "Specify the name of the file where the results should go") rootCmd.Flags().StringVarP(&bind, "bind", "b", ":2112", "Specify the interface and port which to bind prometheus metrics on.
Default is ':2112'") @@ -497,7 +497,7 @@ func init() { rootCmd.Flags().IntVarP(&minColumns, "min-columns", "", 8, "Minimum number of generated columns") rootCmd.Flags().StringVarP(&datasetSize, "dataset-size", "", "large", "Specify the type of dataset size to use, small|large") rootCmd.Flags().StringVarP(&cqlFeatures, "cql-features", "", "basic", "Specify the type of cql features to use, basic|normal|all") - rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Run gemini with materialized views support") + rootCmd.Flags().BoolVarP(&useMaterializedViews, "materialized-views", "", false, "Statement gemini with materialized views support") rootCmd.Flags().StringVarP(&level, "level", "", "info", "Specify the logging level, debug|info|warn|error|dpanic|panic|fatal") rootCmd.Flags().IntVarP(&maxRetriesMutate, "max-mutation-retries", "", 2, "Maximum number of attempts to apply a mutation") rootCmd.Flags().DurationVarP( diff --git a/pkg/generators/generator.go b/pkg/generators/generator.go index 500fc759..699dcd8b 100644 --- a/pkg/generators/generator.go +++ b/pkg/generators/generator.go @@ -37,12 +37,11 @@ type TokenIndex uint64 type DistributionFunc func() TokenIndex -type GeneratorInterface interface { +type Interface interface { Get() *typedef.ValueWithToken GetOld() *typedef.ValueWithToken - GiveOld(_ *typedef.ValueWithToken) - GiveOlds(_ []*typedef.ValueWithToken) - ReleaseToken(_ uint64) + GiveOld(...*typedef.ValueWithToken) + ReleaseToken(uint64) } type Generator struct { @@ -118,15 +117,10 @@ func (g *Generator) GetOld() *typedef.ValueWithToken { return targetPart.getOld() } -// GiveOld returns the supplied value for later reuse unless -func (g *Generator) GiveOld(v *typedef.ValueWithToken) { - g.GetPartitionForToken(TokenIndex(v.Token)).giveOld(v) -} - -// GiveOlds returns the supplied values for later reuse unless -func (g *Generator) GiveOlds(tokens []*typedef.ValueWithToken) { +// GiveOld returns the supplied value for later reuse +func (g *Generator) GiveOld(tokens ...*typedef.ValueWithToken) { for _, token := range tokens { - g.GiveOld(token) + g.GetPartitionForToken(TokenIndex(token.Token)).giveOld(token) } } diff --git a/pkg/generators/statement_generator.go b/pkg/generators/statement_generator.go index 25503a89..1374dec9 100644 --- a/pkg/generators/statement_generator.go +++ b/pkg/generators/statement_generator.go @@ -87,7 +87,7 @@ func genTable(sc typedef.SchemaConfig, tableName string, r *rand.Rand) *typedef. table.Indexes = indexes var mvs []typedef.MaterializedView - if sc.CQLFeature > typedef.CQL_FEATURE_BASIC && sc.UseMaterializedViews && len(clusteringKeys) > 0 && columns.ValidColumnsForPrimaryKey().Len() != 0 { + if sc.UseMaterializedViews && sc.CQLFeature > typedef.CQL_FEATURE_BASIC && len(clusteringKeys) > 0 && columns.ValidColumnsForPrimaryKey().Len() != 0 { mvs = CreateMaterializedViews(columns, table.Name, partitionKeys, clusteringKeys, r) } diff --git a/pkg/jobs/gen_check_stmt.go b/pkg/generators/statements/gen_check_stmt.go similarity index 77% rename from pkg/jobs/gen_check_stmt.go rename to pkg/generators/statements/gen_check_stmt.go index 3822b409..bd406ce2 100644 --- a/pkg/jobs/gen_check_stmt.go +++ b/pkg/generators/statements/gen_check_stmt.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package jobs +package statements import ( "math" @@ -28,94 +28,105 @@ import ( func GenCheckStmt( s *typedef.Schema, table *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, rnd *rand.Rand, p *typedef.PartitionRangeConfig, -) *typedef.Stmt { - n := 0 - mvNum := -1 - maxClusteringRels := 0 - numQueryPKs := 0 - if len(table.MaterializedViews) > 0 && rnd.Int()%2 == 0 { - mvNum = utils.RandInt2(rnd, 0, len(table.MaterializedViews)) - } - - switch mvNum { - case -1: - if len(table.Indexes) > 0 { - n = rnd.Intn(5) - } else { - n = rnd.Intn(4) - } - switch n { - case 0: - return genSinglePartitionQuery(s, table, g) - case 1: - numQueryPKs = utils.RandInt2(rnd, 1, table.PartitionKeys.Len()) - multiplier := int(math.Pow(float64(numQueryPKs), float64(table.PartitionKeys.Len()))) - if multiplier > 100 { - numQueryPKs = 1 - } - return genMultiplePartitionQuery(s, table, g, numQueryPKs) - case 2: - maxClusteringRels = utils.RandInt2(rnd, 0, table.ClusteringKeys.Len()) - return genClusteringRangeQuery(s, table, g, rnd, p, maxClusteringRels) - case 3: - numQueryPKs = utils.RandInt2(rnd, 1, table.PartitionKeys.Len()) - multiplier := int(math.Pow(float64(numQueryPKs), float64(table.PartitionKeys.Len()))) - if multiplier > 100 { - numQueryPKs = 1 - } - maxClusteringRels = utils.RandInt2(rnd, 0, table.ClusteringKeys.Len()) - return genMultiplePartitionClusteringRangeQuery(s, table, g, rnd, p, numQueryPKs, maxClusteringRels) - case 4: - // Reducing the probability to hit these since they often take a long time to run - switch rnd.Intn(5) { - case 0: - idxCount := utils.RandInt2(rnd, 1, len(table.Indexes)) - return genSingleIndexQuery(s, table, g, rnd, p, idxCount) - default: - return genSinglePartitionQuery(s, table, g) - } - } - default: - n = rnd.Intn(4) - switch n { - case 0: - return genSinglePartitionQueryMv(s, table, g, rnd, p, mvNum) - case 1: - lenPartitionKeys := table.MaterializedViews[mvNum].PartitionKeys.Len() - numQueryPKs = utils.RandInt2(rnd, 1, lenPartitionKeys) - multiplier := int(math.Pow(float64(numQueryPKs), float64(lenPartitionKeys))) - if multiplier > 100 { - numQueryPKs = 1 - } - return genMultiplePartitionQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs) - case 2: - lenClusteringKeys := table.MaterializedViews[mvNum].ClusteringKeys.Len() - maxClusteringRels = utils.RandInt2(rnd, 0, lenClusteringKeys) - return genClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, maxClusteringRels) - case 3: - lenPartitionKeys := table.MaterializedViews[mvNum].PartitionKeys.Len() - numQueryPKs = utils.RandInt2(rnd, 1, lenPartitionKeys) - multiplier := int(math.Pow(float64(numQueryPKs), float64(lenPartitionKeys))) - if multiplier > 100 { - numQueryPKs = 1 +) (*typedef.Stmt, func()) { + var stmt *typedef.Stmt + + if shouldGenerateCheckStatementForMV(table, rnd) { + stmt = genCheckStmtMV(s, table, g, rnd, p) + } else { + stmt = genCheckTableStmt(s, table, g, rnd, p) + } + + return stmt, func() { + if stmt != nil && stmt.ValuesWithToken != nil { + for _, v := range stmt.ValuesWithToken { + g.ReleaseToken(v.Token) } - lenClusteringKeys := table.MaterializedViews[mvNum].ClusteringKeys.Len() - maxClusteringRels = utils.RandInt2(rnd, 0, lenClusteringKeys) - return genMultiplePartitionClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs, maxClusteringRels) } } +} + +// shouldGenerateCheckStatementForMV returns true when the table has materialized +// views and the random number is even, giving a 50% chance of checking +// materialized views.
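+//
+// A minimal sketch of the call site implied by the new GenCheckStmt contract,
+// where the second return value releases the generator tokens once the check
+// has run (the old validationJob released them inline). The names ctx, schema,
+// table, gen, rnd, prc and store are assumptions standing in for whatever the
+// caller has in scope:
+//
+//	stmt, cleanup := GenCheckStmt(schema, table, gen, rnd, &prc)
+//	defer cleanup() // safe even when stmt is nil
+//	if stmt == nil {
+//		return nil
+//	}
+//	if err := store.Check(ctx, table, stmt, true); err != nil {
+//		return err // the caller records this as a read/validation error
+//	}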
+func shouldGenerateCheckStatementForMV(table *typedef.Table, rnd *rand.Rand) bool { + return len(table.MaterializedViews) > 0 && rnd.Int()%2 == 0 +} - return nil +func genCheckStmtMV(s *typedef.Schema, table *typedef.Table, g generators.Interface, rnd *rand.Rand, p *typedef.PartitionRangeConfig) *typedef.Stmt { + mvNum := utils.RandInt2(rnd, 0, len(table.MaterializedViews)) + lenClusteringKeys := table.MaterializedViews[mvNum].ClusteringKeys.Len() + lenPartitionKeys := table.MaterializedViews[mvNum].PartitionKeys.Len() + + maxClusteringRels := utils.RandInt2(rnd, 0, lenClusteringKeys) + numQueryPKs := utils.RandInt2(rnd, 1, lenPartitionKeys) + if int(math.Pow(float64(numQueryPKs), float64(lenPartitionKeys))) > 100 { + numQueryPKs = 1 + } + + switch rnd.Intn(4) { + case 0: + return genSinglePartitionQueryMv(s, table, g, rnd, p, mvNum) + case 1: + return genMultiplePartitionQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs) + case 2: + return genClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, maxClusteringRels) + case 3: + return genMultiplePartitionClusteringRangeQueryMv(s, table, g, rnd, p, mvNum, numQueryPKs, maxClusteringRels) + default: + panic("random number generator does not work correctly, unreachable statement") + } } -func genSinglePartitionQuery( +func genCheckTableStmt( s *typedef.Schema, - t *typedef.Table, - g generators.GeneratorInterface, + table *typedef.Table, + g generators.Interface, + rnd *rand.Rand, + p *typedef.PartitionRangeConfig, ) *typedef.Stmt { + var n int + + if len(table.Indexes) > 0 { + n = rnd.Intn(5) + } else { + n = rnd.Intn(4) + } + + maxClusteringRels := utils.RandInt2(rnd, 0, table.ClusteringKeys.Len()) + numQueryPKs := utils.RandInt2(rnd, 1, table.PartitionKeys.Len()) + multiplier := int(math.Pow(float64(numQueryPKs), float64(table.PartitionKeys.Len()))) + if multiplier > 100 { + numQueryPKs = 1 + } + + switch n { + case 0: + return genSinglePartitionQuery(s, table, g) + case 1: + return genMultiplePartitionQuery(s, table, g, numQueryPKs) + case 2: + return genClusteringRangeQuery(s, table, g, rnd, p, maxClusteringRels) + case 3: + return genMultiplePartitionClusteringRangeQuery(s, table, g, rnd, p, numQueryPKs, maxClusteringRels) + case 4: + // Reducing the probability to hit these since they often take a long time to run + // One in five chance to hit this + if rnd.Intn(5) == 0 { + idxCount := utils.RandInt2(rnd, 1, len(table.Indexes)) + return genSingleIndexQuery(s, table, g, rnd, p, idxCount) + } + + return genSinglePartitionQuery(s, table, g) + default: + panic("random number generator does not work correctly, unreachable statement") + } +} + +func genSinglePartitionQuery(s *typedef.Schema, t *typedef.Table, g generators.Interface) *typedef.Stmt { t.RLock() defer t.RUnlock() valuesWithToken := g.GetOld() @@ -124,7 +135,8 @@ func genSinglePartitionQuery( } values := valuesWithToken.Value.Copy() builder := qb.Select(s.Keyspace.Name + "." 
+ t.Name) - typs := make([]typedef.Type, 0, 10) + typs := make([]typedef.Type, 0, len(t.PartitionKeys)) + for _, pk := range t.PartitionKeys { builder = builder.Where(qb.Eq(pk.Name)) typs = append(typs, pk.Type) @@ -144,7 +156,7 @@ func genSinglePartitionQuery( func genSinglePartitionQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum int, @@ -183,7 +195,7 @@ func genSinglePartitionQueryMv( func genMultiplePartitionQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, numQueryPKs int, ) *typedef.Stmt { t.RLock() @@ -197,7 +209,7 @@ func genMultiplePartitionQuery( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -223,7 +235,7 @@ func genMultiplePartitionQuery( func genMultiplePartitionQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, numQueryPKs int, @@ -241,7 +253,7 @@ func genMultiplePartitionQueryMv( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -274,7 +286,7 @@ func genMultiplePartitionQueryMv( func genClusteringRangeQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, maxClusteringRels int, @@ -321,7 +333,7 @@ func genClusteringRangeQuery( func genClusteringRangeQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, maxClusteringRels int, @@ -374,7 +386,7 @@ func genClusteringRangeQueryMv( func genMultiplePartitionClusteringRangeQuery( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, numQueryPKs, maxClusteringRels int, @@ -397,7 +409,7 @@ func genMultiplePartitionClusteringRangeQuery( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -435,7 +447,7 @@ func genMultiplePartitionClusteringRangeQuery( func genMultiplePartitionClusteringRangeQueryMv( s *typedef.Schema, t *typedef.Table, - g generators.GeneratorInterface, + g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, mvNum, numQueryPKs, maxClusteringRels int, @@ -478,7 +490,7 @@ func genMultiplePartitionClusteringRangeQueryMv( for j := 0; j < numQueryPKs; j++ { vs := g.GetOld() if vs == nil { - g.GiveOlds(tokens) + g.GiveOld(tokens...) return nil } tokens = append(tokens, vs) @@ -516,7 +528,7 @@ func genMultiplePartitionClusteringRangeQueryMv( func genSingleIndexQuery( s *typedef.Schema, t *typedef.Table, - _ generators.GeneratorInterface, + _ generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, idxCount int, diff --git a/pkg/jobs/gen_check_stmt_test.go b/pkg/generators/statements/gen_check_stmt_test.go similarity index 70% rename from pkg/jobs/gen_check_stmt_test.go rename to pkg/generators/statements/gen_check_stmt_test.go index 912c9cf2..cea4d3d0 100644 --- a/pkg/jobs/gen_check_stmt_test.go +++ b/pkg/generators/statements/gen_check_stmt_test.go @@ -13,9 +13,10 @@ // limitations under the License. 
//nolint:thelper -package jobs +package statements import ( + "github.com/scylladb/gemini/pkg/jobs" "path" "testing" @@ -26,63 +27,63 @@ import ( var checkDataPath = "./test_expected_data/check/" func TestGenSinglePartitionQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "single_partition.json"), genSinglePartitionQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "single_partition.json"), genSinglePartitionQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, _ := testutils.GetAllForTestStmt(subT, caseName) stmt := genSinglePartitionQuery(schema, schema.Tables[0], gen) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenSinglePartitionQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "single_partition_mv.json"), genSinglePartitionQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "single_partition_mv.json"), genSinglePartitionQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt := genSinglePartitionQueryMv(schema, schema.Tables[0], gen, rnd, &prc, len(schema.Tables[0].MaterializedViews)-1) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition.json"), genMultiplePartitionQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition.json"), genMultiplePartitionQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, _ := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) - stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition_mv.json"), genMultiplePartitionQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition_mv.json"), genMultiplePartitionQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { options := testutils.GetOptionsFromCaseName(caseName) schema, gen, _ := 
testutils.GetAllForTestStmt(subT, caseName) - stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + stmt := genMultiplePartitionQuery(schema, schema.Tables[0], gen, jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenClusteringRangeQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "clustering_range.json"), genClusteringRangeQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "clustering_range.json"), genClusteringRangeQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) prc := schema.Config.GetPartitionRangeConfig() - stmt := genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + stmt := genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenClusteringRangeQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "clustering_range_mv.json"), genClusteringRangeQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "clustering_range_mv.json"), genClusteringRangeQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) prc := schema.Config.GetPartitionRangeConfig() @@ -93,15 +94,15 @@ func TestGenClusteringRangeQueryMv(t *testing.T) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionClusteringRangeQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition_clustering_range.json"), genMultiplePartitionClusteringRangeQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition_clustering_range.json"), genMultiplePartitionClusteringRangeQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) prc := schema.Config.GetPartitionRangeConfig() @@ -111,16 +112,16 @@ func 
TestGenMultiplePartitionClusteringRangeQuery(t *testing.T) { gen, rnd, &prc, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenMultiplePartitionClusteringRangeQueryMv(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "multiple_partition_clustering_range_mv.json"), genMultiplePartitionClusteringRangeQueryMvCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "multiple_partition_clustering_range_mv.json"), genMultiplePartitionClusteringRangeQueryMvCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { options := testutils.GetOptionsFromCaseName(caseName) schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) prc := schema.Config.GetPartitionRangeConfig() @@ -131,21 +132,21 @@ func TestGenMultiplePartitionClusteringRangeQueryMv(t *testing.T) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenSingleIndexQuery(t *testing.T) { - RunStmtTest[results](t, path.Join(checkDataPath, "single_index.json"), genSingleIndexQueryCases, - func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(checkDataPath, "single_index.json"), genSingleIndexQueryCases, + func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(subT, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt := genSingleIndexQuery(schema, schema.Tables[0], gen, rnd, &prc, len(schema.Tables[0].Indexes)) - validateStmt(subT, stmt, nil) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, nil) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } @@ -190,7 +191,7 @@ func BenchmarkGenMultiplePartitionQuery(t *testing.B) { schema, gen, _ := testutils.GetAllForTestStmt(subT, caseName) subT.ResetTimer() for x := 0; x < subT.N; x++ { - _ = genMultiplePartitionQuery(schema, schema.Tables[0], gen, GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + _ = genMultiplePartitionQuery(schema, schema.Tables[0], gen, jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) } }) } @@ -214,7 +215,7 @@ func BenchmarkGenMultiplePartitionQueryMv(t *testing.B) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys))) + jobs.GetPkCountFromOptions(options, 
len(schema.Tables[0].PartitionKeys))) } }) } @@ -231,7 +232,7 @@ func BenchmarkGenClusteringRangeQuery(t *testing.B) { prc := schema.Config.GetPartitionRangeConfig() subT.ResetTimer() for x := 0; x < subT.N; x++ { - _ = genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + _ = genClusteringRangeQuery(schema, schema.Tables[0], gen, rnd, &prc, jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } @@ -255,7 +256,7 @@ func BenchmarkGenClusteringRangeQueryMv(t *testing.B) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } @@ -278,8 +279,8 @@ func BenchmarkGenMultiplePartitionClusteringRangeQuery(t *testing.B) { gen, rnd, &prc, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } @@ -303,8 +304,8 @@ func BenchmarkGenMultiplePartitionClusteringRangeQueryMv(t *testing.B) { rnd, &prc, len(schema.Tables[0].MaterializedViews)-1, - GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), - GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) + jobs.GetPkCountFromOptions(options, len(schema.Tables[0].PartitionKeys)), + jobs.GetCkCountFromOptions(options, len(schema.Tables[0].ClusteringKeys)-1)) } }) } diff --git a/pkg/jobs/gen_const_test.go b/pkg/generators/statements/gen_const_test.go similarity index 99% rename from pkg/jobs/gen_const_test.go rename to pkg/generators/statements/gen_const_test.go index 1cf5922c..f602f3ef 100644 --- a/pkg/jobs/gen_const_test.go +++ b/pkg/generators/statements/gen_const_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jobs +package statements var ( genInsertStmtCases = []string{ diff --git a/pkg/jobs/gen_ddl_stmt.go b/pkg/generators/statements/gen_ddl_stmt.go similarity index 99% rename from pkg/jobs/gen_ddl_stmt.go rename to pkg/generators/statements/gen_ddl_stmt.go index a8866fd4..358f9bce 100644 --- a/pkg/jobs/gen_ddl_stmt.go +++ b/pkg/generators/statements/gen_ddl_stmt.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jobs +package statements import ( "fmt" diff --git a/pkg/jobs/gen_ddl_stmt_test.go b/pkg/generators/statements/gen_ddl_stmt_test.go similarity index 83% rename from pkg/jobs/gen_ddl_stmt_test.go rename to pkg/generators/statements/gen_ddl_stmt_test.go index e5f81a73..7b3670de 100644 --- a/pkg/jobs/gen_ddl_stmt_test.go +++ b/pkg/generators/statements/gen_ddl_stmt_test.go @@ -13,10 +13,11 @@ // limitations under the License. 
//nolint:thelper -package jobs +package statements import ( "fmt" + "github.com/scylladb/gemini/pkg/jobs" "path" "strconv" "testing" @@ -30,24 +31,24 @@ import ( var ddlDataPath = "./test_expected_data/ddl/" func TestGenDropColumnStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(ddlDataPath, "drop_column.json"), genDropColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(ddlDataPath, "drop_column.json"), genDropColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, _, _ := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) columnToDelete := getColumnToDeleteFromOptions(options, schema.Tables[0].Columns) stmt, err := genDropColumnStmt(schema.Tables[0], schema.Keyspace.Name, columnToDelete) - validateStmt(subT, stmt, err) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, err) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenAddColumnStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(ddlDataPath, "add_column.json"), genAddColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(ddlDataPath, "add_column.json"), genAddColumnStmtCases, func(subT *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, _, _ := testutils.GetAllForTestStmt(subT, caseName) options := testutils.GetOptionsFromCaseName(caseName) columnToAdd := getColumnToAddFromOptions(options, len(schema.Tables[0].Columns)) stmt, err := genAddColumnStmt(schema.Tables[0], schema.Keyspace.Name, columnToAdd) - validateStmt(subT, stmt, err) - expected.CompareOrStore(subT, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(subT, stmt, err) + expected.CompareOrStore(subT, caseName, jobs.convertStmtsToResults(stmt)) }) } diff --git a/pkg/jobs/gen_mutate_stmt.go b/pkg/generators/statements/gen_mutate_stmt.go similarity index 97% rename from pkg/jobs/gen_mutate_stmt.go rename to pkg/generators/statements/gen_mutate_stmt.go index 6f65caab..f727da77 100644 --- a/pkg/jobs/gen_mutate_stmt.go +++ b/pkg/generators/statements/gen_mutate_stmt.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package jobs +package statements import ( "encoding/json" @@ -26,7 +26,7 @@ import ( "github.com/scylladb/gemini/pkg/typedef" ) -func GenMutateStmt(s *typedef.Schema, t *typedef.Table, g generators.GeneratorInterface, r *rand.Rand, p *typedef.PartitionRangeConfig, deletes bool) (*typedef.Stmt, error) { +func GenMutateStmt(s *typedef.Schema, t *typedef.Table, g generators.Interface, r *rand.Rand, p *typedef.PartitionRangeConfig, deletes bool) (*typedef.Stmt, error) { t.RLock() defer t.RUnlock() diff --git a/pkg/jobs/gen_mutate_stmt_test.go b/pkg/generators/statements/gen_mutate_stmt_test.go similarity index 74% rename from pkg/jobs/gen_mutate_stmt_test.go rename to pkg/generators/statements/gen_mutate_stmt_test.go index 006f5329..910174b9 100644 --- a/pkg/jobs/gen_mutate_stmt_test.go +++ b/pkg/generators/statements/gen_mutate_stmt_test.go @@ -13,9 +13,10 @@ // limitations under the License. 
//nolint:thelper -package jobs +package statements import ( + "github.com/scylladb/gemini/pkg/jobs" "path" "testing" @@ -26,43 +27,43 @@ import ( var mutateDataPath = "./test_expected_data/mutate/" func TestGenInsertStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "insert.json"), genInsertStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "insert.json"), genInsertStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() useLWT := testutils.GetOptionsFromCaseName(caseName).GetBool("lwt") stmt, err := genInsertStmt(schema, schema.Tables[0], gen.Get(), rnd, &prc, useLWT) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenInsertJSONStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "insert_j.json"), genInsertJSONStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "insert_j.json"), genInsertJSONStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt, err := genInsertJSONStmt(schema, schema.Tables[0], gen.Get(), rnd, &prc) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenUpdateStmt(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "update.json"), genUpdateStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "update.json"), genUpdateStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt, err := genUpdateStmt(schema, schema.Tables[0], gen.Get(), rnd, &prc) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } func TestGenDeleteRows(t *testing.T) { - RunStmtTest[results](t, path.Join(mutateDataPath, "delete.json"), genDeleteStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[results]) { + jobs.RunStmtTest[jobs.results](t, path.Join(mutateDataPath, "delete.json"), genDeleteStmtCases, func(t *testing.T, caseName string, expected *testutils.ExpectedStore[jobs.results]) { schema, gen, rnd := testutils.GetAllForTestStmt(t, caseName) prc := schema.Config.GetPartitionRangeConfig() stmt, err := genDeleteRows(schema, schema.Tables[0], gen.Get(), rnd, &prc) - validateStmt(t, stmt, err) - expected.CompareOrStore(t, caseName, convertStmtsToResults(stmt)) + jobs.validateStmt(t, stmt, err) + expected.CompareOrStore(t, caseName, jobs.convertStmtsToResults(stmt)) }) } diff --git a/pkg/jobs/test_expected_data/check/clustering_range.json 
b/pkg/generators/statements/test_expected_data/check/clustering_range.json similarity index 100% rename from pkg/jobs/test_expected_data/check/clustering_range.json rename to pkg/generators/statements/test_expected_data/check/clustering_range.json diff --git a/pkg/jobs/test_expected_data/check/clustering_range_mv.json b/pkg/generators/statements/test_expected_data/check/clustering_range_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/clustering_range_mv.json rename to pkg/generators/statements/test_expected_data/check/clustering_range_mv.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition.json b/pkg/generators/statements/test_expected_data/check/multiple_partition.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition_clustering_range.json b/pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition_clustering_range.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition_clustering_range_mv.json b/pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition_clustering_range_mv.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition_clustering_range_mv.json diff --git a/pkg/jobs/test_expected_data/check/multiple_partition_mv.json b/pkg/generators/statements/test_expected_data/check/multiple_partition_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/multiple_partition_mv.json rename to pkg/generators/statements/test_expected_data/check/multiple_partition_mv.json diff --git a/pkg/jobs/test_expected_data/check/single_index.json b/pkg/generators/statements/test_expected_data/check/single_index.json similarity index 100% rename from pkg/jobs/test_expected_data/check/single_index.json rename to pkg/generators/statements/test_expected_data/check/single_index.json diff --git a/pkg/jobs/test_expected_data/check/single_partition.json b/pkg/generators/statements/test_expected_data/check/single_partition.json similarity index 100% rename from pkg/jobs/test_expected_data/check/single_partition.json rename to pkg/generators/statements/test_expected_data/check/single_partition.json diff --git a/pkg/jobs/test_expected_data/check/single_partition_mv.json b/pkg/generators/statements/test_expected_data/check/single_partition_mv.json similarity index 100% rename from pkg/jobs/test_expected_data/check/single_partition_mv.json rename to pkg/generators/statements/test_expected_data/check/single_partition_mv.json diff --git a/pkg/jobs/test_expected_data/ddl/add_column.json b/pkg/generators/statements/test_expected_data/ddl/add_column.json similarity index 100% rename from pkg/jobs/test_expected_data/ddl/add_column.json rename to pkg/generators/statements/test_expected_data/ddl/add_column.json diff --git a/pkg/jobs/test_expected_data/ddl/drop_column.json b/pkg/generators/statements/test_expected_data/ddl/drop_column.json similarity index 100% rename from pkg/jobs/test_expected_data/ddl/drop_column.json rename to 
pkg/generators/statements/test_expected_data/ddl/drop_column.json diff --git a/pkg/jobs/test_expected_data/mutate/delete.json b/pkg/generators/statements/test_expected_data/mutate/delete.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/delete.json rename to pkg/generators/statements/test_expected_data/mutate/delete.json diff --git a/pkg/jobs/test_expected_data/mutate/insert.json b/pkg/generators/statements/test_expected_data/mutate/insert.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/insert.json rename to pkg/generators/statements/test_expected_data/mutate/insert.json diff --git a/pkg/jobs/test_expected_data/mutate/insert_j.json b/pkg/generators/statements/test_expected_data/mutate/insert_j.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/insert_j.json rename to pkg/generators/statements/test_expected_data/mutate/insert_j.json diff --git a/pkg/jobs/test_expected_data/mutate/update.json b/pkg/generators/statements/test_expected_data/mutate/update.json similarity index 100% rename from pkg/jobs/test_expected_data/mutate/update.json rename to pkg/generators/statements/test_expected_data/mutate/update.json diff --git a/pkg/jobs/ddl.go b/pkg/jobs/ddl.go new file mode 100644 index 00000000..9cd98d6f --- /dev/null +++ b/pkg/jobs/ddl.go @@ -0,0 +1,63 @@ +package jobs + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" + + "github.com/scylladb/gemini/pkg/generators/statements" + "github.com/scylladb/gemini/pkg/joberror" +) + +func (m *mutation) DDL(ctx context.Context) error { + m.table.RLock() + // Scylla does not allow changing the DDL of a table with materialized views. + if len(m.table.MaterializedViews) > 0 { + m.table.RUnlock() + return nil + } + m.table.RUnlock() + + m.table.Lock() + defer m.table.Unlock() + ddlStmts, err := statements.GenDDLStmt(m.schema, m.table, m.random, m.partitionRangeConfig, m.schemaCfg) + if err != nil { + m.logger.Error("Failed! 
DDL Mutation statement generation failed", zap.Error(err)) + m.globalStatus.WriteErrors.Add(1) + return err + } + if ddlStmts == nil { + if w := m.logger.Check(zap.DebugLevel, "no statement generated"); w != nil { + w.Write(zap.String("job", "ddl")) + } + return nil + } + for _, ddlStmt := range ddlStmts.List { + if w := m.logger.Check(zap.DebugLevel, "ddl statement"); w != nil { + w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL())) + } + if err = m.store.Mutate(ctx, ddlStmt); err != nil { + if errors.Is(err, context.Canceled) { + return nil + } + m.globalStatus.AddWriteError(&joberror.JobError{ + Timestamp: time.Now(), + StmtType: ddlStmts.QueryType.ToString(), + Message: "DDL failed: " + err.Error(), + Query: ddlStmt.PrettyCQL(), + }) + return err + } + m.globalStatus.WriteOps.Add(1) + } + ddlStmts.PostStmtHook() + + jsonSchema, _ := json.MarshalIndent(m.schema, "", " ") + fmt.Printf("New schema: %v\n", string(jsonSchema)) + + return nil +} diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index ac21c09f..2389464b 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -16,24 +16,21 @@ package jobs import ( "context" - "encoding/json" - "errors" - "fmt" "time" - "github.com/scylladb/gemini/pkg/generators" - "github.com/scylladb/gemini/pkg/store" - "github.com/scylladb/gemini/pkg/typedef" - - "github.com/scylladb/gemini/pkg/joberror" - "github.com/scylladb/gemini/pkg/status" - "github.com/scylladb/gemini/pkg/stop" - "go.uber.org/zap" "golang.org/x/exp/rand" "golang.org/x/sync/errgroup" + + "github.com/scylladb/gemini/pkg/generators" + "github.com/scylladb/gemini/pkg/status" + "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" ) +type Mode []string + const ( WriteMode = "write" ReadMode = "read" @@ -41,433 +38,113 @@ const ( WarmupMode = "warmup" ) -const ( - warmupName = "Warmup" - validateName = "Validation" - mutateName = "Mutation" -) - -var ( - warmup = job{name: warmupName, function: warmupJob} - validate = job{name: validateName, function: validationJob} - mutate = job{name: mutateName, function: mutationJob} -) - -type List struct { - name string - jobs []job - duration time.Duration - workers uint64 -} - -type job struct { - function func( - context.Context, - <-chan time.Duration, - *typedef.Schema, - typedef.SchemaConfig, - *typedef.Table, - store.Store, - *rand.Rand, - *typedef.PartitionRangeConfig, - *generators.Generator, - *status.GlobalStatus, - *zap.Logger, - *stop.Flag, - bool, - bool, - ) error - name string -} - -func ListFromMode(mode string, duration time.Duration, workers uint64) List { - jobs := make([]job, 0, 2) - name := "work cycle" - switch mode { +func ModeFromString(m string) Mode { + switch m { case WriteMode: - jobs = append(jobs, mutate) + return Mode{WriteMode} case ReadMode: - jobs = append(jobs, validate) + return Mode{ReadMode} + case MixedMode: + return Mode{WriteMode, ReadMode} case WarmupMode: - jobs = append(jobs, warmup) - name = "warmup cycle" + return Mode{WarmupMode} default: - jobs = append(jobs, mutate, validate) - } - return List{ - name: name, - jobs: jobs, - duration: duration, - workers: workers, + return Mode{} } } -func (l List) Run( - ctx context.Context, - schema *typedef.Schema, - schemaConfig typedef.SchemaConfig, - s store.Store, - pump <-chan time.Duration, - generators []*generators.Generator, - globalStatus *status.GlobalStatus, - logger *zap.Logger, - seed uint64, - stopFlag *stop.Flag, - failFast, verbose bool, -) error { - logger = logger.Named(l.name) - 
ctx = stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop) - g, gCtx := errgroup.WithContext(ctx) - time.AfterFunc(l.duration, func() { - logger.Info("jobs time is up, begins jobs completion") - stopFlag.SetSoft(true) - }) - - partitionRangeConfig := schemaConfig.GetPartitionRangeConfig() - logger.Info("start jobs") - for j := range schema.Tables { - gen := generators[j] - table := schema.Tables[j] - for i := 0; i < int(l.workers); i++ { - for idx := range l.jobs { - jobF := l.jobs[idx].function - r := rand.New(rand.NewSource(seed)) - g.Go(func() error { - return jobF(gCtx, pump, schema, schemaConfig, table, s, r, &partitionRangeConfig, gen, globalStatus, logger, stopFlag, failFast, verbose) - }) - } - } - } - return g.Wait() +type List struct { + name string + duration time.Duration + logger *zap.Logger + random *rand.Rand + stopFlag *stop.Flag + workers uint64 + jobs []Job + generators []*generators.Generator + schema *typedef.Schema + verbose bool + failFast bool } -// mutationJob continuously applies mutations against the database -// for as long as the pump is active. -func mutationJob( - ctx context.Context, - pump <-chan time.Duration, - schema *typedef.Schema, - schemaCfg typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, - logger *zap.Logger, - stopFlag *stop.Flag, - failFast, verbose bool, -) error { - schemaConfig := &schemaCfg - logger = logger.Named("mutation_job") - logger.Info("starting mutation loop") - defer func() { - logger.Info("ending mutation loop") - }() - for { - if stopFlag.IsHardOrSoft() { - return nil - } - select { - case <-stopFlag.SignalChannel(): - logger.Debug("mutation job terminated") - return nil - case hb := <-pump: - time.Sleep(hb) - } - ind := r.Intn(1000000) - if ind%100000 == 0 { - _ = ddl(ctx, schema, schemaConfig, table, s, r, p, globalStatus, logger, verbose) - } else { - _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, true, logger) - } - if failFast && globalStatus.HasErrors() { - stopFlag.SetSoft(true) - return nil - } - } +type Job interface { + Name() string + Do(context.Context, generators.Interface) error } -// validationJob continuously applies validations against the database -// for as long as the pump is active. -func validationJob( - ctx context.Context, - pump <-chan time.Duration, - schema *typedef.Schema, - schemaCfg typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, - logger *zap.Logger, +func New( + mode string, + duration time.Duration, + workers uint64, stopFlag *stop.Flag, - failFast, _ bool, -) error { - schemaConfig := &schemaCfg - logger = logger.Named("validation_job") - logger.Info("starting validation loop") - defer func() { - logger.Info("ending validation loop") - }() - - for { - if stopFlag.IsHardOrSoft() { - return nil - } - select { - case <-stopFlag.SignalChannel(): - return nil - case hb := <-pump: - time.Sleep(hb) - } - stmt := GenCheckStmt(schema, table, g, r, p) - if stmt == nil { - logger.Info("Validation. 
No statement generated from GenCheckStmt.") - continue - } - err := validation(ctx, schemaConfig, table, s, stmt, logger) - if stmt.ValuesWithToken != nil { - for _, token := range stmt.ValuesWithToken { - g.ReleaseToken(token.Token) - } - } - switch { - case err == nil: - globalStatus.ReadOps.Add(1) - case errors.Is(err, context.Canceled): - return nil - default: - globalStatus.AddReadError(&joberror.JobError{ - Timestamp: time.Now(), - StmtType: stmt.QueryType.ToString(), - Message: "Validation failed: " + err.Error(), - Query: stmt.PrettyCQL(), - }) - } - - if failFast && globalStatus.HasErrors() { - stopFlag.SetSoft(true) - return nil - } - } -} - -// warmupJob continuously applies mutations against the database -// for as long as the pump is active or the supplied duration expires. -func warmupJob( - ctx context.Context, - _ <-chan time.Duration, - schema *typedef.Schema, - schemaCfg typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, logger *zap.Logger, - stopFlag *stop.Flag, - failFast, _ bool, -) error { - schemaConfig := &schemaCfg - logger = logger.Named("warmup") - logger.Info("starting warmup loop") - defer func() { - logger.Info("ending warmup loop") - }() - for { - if stopFlag.IsHardOrSoft() { - logger.Debug("warmup job terminated") - return nil - } - // Do we care about errors during warmup? - _ = mutation(ctx, schema, schemaConfig, table, s, r, p, g, globalStatus, false, logger) - if failFast && globalStatus.HasErrors() { - stopFlag.SetSoft(true) - return nil - } - } -} - -func ddl( - ctx context.Context, schema *typedef.Schema, - sc *typedef.SchemaConfig, table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, + store store.Store, globalStatus *status.GlobalStatus, - logger *zap.Logger, + schemaCfg *typedef.SchemaConfig, + seed uint64, + gens []*generators.Generator, + pump <-chan time.Duration, + failFast bool, verbose bool, -) error { - if sc.CQLFeature != typedef.CQL_FEATURE_ALL { - logger.Debug("ddl statements disabled") - return nil - } - if len(table.MaterializedViews) > 0 { - // Scylla does not allow changing the DDL of a table with materialized views. - return nil - } - table.Lock() - defer table.Unlock() - ddlStmts, err := GenDDLStmt(schema, table, r, p, sc) - if err != nil { - logger.Error("Failed! 
DDL Mutation statement generation failed", zap.Error(err)) - globalStatus.WriteErrors.Add(1) - return err - } - if ddlStmts == nil { - if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { - w.Write(zap.String("job", "ddl")) - } - return nil - } - for _, ddlStmt := range ddlStmts.List { - if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil { - w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL())) - } - if err = s.Mutate(ctx, ddlStmt); err != nil { - if errors.Is(err, context.Canceled) { - return nil - } - globalStatus.AddWriteError(&joberror.JobError{ - Timestamp: time.Now(), - StmtType: ddlStmts.QueryType.ToString(), - Message: "DDL failed: " + err.Error(), - Query: ddlStmt.PrettyCQL(), - }) - return err - } - globalStatus.WriteOps.Add(1) - } - ddlStmts.PostStmtHook() - if verbose { - jsonSchema, _ := json.MarshalIndent(schema, "", " ") - fmt.Printf("New schema: %v\n", string(jsonSchema)) - } - return nil -} +) List { + partitionRangeConfig := schemaCfg.GetPartitionRangeConfig() + rnd := rand.New(rand.NewSource(seed)) -func mutation( - ctx context.Context, - schema *typedef.Schema, - _ *typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - r *rand.Rand, - p *typedef.PartitionRangeConfig, - g *generators.Generator, - globalStatus *status.GlobalStatus, - deletes bool, - logger *zap.Logger, -) error { - mutateStmt, err := GenMutateStmt(schema, table, g, r, p, deletes) - if err != nil { - logger.Error("Failed! Mutation statement generation failed", zap.Error(err)) - globalStatus.WriteErrors.Add(1) - return err - } - if mutateStmt == nil { - if w := logger.Check(zap.DebugLevel, "no statement generated"); w != nil { - w.Write(zap.String("job", "mutation")) + jobs := make([]Job, 0, 2) + name := "work cycle" + for _, m := range ModeFromString(mode) { + switch m { + case WriteMode: + jobs = append(jobs, NewMutation(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, schemaCfg, pump, failFast, verbose)) + case ReadMode: + jobs = append(jobs, NewValidation(logger, pump, schema, schemaCfg, table, store, rnd, &partitionRangeConfig, globalStatus, stopFlag, failFast)) + case WarmupMode: + jobs = append(jobs, NewWarmup(logger, schema, table, store, &partitionRangeConfig, globalStatus, stopFlag, failFast, verbose)) + name = "warmup cycle" } - return err } - if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil { - w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL())) - } - if err = s.Mutate(ctx, mutateStmt); err != nil { - if errors.Is(err, context.Canceled) { - return nil - } - globalStatus.AddWriteError(&joberror.JobError{ - Timestamp: time.Now(), - StmtType: mutateStmt.QueryType.ToString(), - Message: "Mutation failed: " + err.Error(), - Query: mutateStmt.PrettyCQL(), - }) - } else { - globalStatus.WriteOps.Add(1) - g.GiveOlds(mutateStmt.ValuesWithToken) + return List{ + name: name, + jobs: jobs, + duration: duration, + workers: workers, + stopFlag: stopFlag, + failFast: failFast, + verbose: verbose, + random: rnd, + generators: gens, + schema: schema, } - return nil } -func validation( - ctx context.Context, - sc *typedef.SchemaConfig, - table *typedef.Table, - s store.Store, - stmt *typedef.Stmt, - logger *zap.Logger, -) error { - if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil { - w.Write(zap.String("pretty_cql", stmt.PrettyCQL())) - } +func (l List) Name() string { + return l.name +} - maxAttempts := 1 - delay := 10 * time.Millisecond - if stmt.QueryType.PossibleAsyncOperation() { - 
+func (l List) Do(ctx context.Context) error {
+	ctx = l.stopFlag.CancelContextOnSignal(ctx, stop.SignalHardStop)
+	g, gCtx := errgroup.WithContext(ctx)
+	time.AfterFunc(l.duration, func() {
+		l.logger.Info("jobs time is up, beginning jobs completion")
+		l.stopFlag.SetSoft(true)
+	})

-	var lastErr, err error
-	attempt := 1
-	for {
-		lastErr = err
-		err = s.Check(ctx, table, stmt, attempt == maxAttempts)
+	l.logger.Info("start jobs")

-		if err == nil {
-			if attempt > 1 {
-				logger.Info(fmt.Sprintf("Validation successfully completed on %d attempt.", attempt))
+	for j := range l.schema.Tables {
+		gen := l.generators[j]
+		for i := 0; i < int(l.workers); i++ {
+			for idx := range l.jobs {
+				jobF := l.jobs[idx]
+				g.Go(func() error {
+					return jobF.Do(gCtx, gen)
+				})
 			}
-			return nil
-		}
-		if errors.Is(err, context.Canceled) {
-			// When context is canceled it means that test was commanded to stop
-			// to skip logging part it is returned here
-			return err
-		}
-		if attempt == maxAttempts {
-			break
-		}
-		if errors.Is(err, unWrapErr(lastErr)) {
-			logger.Info(fmt.Sprintf("Retring failed validation. %d attempt from %d attempts. Error same as at attempt before. ", attempt, maxAttempts))
-		} else {
-			logger.Info(fmt.Sprintf("Retring failed validation. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err))
 		}
-
-		select {
-		case <-time.After(delay):
-		case <-ctx.Done():
-			logger.Info(fmt.Sprintf("Retring failed validation stoped by done context. %d attempt from %d attempts. Error: %s", attempt, maxAttempts, err))
-			return nil
-		}
-		attempt++
-	}
-
-	if attempt > 1 {
-		logger.Info(fmt.Sprintf("Retring failed validation stoped by reach of max attempts %d. Error: %s", maxAttempts, err))
-	} else {
-		logger.Info(fmt.Sprintf("Validation failed. Error: %s", err))
-	}
 	}
-	return err
-}
-
-func unWrapErr(err error) error {
-	nextErr := err
-	for nextErr != nil {
-		err = nextErr
-		nextErr = errors.Unwrap(err)
-	}
-	return err
+	return g.Wait()
 }
diff --git a/pkg/jobs/mutation.go b/pkg/jobs/mutation.go
new file mode 100644
index 00000000..0ba2d6ab
--- /dev/null
+++ b/pkg/jobs/mutation.go
@@ -0,0 +1,162 @@
+package jobs
+
+import (
+	"context"
+	"time"
+
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+	"golang.org/x/exp/rand"
+
+	"github.com/scylladb/gemini/pkg/generators"
+	"github.com/scylladb/gemini/pkg/generators/statements"
+	"github.com/scylladb/gemini/pkg/joberror"
+	"github.com/scylladb/gemini/pkg/status"
+	"github.com/scylladb/gemini/pkg/stop"
+	"github.com/scylladb/gemini/pkg/store"
+	"github.com/scylladb/gemini/pkg/typedef"
+)
+
+type (
+	Mutation struct {
+		logger   *zap.Logger
+		mutation mutation
+		stopFlag *stop.Flag
+		pump     <-chan time.Duration
+		failFast bool
+		verbose  bool
+	}
+
+	mutation struct {
+		logger               *zap.Logger
+		schema               *typedef.Schema
+		table                *typedef.Table
+		store                store.Store
+		partitionRangeConfig *typedef.PartitionRangeConfig
+		schemaCfg            *typedef.SchemaConfig
+		globalStatus         *status.GlobalStatus
+		random               *rand.Rand
+		deletes              bool
+	}
+)
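+// NewMutation wires up the write job. The embedded mutation core is shared
+// with the warmup job, which runs it with deletes disabled.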
Error: %s", err)) } - return err -} - -func unWrapErr(err error) error { - nextErr := err - for nextErr != nil { - err = nextErr - nextErr = errors.Unwrap(err) - } - return err + return g.Wait() } diff --git a/pkg/jobs/mutation.go b/pkg/jobs/mutation.go new file mode 100644 index 00000000..0ba2d6ab --- /dev/null +++ b/pkg/jobs/mutation.go @@ -0,0 +1,162 @@ +package jobs + +import ( + "context" + "github.com/scylladb/gemini/pkg/generators" + "time" + + "github.com/pkg/errors" + "go.uber.org/zap" + "golang.org/x/exp/rand" + + "github.com/scylladb/gemini/pkg/generators/statements" + "github.com/scylladb/gemini/pkg/joberror" + "github.com/scylladb/gemini/pkg/status" + "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" +) + +type ( + Mutation struct { + logger *zap.Logger + mutation mutation + stopFlag *stop.Flag + pump <-chan time.Duration + failFast bool + verbose bool + } + + mutation struct { + logger *zap.Logger + schema *typedef.Schema + table *typedef.Table + store store.Store + partitionRangeConfig *typedef.PartitionRangeConfig + schemaCfg *typedef.SchemaConfig + globalStatus *status.GlobalStatus + random *rand.Rand + deletes bool + } +) + +func NewMutation( + logger *zap.Logger, + schema *typedef.Schema, + table *typedef.Table, + store store.Store, + partitionRangeConfig *typedef.PartitionRangeConfig, + globalStatus *status.GlobalStatus, + stopFlag *stop.Flag, + schemaCfg *typedef.SchemaConfig, + pump <-chan time.Duration, + failFast bool, + verbose bool, +) *Mutation { + return &Mutation{ + logger: logger.Named("mutation"), + mutation: mutation{ + logger: logger.Named("mutation-with-deletes"), + schema: schema, + table: table, + store: store, + partitionRangeConfig: partitionRangeConfig, + globalStatus: globalStatus, + deletes: true, + schemaCfg: schemaCfg, + }, + stopFlag: stopFlag, + pump: pump, + failFast: failFast, + verbose: verbose, + } +} + +func (m *Mutation) Name() string { + return "Mutation" +} + +func (m *Mutation) Do(ctx context.Context, generator generators.Interface) error { + m.logger.Info("starting mutation loop") + defer m.logger.Info("ending mutation loop") + + for { + if m.stopFlag.IsHardOrSoft() { + return nil + } + select { + case <-m.stopFlag.SignalChannel(): + m.logger.Debug("mutation job terminated") + return nil + case hb := <-m.pump: + time.Sleep(hb) + } + + var err error + + if m.mutation.ShouldDoDDL() { + err = m.mutation.DDL(ctx) + } else { + err = m.mutation.Statement(ctx, generator) + } + + if err != nil { + // TODO: handle error + } + + if m.failFast && m.mutation.HasErrors() { + m.stopFlag.SetSoft(true) + return nil + } + } +} + +func (m *mutation) Statement(ctx context.Context, generator generators.Interface) error { + mutateStmt, err := statements.GenMutateStmt(m.schema, m.table, generator, m.random, m.partitionRangeConfig, m.deletes) + if err != nil { + m.logger.Error("Failed! 
+func (m *mutation) Statement(ctx context.Context, generator generators.Interface) error {
+	mutateStmt, err := statements.GenMutateStmt(m.schema, m.table, generator, m.random, m.partitionRangeConfig, m.deletes)
+	if err != nil {
+		m.logger.Error("Mutation statement generation failed", zap.Error(err))
+		m.globalStatus.WriteErrors.Add(1)
+		return err
+	}
+	if mutateStmt == nil {
+		if w := m.logger.Check(zap.DebugLevel, "no statement generated"); w != nil {
+			w.Write(zap.String("job", "mutation"))
+		}
+		return nil
+	}
+
+	if w := m.logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
+		w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
+	}
+	if err = m.store.Mutate(ctx, mutateStmt); err != nil {
+		if errors.Is(err, context.Canceled) {
+			return nil
+		}
+		m.globalStatus.AddWriteError(&joberror.JobError{
+			Timestamp: time.Now(),
+			StmtType:  mutateStmt.QueryType.ToString(),
+			Message:   "Mutation failed: " + err.Error(),
+			Query:     mutateStmt.PrettyCQL(),
+		})
+
+		return err
+	}
+
+	m.globalStatus.WriteOps.Add(1)
+	generator.GiveOld(mutateStmt.ValuesWithToken...)
+
+	return nil
+}
+
+func (m *mutation) HasErrors() bool {
+	return m.globalStatus.HasErrors()
+}
+
+// ShouldDoDDL gates DDL behind full CQL feature support; the draw below fires
+// roughly once per hundred thousand mutations.
+func (m *mutation) ShouldDoDDL() bool {
+	if m.schemaCfg.CQLFeature != typedef.CQL_FEATURE_ALL {
+		return false
+	}
+
+	ind := m.random.Intn(1000000)
+	return ind%100000 == 0
+}
diff --git a/pkg/jobs/validation.go b/pkg/jobs/validation.go
new file mode 100644
index 00000000..3b31f8df
--- /dev/null
+++ b/pkg/jobs/validation.go
@@ -0,0 +1,180 @@
+package jobs
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/pkg/errors"
+	"go.uber.org/zap"
+	"golang.org/x/exp/rand"
+
+	"github.com/scylladb/gemini/pkg/generators"
+	"github.com/scylladb/gemini/pkg/generators/statements"
+	"github.com/scylladb/gemini/pkg/joberror"
+	"github.com/scylladb/gemini/pkg/status"
+	"github.com/scylladb/gemini/pkg/stop"
+	"github.com/scylladb/gemini/pkg/store"
+	"github.com/scylladb/gemini/pkg/typedef"
+	"github.com/scylladb/gemini/pkg/utils"
+)
+
+type Validation struct {
+	logger               *zap.Logger
+	pump                 <-chan time.Duration
+	schema               *typedef.Schema
+	schemaConfig         *typedef.SchemaConfig
+	table                *typedef.Table
+	store                store.Store
+	random               *rand.Rand
+	partitionRangeConfig *typedef.PartitionRangeConfig
+	globalStatus         *status.GlobalStatus
+	stopFlag             *stop.Flag
+	failFast             bool
+}
+
+func NewValidation(
+	logger *zap.Logger,
+	pump <-chan time.Duration,
+	schema *typedef.Schema,
+	schemaConfig *typedef.SchemaConfig,
+	table *typedef.Table,
+	store store.Store,
+	random *rand.Rand,
+	partitionRangeConfig *typedef.PartitionRangeConfig,
+	globalStatus *status.GlobalStatus,
+	stopFlag *stop.Flag,
+	failFast bool,
+) *Validation {
+	return &Validation{
+		logger:               logger.Named("validation"),
+		pump:                 pump,
+		schema:               schema,
+		schemaConfig:         schemaConfig,
+		table:                table,
+		store:                store,
+		random:               random,
+		partitionRangeConfig: partitionRangeConfig,
+		globalStatus:         globalStatus,
+		stopFlag:             stopFlag,
+		failFast:             failFast,
+	}
+}
+
+func (v *Validation) Name() string {
+	return "Validation"
+}
+
+func (v *Validation) validate(ctx context.Context, generator generators.Interface) error {
+	stmt, cleanup := statements.GenCheckStmt(v.schema, v.table, generator, v.random, v.partitionRangeConfig)
+	defer cleanup()
+
+	err := validation(ctx, v.schemaConfig, v.table, v.store, stmt, v.logger)
+
+	switch {
+	case err == nil:
+		v.globalStatus.ReadOps.Add(1)
+	case errors.Is(err, context.Canceled):
+		return context.Canceled
+	default:
+		v.globalStatus.AddReadError(&joberror.JobError{
+			Timestamp: time.Now(),
+			StmtType:  stmt.QueryType.ToString(),
+			Message:   "Validation failed: " + err.Error(),
+			Query:     stmt.PrettyCQL(),
+		})
+	}
+
+	return nil
+}
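+// Do runs the validation loop, pacing itself on the pump channel, until a
+// stop signal arrives or fail-fast trips on the first recorded error.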
+func (v *Validation) Do(ctx context.Context, generator generators.Interface) error {
+	v.logger.Info("starting validation loop")
+	defer v.logger.Info("ending validation loop")
+
+	for {
+		if v.stopFlag.IsHardOrSoft() {
+			return nil
+		}
+		select {
+		case <-v.stopFlag.SignalChannel():
+			return nil
+		case hb := <-v.pump:
+			time.Sleep(hb)
+		}
+
+		if err := v.validate(ctx, generator); errors.Is(err, context.Canceled) {
+			return nil
+		}
+
+		if v.failFast && v.globalStatus.HasErrors() {
+			v.stopFlag.SetSoft(true)
+			return nil
+		}
+	}
+}
+
+func validation(
+	ctx context.Context,
+	sc *typedef.SchemaConfig,
+	table *typedef.Table,
+	s store.Store,
+	stmt *typedef.Stmt,
+	logger *zap.Logger,
+) error {
+	if w := logger.Check(zap.DebugLevel, "validation statement"); w != nil {
+		w.Write(zap.String("pretty_cql", stmt.PrettyCQL()))
+	}
+
+	maxAttempts := 1
+	delay := 10 * time.Millisecond
+	if stmt.QueryType.PossibleAsyncOperation() {
+		maxAttempts = sc.AsyncObjectStabilizationAttempts
+		if maxAttempts < 1 {
+			maxAttempts = 1
+		}
+		delay = sc.AsyncObjectStabilizationDelay
+	}
+
+	var lastErr, err error
+	attempt := 1
+	for ; ; attempt++ {
+		lastErr = err
+		err = s.Check(ctx, table, stmt, attempt == maxAttempts)
+
+		if err == nil {
+			if attempt > 1 {
+				logger.Info(fmt.Sprintf("Validation successfully completed on attempt %d.", attempt))
+			}
+			return nil
+		}
+		if errors.Is(err, context.Canceled) {
+			// When the context is canceled the test was commanded to stop;
+			// return here to skip the logging below.
+			return err
+		}
+		if attempt == maxAttempts {
+			break
+		}
+		if errors.Is(err, utils.UnwrapErr(lastErr)) {
+			logger.Info(fmt.Sprintf("Retrying failed validation. Attempt %d of %d. Error is the same as on the previous attempt.", attempt, maxAttempts))
+		} else {
+			logger.Info(fmt.Sprintf("Retrying failed validation. Attempt %d of %d. Error: %s", attempt, maxAttempts, err))
+		}
+
+		select {
+		case <-time.After(delay):
+		case <-ctx.Done():
+			logger.Info(fmt.Sprintf("Retrying of failed validation stopped by done context. Attempt %d of %d. Error: %s", attempt, maxAttempts, err))
+			return nil
+		}
+	}
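+	// Retries exhausted: log the final outcome before surfacing the last error.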
Error: %s", err)) + } + + return err +} diff --git a/pkg/jobs/warmup.go b/pkg/jobs/warmup.go new file mode 100644 index 00000000..6ef14777 --- /dev/null +++ b/pkg/jobs/warmup.go @@ -0,0 +1,71 @@ +package jobs + +import ( + "context" + + "go.uber.org/zap" + + "github.com/scylladb/gemini/pkg/generators" + "github.com/scylladb/gemini/pkg/status" + "github.com/scylladb/gemini/pkg/stop" + "github.com/scylladb/gemini/pkg/store" + "github.com/scylladb/gemini/pkg/typedef" +) + +type Warmup struct { + mutation mutation + logger *zap.Logger + stopFlag *stop.Flag + failFast bool + verbose bool +} + +func NewWarmup( + logger *zap.Logger, + schema *typedef.Schema, + table *typedef.Table, + store store.Store, + partitionRangeConfig *typedef.PartitionRangeConfig, + globalStatus *status.GlobalStatus, + stopFlag *stop.Flag, + failFast bool, + verbose bool, +) *Warmup { + return &Warmup{ + logger: logger.Named("mutation"), + mutation: mutation{ + logger: logger.Named("mutation-without-deletes"), + schema: schema, + table: table, + store: store, + partitionRangeConfig: partitionRangeConfig, + globalStatus: globalStatus, + deletes: false, + }, + stopFlag: stopFlag, + failFast: failFast, + verbose: verbose, + } +} + +func (w *Warmup) Name() string { + return "Warmup" +} + +func (w *Warmup) Do(ctx context.Context, generator generators.Interface) error { + w.logger.Info("starting warmup loop") + defer w.logger.Info("ending warmup loop") + + for { + if w.stopFlag.IsHardOrSoft() { + w.logger.Debug("warmup job terminated") + return nil + } + + _ = w.mutation.Statement(ctx, generator) + if w.failFast && w.mutation.globalStatus.HasErrors() { + w.stopFlag.SetSoft(true) + return nil + } + } +} diff --git a/pkg/stop/flag.go b/pkg/stop/flag.go index 54c6e1f2..9ce08624 100644 --- a/pkg/stop/flag.go +++ b/pkg/stop/flag.go @@ -143,8 +143,8 @@ func (s *Flag) IsHardOrSoft() bool { func (s *Flag) AddHandler(handler func(signal uint32)) { s.stopHandlers.Append(handler) val := s.val.Load() - switch val { - case SignalSoftStop, SignalHardStop: + + if val == SignalSoftStop || val == SignalHardStop { handler(val) } } diff --git a/pkg/typedef/table.go b/pkg/typedef/table.go index 1af82947..2479d58b 100644 --- a/pkg/typedef/table.go +++ b/pkg/typedef/table.go @@ -105,14 +105,13 @@ func (t *Table) ValidColumnsForDelete() Columns { } } } - if len(t.MaterializedViews) != 0 { - for _, mv := range t.MaterializedViews { - if mv.HaveNonPrimaryKey() { - for j := range validCols { - if validCols[j].Name == mv.NonPrimaryKey.Name { - validCols = append(validCols[:j], validCols[j+1:]...) - break - } + + for _, mv := range t.MaterializedViews { + if mv.HaveNonPrimaryKey() { + for j := range validCols { + if validCols[j].Name == mv.NonPrimaryKey.Name { + validCols = append(validCols[:j], validCols[j+1:]...) + break } } } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 0b775d48..4f3ab17a 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -17,6 +17,7 @@ package utils import ( "encoding/hex" "fmt" + "github.com/pkg/errors" "strconv" "strings" "time" @@ -107,3 +108,12 @@ func UUIDFromTime(rnd *rand.Rand) string { } return gocql.UUIDFromTime(RandDate(rnd)).String() } + +func UnwrapErr(err error) error { + nextErr := err + for nextErr != nil { + err = nextErr + nextErr = errors.Unwrap(err) + } + return err +}