Skip to content

Commit

Permalink
Merge pull request #221 from scylladb/issue_220
Browse files Browse the repository at this point in the history
schema: optionally apply different replication strategy
  • Loading branch information
Henrik Johansson authored Feb 18, 2020
2 parents 2ff1a4c + 0d69f09 commit 07e8843
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 8 deletions.
8 changes: 8 additions & 0 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var (
warmup time.Duration
compactionStrategy string
replicationStrategy string
oracleReplicationStrategy string
consistency string
maxTables int
maxPartitionKeys int
Expand Down Expand Up @@ -214,6 +215,12 @@ func run(cmd *cobra.Command, args []string) error {
}
}
}

testKeyspace, oracleKeyspace := schema.GetCreateKeyspaces()
if err := store.Create(context.Background(), createBuilder{stmt: testKeyspace}, createBuilder{stmt: oracleKeyspace}); err != nil {
return errors.Wrap(err, "unable to create keyspace")
}

for _, stmt := range schema.GetCreateSchema() {
logger.Debug(stmt)
if err := store.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
Expand Down Expand Up @@ -452,6 +459,7 @@ func init() {
rootCmd.Flags().DurationVarP(&warmup, "warmup", "", 30*time.Second, "Specify the warmup perid as a duration for example 30s or 10h")
rootCmd.Flags().StringVarP(&compactionStrategy, "compaction-strategy", "", "", "Specify the desired CS as either the coded short hand stcs|twcs|lcs to get the default for each type or provide the entire specification in the form {'class':'....'}")
rootCmd.Flags().StringVarP(&replicationStrategy, "replication-strategy", "", "simple", "Specify the desired replication strategy as either the coded short hand simple|network to get the default for each type or provide the entire specification in the form {'class':'....'}")
rootCmd.Flags().StringVarP(&oracleReplicationStrategy, "oracle-replication-strategy", "", "simple", "Specify the desired replication strategy of the oracle cluster as either the coded short hand simple|network to get the default for each type or provide the entire specification in the form {'class':'....'}")
rootCmd.Flags().StringVarP(&consistency, "consistency", "", "QUORUM", "Specify the desired consistency as ANY|ONE|TWO|THREE|QUORUM|LOCAL_QUORUM|EACH_QUORUM|LOCAL_ONE")
rootCmd.Flags().IntVarP(&maxTables, "max-tables", "", 1, "Maximum number of generated tables")
rootCmd.Flags().IntVarP(&maxPartitionKeys, "max-partition-keys", "", 6, "Maximum number of generated partition keys")
Expand Down
6 changes: 5 additions & 1 deletion cmd/gemini/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func createSchemaConfig(logger *zap.Logger) gemini.SchemaConfig {
return gemini.SchemaConfig{
CompactionStrategy: defaultConfig.CompactionStrategy,
ReplicationStrategy: defaultConfig.ReplicationStrategy,
OracleReplicationStrategy: defaultConfig.OracleReplicationStrategy,
MaxTables: defaultConfig.MaxTables,
MaxPartitionKeys: defaultConfig.MaxPartitionKeys,
MinPartitionKeys: defaultConfig.MinPartitionKeys,
Expand Down Expand Up @@ -59,9 +60,12 @@ func createDefaultSchemaConfig(logger *zap.Logger) gemini.SchemaConfig {
MaxTupleParts = 20
MaxUDTParts = 20
)
rs := getReplicationStrategy(replicationStrategy, replication.NewSimpleStrategy(), logger)
ors := getReplicationStrategy(oracleReplicationStrategy, rs, logger)
return gemini.SchemaConfig{
CompactionStrategy: getCompactionStrategy(compactionStrategy, logger),
ReplicationStrategy: getReplicationStrategy(replicationStrategy, replication.NewSimpleStrategy(), logger),
ReplicationStrategy: rs,
OracleReplicationStrategy: ors,
MaxTables: maxTables,
MaxPartitionKeys: maxPartitionKeys,
MinPartitionKeys: minPartitionKeys,
Expand Down
21 changes: 14 additions & 7 deletions schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Value []interface{}
type SchemaConfig struct {
CompactionStrategy *CompactionStrategy
ReplicationStrategy *replication.Replication
OracleReplicationStrategy *replication.Replication
MaxTables int
MaxPartitionKeys int
MinPartitionKeys int
Expand Down Expand Up @@ -110,8 +111,9 @@ func (sc *SchemaConfig) GetMinColumns() int {
}

type Keyspace struct {
Name string `json:"name"`
Replication *replication.Replication `json:"replication"`
Name string `json:"name"`
Replication *replication.Replication `json:"replication"`
OracleReplication *replication.Replication `json:"oracle_replication"`
}

type ColumnDef struct {
Expand Down Expand Up @@ -409,9 +411,11 @@ func (s *Schema) GetDropSchema() []string {
func GenSchema(sc SchemaConfig) *Schema {
builder := NewSchemaBuilder()
keyspace := Keyspace{
Name: "ks1",
Replication: sc.ReplicationStrategy,
Name: "ks1",
Replication: sc.ReplicationStrategy,
OracleReplication: sc.OracleReplicationStrategy,
}
fmt.Println(keyspace)
builder.Keyspace(keyspace)
numTables := 1 + rand.Intn(sc.GetMaxTables())
for i := 0; i < numTables; i++ {
Expand Down Expand Up @@ -554,10 +558,13 @@ func randomCompactionStrategy() *CompactionStrategy {
}
}

func (s *Schema) GetCreateSchema() []string {
createKeyspace := fmt.Sprintf("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s", s.Keyspace.Name, s.Keyspace.Replication.ToCQL())
func (s *Schema) GetCreateKeyspaces() (string, string) {
return fmt.Sprintf("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s", s.Keyspace.Name, s.Keyspace.Replication.ToCQL()),
fmt.Sprintf("CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s", s.Keyspace.Name, s.Keyspace.OracleReplication.ToCQL())
}

stmts := []string{createKeyspace}
func (s *Schema) GetCreateSchema() []string {
var stmts []string

for _, t := range s.Tables {
createTypes := t.GetCreateTypes(s.Keyspace)
Expand Down
12 changes: 12 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type storeLoader interface {
}

type Store interface {
Create(context.Context, qb.Builder, qb.Builder) error
Mutate(context.Context, qb.Builder, ...interface{}) error
Check(context.Context, *gemini.Table, qb.Builder, ...interface{}) error
Close() error
Expand Down Expand Up @@ -136,6 +137,17 @@ type delegatingStore struct {
logger *zap.Logger
}

func (ds delegatingStore) Create(ctx context.Context, testBuilder qb.Builder, oracleBuilder qb.Builder) error {
ts := time.Now()
if err := mutate(ctx, ds.oracleStore, ts, oracleBuilder, []interface{}{}); err != nil {
return errors.Wrap(err, "oracle failed store creation")
}
if err := mutate(ctx, ds.testStore, ts, testBuilder, []interface{}{}); err != nil {
return errors.Wrap(err, "test failed store creation")
}
return nil
}

func (ds delegatingStore) Mutate(ctx context.Context, builder qb.Builder, values ...interface{}) error {
ts := time.Now()
if err := mutate(ctx, ds.oracleStore, ts, builder, values...); err != nil {
Expand Down

0 comments on commit 07e8843

Please sign in to comment.