From 204f1fbef56b6190dc0aae99b62fd8a497941331 Mon Sep 17 00:00:00 2001 From: ebirukov <15173395+ebirukov@users.noreply.github.com> Date: Fri, 12 Jan 2024 11:24:38 +0300 Subject: [PATCH] #28 refactoring to simplify --- pkg/activerecord/activerecord.go | 2 +- pkg/activerecord/cluster.go | 176 +++++++++++++------------------ pkg/activerecord/pinger.go | 75 ++++++++++++- pkg/activerecord/pinger_test.go | 2 +- pkg/octopus/box.go | 14 +-- 5 files changed, 152 insertions(+), 117 deletions(-) diff --git a/pkg/activerecord/activerecord.go b/pkg/activerecord/activerecord.go index fb54e1a..ac753ff 100644 --- a/pkg/activerecord/activerecord.go +++ b/pkg/activerecord/activerecord.go @@ -97,7 +97,7 @@ type ClusterCheckerInterface interface { type ConfigCacherInterface interface { Get(ctx context.Context, path string, glob MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) - Actualize(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error) + UpdateConfig(ctx context.Context, path string, clusterConfig *Cluster) error } type SerializerInterface interface { diff --git a/pkg/activerecord/cluster.go b/pkg/activerecord/cluster.go index 676c545..2c9da3a 100644 --- a/pkg/activerecord/cluster.go +++ b/pkg/activerecord/cluster.go @@ -3,13 +3,12 @@ package activerecord import ( "context" "fmt" + "math/rand" "strconv" "strings" "sync" "sync/atomic" "time" - - "golang.org/x/sync/errgroup" ) // Интерфейс которому должен соответствовать билдер опций подключения к конретному инстансу @@ -55,52 +54,85 @@ type ShardInstance struct { type Shard struct { Masters []ShardInstance Replicas []ShardInstance - Offlines []ShardInstance curMaster int32 curReplica int32 } -// Функция выбирающая следующий инстанс мастера в конкретном шарде +// Функция выбирающая следующий доступный инстанс мастера в конкретном шарде func (s *Shard) NextMaster() ShardInstance { length := len(s.Masters) switch length { case 0: panic("no master configured") case 1: - return s.Masters[0] + master := s.Masters[0] + if master.Offline { + panic("no available master") + } + + return master } - newVal := atomic.AddInt32(&s.curMaster, 1) - newValMod := newVal % int32(len(s.Masters)) + // Из-за гонок при большом кол-ве недоступных инстансов может потребоватся много попыток найти доступный узел + attempt := 10 * length + + for i := 0; i < attempt; i++ { + newVal := atomic.AddInt32(&s.curMaster, 1) + newValMod := newVal % int32(len(s.Masters)) - if newValMod != newVal { - atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod) + if newValMod != newVal { + atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod) + } + + master := s.Masters[newValMod] + if master.Offline { + continue + } + + return master } - return s.Masters[newValMod] + //nolint:gosec + // Есть небольшая вероятность при большой нагрузке и большом проценте недоступных инстансов можно залипнуть на доступном узле + // Чтобы не паниковать выбираем рандомный узел + return s.Masters[rand.Int()%length] } -// Инстанс выбирающий конкретный инстанс реплики в конкретном шарде +// Инстанс выбирающий следующий доступный инстанс реплики в конкретном шарде func (s *Shard) NextReplica() ShardInstance { length := len(s.Replicas) - if length == 1 { + if length == 1 && !s.Replicas[0].Offline { return s.Replicas[0] } - newVal := atomic.AddInt32(&s.curReplica, 1) - newValMod := newVal % int32(len(s.Replicas)) + // Из-за гонок при большом кол-ве недоступных инстансов может потребоватся много попыток найти доступный узел + attempt := 10 * length + + for i := 0; i < attempt; i++ { + newVal := atomic.AddInt32(&s.curReplica, 1) + newValMod := newVal % int32(len(s.Replicas)) + + if newValMod != newVal { + atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod) + } + + replica := s.Replicas[newValMod] + if replica.Offline { + continue + } - if newValMod != newVal { - atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod) + return replica } - return s.Replicas[newValMod] + //nolint:gosec + // Есть небольшая вероятность при большой нагрузке и большом проценте недоступных инстансов поиск может залипнуть на недоступном узле + // Чтобы не паниковать выбираем рандомный узел + return s.Replicas[rand.Int()%length] } // Instances копия всех инстансов шарды func (c *Shard) Instances() []ShardInstance { - instances := make([]ShardInstance, 0, len(c.Offlines)+len(c.Masters)+len(c.Replicas)) - instances = append(instances, c.Offlines...) + instances := make([]ShardInstance, 0, len(c.Masters)+len(c.Replicas)) instances = append(instances, c.Masters...) instances = append(instances, c.Replicas...) @@ -109,11 +141,6 @@ func (c *Shard) Instances() []ShardInstance { // Append добавляет инстанс в шарду func (c *Shard) Append(instance ShardInstance) { - if instance.Offline { - c.Offlines = append(c.Offlines, instance) - return - } - switch instance.Config.Mode { case ModeMaster: c.Masters = append(c.Masters, instance) @@ -124,7 +151,7 @@ func (c *Shard) Append(instance ShardInstance) { // Equal проверяет что инстансы в instances эквивалентны инстансам в шарде func (c *Shard) Equal(instances []ShardInstance) bool { - if len(c.Masters)+len(c.Replicas)+len(c.Offlines) != len(instances) { + if len(c.Masters)+len(c.Replicas) != len(instances) { return false } @@ -133,20 +160,17 @@ func (c *Shard) Equal(instances []ShardInstance) bool { m[instance.ParamsID] = instance } - for _, online := range append(c.Masters, c.Replicas...) { - instance, ok := m[online.ParamsID] - if !ok || instance.Offline { + for _, cur := range append(c.Masters, c.Replicas...) { + instance, ok := m[cur.ParamsID] + if !ok { return false } - if instance.Config.Mode != online.Config.Mode { + if instance.Offline != cur.Offline { return false } - } - for _, replica := range c.Offlines { - instance, ok := m[replica.ParamsID] - if !ok || !instance.Offline { + if instance.Config.Mode != cur.Config.Mode { return false } } @@ -198,19 +222,20 @@ func (c *Cluster) NextMaster(shardNum int) ShardInstance { } // NextMaster выбирает следующий доступный инстанс реплики в шарде shardNum -func (c *Cluster) NextReplica(shardNum int) ShardInstance { +func (c *Cluster) NextReplica(shardNum int) (ShardInstance, bool) { c.m.RLock() defer c.m.RUnlock() - return c.shards[shardNum].NextReplica() -} + for _, replica := range c.shards[shardNum].Replicas { + if replica.Offline { + continue + } -// HasReplicas наличие доступных инстансов реплик в шарде shardNum -func (c *Cluster) HasReplicas(shard int) bool { - c.m.RLock() - defer c.m.RUnlock() + return c.shards[shardNum].NextReplica(), true + } + + return ShardInstance{}, false - return len(c.shards[shard].Replicas) > 0 } // Append добавляет новый шард в кластер @@ -246,13 +271,7 @@ func (c *Cluster) setShardInstances(shardNum int, instances []ShardInstance) { shard := c.shards[shardNum] shard.Masters = shard.Masters[:0] shard.Replicas = shard.Replicas[:0] - shard.Offlines = shard.Offlines[:0] for _, shardInstance := range instances { - if shardInstance.Offline { - shard.Offlines = append(shard.Offlines, shardInstance) - continue - } - switch shardInstance.Config.Mode { case ModeMaster: shard.Masters = append(shard.Masters, shardInstance) @@ -478,65 +497,12 @@ func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, path string, return conf, nil } -// Актуализирует конфигурацию кластера path, синхронизируя состояние конфигурации узлов кластера с серверной стороной (тип узла и его доступность) -// Проверка типа и доступности узлов выполняется с помощью функции instanceChecker -func (cc *DefaultConfigCacher) Actualize(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error) { - clusterConfig, err := cc.Get(ctx, path, params.globs, params.optionCreator) - if err != nil { - return nil, fmt.Errorf("can't load cluster info: %w", err) - } - - for i := 0; i < clusterConfig.Shards(); i++ { - var actualShard Shard - - eg := &errgroup.Group{} - - instancesCh := make(chan ShardInstance) - - curInstances := clusterConfig.ShardInstances(i) - - for _, si := range curInstances { - si := si - eg.Go(func() error { - opts, connErr := params.optionChecker(ctx, si) - - if opts != nil { - si.Config.Mode = opts.InstanceMode() - } - - instancesCh <- ShardInstance{ - ParamsID: si.ParamsID, - Config: si.Config, - Options: si.Options, - Offline: connErr != nil, - } - - return nil - }) - } - - egAcc := &errgroup.Group{} - - egAcc.Go(func() error { - for instance := range instancesCh { - actualShard.Append(instance) - } - - return nil - }) - - _ = eg.Wait() - close(instancesCh) - _ = egAcc.Wait() - - if !actualShard.Equal(curInstances) { - clusterConfig.setShardInstances(i, actualShard.Instances()) - } - } - +// UpdateClusterConfig Актуализирует конфигурацию кластера path +func (cc *DefaultConfigCacher) UpdateConfig(ctx context.Context, path string, clusterConfig *Cluster) error { cc.lock.Lock() + defer cc.lock.Unlock() + cc.container[path] = clusterConfig - cc.lock.Unlock() - return clusterConfig, nil + return nil } diff --git a/pkg/activerecord/pinger.go b/pkg/activerecord/pinger.go index ecdc6d7..731caa4 100644 --- a/pkg/activerecord/pinger.go +++ b/pkg/activerecord/pinger.go @@ -27,9 +27,9 @@ func WithPingInterval(interval time.Duration) OptionPinger { }) } -func WithStart() OptionPinger { +func WithStart(ctx context.Context) OptionPinger { return OptionPingerFunc(func(p *Pinger) { - p.StartWatch(p.ctx) + p.StartWatch(ctx) }) } @@ -103,7 +103,7 @@ func (p *Pinger) StartWatch(ctx context.Context) { p.logger.Info(p.ctx, "starting ping") for cfgPath, params := range p.clusterParams { - clusterConf, e := p.clusterConfig().Actualize(p.ctx, cfgPath, params) + clusterConf, e := p.actualize(p.ctx, cfgPath, params) if e != nil { p.logger.Error(p.ctx, fmt.Errorf("can't actualize '%s' configuration: %w", cfgPath, e)) } @@ -121,6 +121,69 @@ func (p *Pinger) StartWatch(ctx context.Context) { p.started = true } +// Актуализирует конфигурацию кластера path, синхронизируя состояние конфигурации узлов кластера с серверной стороной (тип узла и его доступность) +// Проверка типа и доступности узлов выполняется с помощью функции instanceChecker +func (p *Pinger) actualize(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error) { + clusterConfig, err := p.clusterConfig().Get(ctx, path, params.globs, params.optionCreator) + if err != nil { + return nil, fmt.Errorf("can't load cluster info: %w", err) + } + + for i := 0; i < clusterConfig.Shards(); i++ { + var actualShard Shard + + eg := &errgroup.Group{} + + instancesCh := make(chan ShardInstance) + + curInstances := clusterConfig.ShardInstances(i) + + for _, si := range curInstances { + si := si + eg.Go(func() error { + opts, connErr := params.optionChecker(ctx, si) + + if opts != nil { + si.Config.Mode = opts.InstanceMode() + } + + instancesCh <- ShardInstance{ + ParamsID: si.ParamsID, + Config: si.Config, + Options: si.Options, + Offline: connErr != nil, + } + + return nil + }) + } + + egAcc := &errgroup.Group{} + + egAcc.Go(func() error { + for instance := range instancesCh { + actualShard.Append(instance) + } + + return nil + }) + + _ = eg.Wait() + close(instancesCh) + _ = egAcc.Wait() + + if !actualShard.Equal(curInstances) { + clusterConfig.setShardInstances(i, actualShard.Instances()) + } + } + + if err := p.clusterConfig().UpdateConfig(ctx, path, clusterConfig); err != nil { + return nil, err + } + + return clusterConfig, nil +} + func (p *Pinger) StopWatch() error { if !p.isStarted() { return nil @@ -155,11 +218,15 @@ func (p *Pinger) AddClusterChecker(ctx context.Context, path string, params Clus // если пингер для конфигурации кластер не зарегистрировался ранее (конфигурация загружена впервые) // актуализируем конфигурацию кластера - clusterConf, err := p.clusterConfig().Actualize(ctx, path, params) + clusterConf, err := p.actualize(ctx, path, params) if err != nil { return nil, err } + if err := p.clusterConfig().UpdateConfig(ctx, path, clusterConf); err != nil { + return nil, err + } + p.collectInfo(ctx, path, clusterConf) p.StartWatch(ctx) diff --git a/pkg/activerecord/pinger_test.go b/pkg/activerecord/pinger_test.go index 3446f21..e709ffd 100644 --- a/pkg/activerecord/pinger_test.go +++ b/pkg/activerecord/pinger_test.go @@ -23,7 +23,7 @@ func TestNewPinger(t *testing.T) { }, { name: "started without pinger funcs", - opts: []OptionPinger{WithPingInterval(time.Microsecond), WithStart()}, + opts: []OptionPinger{WithPingInterval(time.Microsecond), WithStart(context.Background())}, want: true, }, { diff --git a/pkg/octopus/box.go b/pkg/octopus/box.go index d4904c3..686f2bc 100644 --- a/pkg/octopus/box.go +++ b/pkg/octopus/box.go @@ -50,18 +50,20 @@ func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, clusterInfo.Shards()) } - var configBox activerecord.ShardInstance + var ( + configBox activerecord.ShardInstance + ok bool + ) switch instType { case activerecord.ReplicaInstanceType: - if !clusterInfo.HasReplicas(shard) { + configBox, ok = clusterInfo.NextReplica(shard) + if !ok { return nil, fmt.Errorf("replicas not set") } - - configBox = clusterInfo.NextReplica(shard) case activerecord.ReplicaOrMasterInstanceType: - if clusterInfo.HasReplicas(shard) { - configBox = clusterInfo.NextReplica(shard) + configBox, ok = clusterInfo.NextReplica(shard) + if ok { break }