diff --git a/go.mod b/go.mod index 14188f3..d0db24b 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/gobwas/pool v0.2.1 + github.com/google/go-cmp v0.5.9 github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.4 @@ -20,7 +21,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/google/go-cmp v0.5.9 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect ) diff --git a/pkg/activerecord/cluster.go b/pkg/activerecord/cluster.go index 0671b4a..731dae2 100644 --- a/pkg/activerecord/cluster.go +++ b/pkg/activerecord/cluster.go @@ -1,9 +1,13 @@ package activerecord import ( + "bytes" "context" "fmt" + "hash" + "hash/crc32" "math/rand" + "sort" "strconv" "strings" "sync" @@ -130,12 +134,22 @@ func (s *Shard) NextReplica() ShardInstance { return s.Replicas[rand.Int()%length] } -// Instances копия всех инстансов шарды +// Instances копия списка конфигураций всех инстансов шарды. В начале списка следуют мастера, потом реплики func (c *Shard) Instances() []ShardInstance { instances := make([]ShardInstance, 0, len(c.Masters)+len(c.Replicas)) instances = append(instances, c.Masters...) + // сортировка в подсписках чтобы не зависет от порядка в котором инстансы добавлялись в конфигурацию + sort.Slice(instances, func(i, j int) bool { + return instances[i].ParamsID < instances[j].ParamsID + }) + instances = append(instances, c.Replicas...) + replicas := instances[len(c.Masters):] + sort.Slice(replicas, func(i, j int) bool { + return replicas[i].ParamsID < replicas[j].ParamsID + }) + return instances } @@ -143,12 +157,14 @@ func (c *Shard) Instances() []ShardInstance { type Cluster struct { m sync.RWMutex shards []Shard + hash hash.Hash } func NewCluster(shardCnt int) *Cluster { return &Cluster{ m: sync.RWMutex{}, shards: make([]Shard, 0, shardCnt), + hash: crc32.NewIEEE(), } } @@ -183,6 +199,13 @@ func (c *Cluster) Append(shard Shard) { defer c.m.Unlock() c.shards = append(c.shards, shard) + + c.hash.Reset() + for i := 0; i < len(c.shards); i++ { + for _, instance := range c.shards[i].Instances() { + c.hash.Write([]byte(instance.ParamsID)) + } + } } // ShardInstances копия всех инстансов из шарды shardNum @@ -216,7 +239,19 @@ func (c *Cluster) SetShardInstances(shardNum int, instances []ShardInstance) { } c.shards[shardNum] = shard +} +// Equal сравнивает загруженные конфигурации кластеров на основе контрольной суммы всех инстансов кластера +func (c *Cluster) Equal(c2 *Cluster) bool { + if c == nil { + return false + } + + if c2 == nil { + return false + } + + return bytes.Equal(c.hash.Sum(nil), c2.hash.Sum(nil)) } // Тип используемый для передачи набора значений по умолчанию для параметров @@ -230,9 +265,7 @@ type MapGlobParam struct { // непосредственно в декларации модели, а не в конфиге. // Так же используется при тестировании. func NewClusterInfo(opts ...clusterOption) *Cluster { - cl := &Cluster{ - m: sync.RWMutex{}, - } + cl := NewCluster(0) for _, opt := range opts { opt.apply(cl) @@ -388,6 +421,7 @@ func NewConfigCacher() *DefaultConfigCacher { // Получение конфигурации. Если есть в кеше и он еще валидный, то конфигурация берётся из кешаб // если в кеше нет, то достаём из конфига и кешируем. func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) { + curConf := cc.container[path] if cc.lock.TryLock() { if cc.updateTime.Sub(Config().GetLastUpdateTime()) < 0 { // Очищаем кеш если поменялся конфиг @@ -402,13 +436,13 @@ func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGl cc.lock.RUnlock() if !ex { - return cc.loadClusterInfo(ctx, path, globs, optionCreator) + return cc.loadClusterInfo(ctx, curConf, path, globs, optionCreator) } return conf, nil } -func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) { +func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, curConf *Cluster, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) { cc.lock.Lock() defer cc.lock.Unlock() @@ -417,6 +451,10 @@ func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, path string, return nil, fmt.Errorf("can't get config: %w", err) } + if conf.Equal(curConf) { + conf = curConf + } + cc.container[path] = conf return conf, nil diff --git a/pkg/activerecord/cluster_test.go b/pkg/activerecord/cluster_test.go index 67e8082..3106c05 100644 --- a/pkg/activerecord/cluster_test.go +++ b/pkg/activerecord/cluster_test.go @@ -184,3 +184,59 @@ func TestGetClusterInfoFromCfg(t *testing.T) { }) } } + +func TestShard_Instances(t *testing.T) { + tests := []struct { + name string + shard Shard + want []ShardInstance + }{ + { + name: "unordered sequence", + shard: Shard{ + Masters: []ShardInstance{ + { + ParamsID: "Master2", + }, + { + ParamsID: "Master1", + }, + }, + Replicas: []ShardInstance{ + { + ParamsID: "Replica2", + }, + { + ParamsID: "Replica3", + }, + { + ParamsID: "Replica1", + }, + }, + }, + want: []ShardInstance{ + { + ParamsID: "Master1", + }, + { + ParamsID: "Master2", + }, + { + ParamsID: "Replica1", + }, + { + ParamsID: "Replica2", + }, + { + ParamsID: "Replica3", + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.shard.Instances() + assert.Check(t, cmp.DeepEqual(tt.want, got)) + }) + } +}