Skip to content

Commit

Permalink
update of cluster configuration to be ignored, if configuration is no…
Browse files Browse the repository at this point in the history
…t change (#30)
  • Loading branch information
ebirukov authored Jan 26, 2024
1 parent 557bbbc commit f621efb
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 7 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/gobwas/pool v0.2.1
github.com/google/go-cmp v0.5.9
github.com/mailru/mapstructure v0.0.0-20230117153631-a4140f9ccc45
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.4
Expand All @@ -20,7 +21,6 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
)
50 changes: 44 additions & 6 deletions pkg/activerecord/cluster.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package activerecord

import (
"bytes"
"context"
"fmt"
"hash"
"hash/crc32"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -130,25 +134,37 @@ func (s *Shard) NextReplica() ShardInstance {
return s.Replicas[rand.Int()%length]
}

// Instances копия всех инстансов шарды
// Instances копия списка конфигураций всех инстансов шарды. В начале списка следуют мастера, потом реплики
func (c *Shard) Instances() []ShardInstance {
instances := make([]ShardInstance, 0, len(c.Masters)+len(c.Replicas))
instances = append(instances, c.Masters...)
// сортировка в подсписках чтобы не зависет от порядка в котором инстансы добавлялись в конфигурацию
sort.Slice(instances, func(i, j int) bool {
return instances[i].ParamsID < instances[j].ParamsID
})

instances = append(instances, c.Replicas...)

replicas := instances[len(c.Masters):]
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].ParamsID < replicas[j].ParamsID
})

return instances
}

// Тип описывающий кластер. Сейчас кластер - это набор шардов.
type Cluster struct {
m sync.RWMutex
shards []Shard
hash hash.Hash
}

func NewCluster(shardCnt int) *Cluster {
return &Cluster{
m: sync.RWMutex{},
shards: make([]Shard, 0, shardCnt),
hash: crc32.NewIEEE(),
}
}

Expand Down Expand Up @@ -183,6 +199,13 @@ func (c *Cluster) Append(shard Shard) {
defer c.m.Unlock()

c.shards = append(c.shards, shard)

c.hash.Reset()
for i := 0; i < len(c.shards); i++ {
for _, instance := range c.shards[i].Instances() {
c.hash.Write([]byte(instance.ParamsID))
}
}
}

// ShardInstances копия всех инстансов из шарды shardNum
Expand Down Expand Up @@ -216,7 +239,19 @@ func (c *Cluster) SetShardInstances(shardNum int, instances []ShardInstance) {
}

c.shards[shardNum] = shard
}

// Equal сравнивает загруженные конфигурации кластеров на основе контрольной суммы всех инстансов кластера
func (c *Cluster) Equal(c2 *Cluster) bool {
if c == nil {
return false
}

if c2 == nil {
return false
}

return bytes.Equal(c.hash.Sum(nil), c2.hash.Sum(nil))
}

// Тип используемый для передачи набора значений по умолчанию для параметров
Expand All @@ -230,9 +265,7 @@ type MapGlobParam struct {
// непосредственно в декларации модели, а не в конфиге.
// Так же используется при тестировании.
func NewClusterInfo(opts ...clusterOption) *Cluster {
cl := &Cluster{
m: sync.RWMutex{},
}
cl := NewCluster(0)

for _, opt := range opts {
opt.apply(cl)
Expand Down Expand Up @@ -388,6 +421,7 @@ func NewConfigCacher() *DefaultConfigCacher {
// Получение конфигурации. Если есть в кеше и он еще валидный, то конфигурация берётся из кешаб
// если в кеше нет, то достаём из конфига и кешируем.
func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
curConf := cc.container[path]
if cc.lock.TryLock() {
if cc.updateTime.Sub(Config().GetLastUpdateTime()) < 0 {
// Очищаем кеш если поменялся конфиг
Expand All @@ -402,13 +436,13 @@ func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGl
cc.lock.RUnlock()

if !ex {
return cc.loadClusterInfo(ctx, path, globs, optionCreator)
return cc.loadClusterInfo(ctx, curConf, path, globs, optionCreator)
}

return conf, nil
}

func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, curConf *Cluster, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
cc.lock.Lock()
defer cc.lock.Unlock()

Expand All @@ -417,6 +451,10 @@ func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, path string,
return nil, fmt.Errorf("can't get config: %w", err)
}

if conf.Equal(curConf) {
conf = curConf
}

cc.container[path] = conf

return conf, nil
Expand Down
56 changes: 56 additions & 0 deletions pkg/activerecord/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,3 +184,59 @@ func TestGetClusterInfoFromCfg(t *testing.T) {
})
}
}

func TestShard_Instances(t *testing.T) {
tests := []struct {
name string
shard Shard
want []ShardInstance
}{
{
name: "unordered sequence",
shard: Shard{
Masters: []ShardInstance{
{
ParamsID: "Master2",
},
{
ParamsID: "Master1",
},
},
Replicas: []ShardInstance{
{
ParamsID: "Replica2",
},
{
ParamsID: "Replica3",
},
{
ParamsID: "Replica1",
},
},
},
want: []ShardInstance{
{
ParamsID: "Master1",
},
{
ParamsID: "Master2",
},
{
ParamsID: "Replica1",
},
{
ParamsID: "Replica2",
},
{
ParamsID: "Replica3",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.shard.Instances()
assert.Check(t, cmp.DeepEqual(tt.want, got))
})
}
}

0 comments on commit f621efb

Please sign in to comment.