Skip to content

Commit

Permalink
#28 refactoring to simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
ebirukov committed Jan 12, 2024
1 parent 76098c4 commit 204f1fb
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 117 deletions.
2 changes: 1 addition & 1 deletion pkg/activerecord/activerecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type ClusterCheckerInterface interface {

type ConfigCacherInterface interface {
Get(ctx context.Context, path string, glob MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error)
Actualize(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error)
UpdateConfig(ctx context.Context, path string, clusterConfig *Cluster) error
}

type SerializerInterface interface {
Expand Down
176 changes: 71 additions & 105 deletions pkg/activerecord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package activerecord
import (
"context"
"fmt"
"math/rand"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"golang.org/x/sync/errgroup"
)

// Интерфейс которому должен соответствовать билдер опций подключения к конретному инстансу
Expand Down Expand Up @@ -55,52 +54,85 @@ type ShardInstance struct {
type Shard struct {
Masters []ShardInstance
Replicas []ShardInstance
Offlines []ShardInstance
curMaster int32
curReplica int32
}

// Функция выбирающая следующий инстанс мастера в конкретном шарде
// Функция выбирающая следующий доступный инстанс мастера в конкретном шарде
func (s *Shard) NextMaster() ShardInstance {
length := len(s.Masters)
switch length {
case 0:
panic("no master configured")
case 1:
return s.Masters[0]
master := s.Masters[0]
if master.Offline {
panic("no available master")
}

return master
}

newVal := atomic.AddInt32(&s.curMaster, 1)
newValMod := newVal % int32(len(s.Masters))
// Из-за гонок при большом кол-ве недоступных инстансов может потребоватся много попыток найти доступный узел
attempt := 10 * length

for i := 0; i < attempt; i++ {
newVal := atomic.AddInt32(&s.curMaster, 1)
newValMod := newVal % int32(len(s.Masters))

if newValMod != newVal {
atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod)
if newValMod != newVal {
atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod)
}

master := s.Masters[newValMod]
if master.Offline {
continue
}

return master
}

return s.Masters[newValMod]
//nolint:gosec
// Есть небольшая вероятность при большой нагрузке и большом проценте недоступных инстансов можно залипнуть на доступном узле
// Чтобы не паниковать выбираем рандомный узел
return s.Masters[rand.Int()%length]
}

// Инстанс выбирающий конкретный инстанс реплики в конкретном шарде
// Инстанс выбирающий следующий доступный инстанс реплики в конкретном шарде
func (s *Shard) NextReplica() ShardInstance {
length := len(s.Replicas)
if length == 1 {
if length == 1 && !s.Replicas[0].Offline {
return s.Replicas[0]
}

newVal := atomic.AddInt32(&s.curReplica, 1)
newValMod := newVal % int32(len(s.Replicas))
// Из-за гонок при большом кол-ве недоступных инстансов может потребоватся много попыток найти доступный узел
attempt := 10 * length

for i := 0; i < attempt; i++ {
newVal := atomic.AddInt32(&s.curReplica, 1)
newValMod := newVal % int32(len(s.Replicas))

if newValMod != newVal {
atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod)
}

replica := s.Replicas[newValMod]
if replica.Offline {
continue
}

if newValMod != newVal {
atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod)
return replica
}

return s.Replicas[newValMod]
//nolint:gosec
// Есть небольшая вероятность при большой нагрузке и большом проценте недоступных инстансов поиск может залипнуть на недоступном узле
// Чтобы не паниковать выбираем рандомный узел
return s.Replicas[rand.Int()%length]
}

// Instances копия всех инстансов шарды
func (c *Shard) Instances() []ShardInstance {
instances := make([]ShardInstance, 0, len(c.Offlines)+len(c.Masters)+len(c.Replicas))
instances = append(instances, c.Offlines...)
instances := make([]ShardInstance, 0, len(c.Masters)+len(c.Replicas))
instances = append(instances, c.Masters...)
instances = append(instances, c.Replicas...)

Expand All @@ -109,11 +141,6 @@ func (c *Shard) Instances() []ShardInstance {

// Append добавляет инстанс в шарду
func (c *Shard) Append(instance ShardInstance) {
if instance.Offline {
c.Offlines = append(c.Offlines, instance)
return
}

switch instance.Config.Mode {
case ModeMaster:
c.Masters = append(c.Masters, instance)
Expand All @@ -124,7 +151,7 @@ func (c *Shard) Append(instance ShardInstance) {

// Equal проверяет что инстансы в instances эквивалентны инстансам в шарде
func (c *Shard) Equal(instances []ShardInstance) bool {
if len(c.Masters)+len(c.Replicas)+len(c.Offlines) != len(instances) {
if len(c.Masters)+len(c.Replicas) != len(instances) {
return false
}

Expand All @@ -133,20 +160,17 @@ func (c *Shard) Equal(instances []ShardInstance) bool {
m[instance.ParamsID] = instance
}

for _, online := range append(c.Masters, c.Replicas...) {
instance, ok := m[online.ParamsID]
if !ok || instance.Offline {
for _, cur := range append(c.Masters, c.Replicas...) {
instance, ok := m[cur.ParamsID]
if !ok {
return false
}

if instance.Config.Mode != online.Config.Mode {
if instance.Offline != cur.Offline {
return false
}
}

for _, replica := range c.Offlines {
instance, ok := m[replica.ParamsID]
if !ok || !instance.Offline {
if instance.Config.Mode != cur.Config.Mode {
return false
}
}
Expand Down Expand Up @@ -198,19 +222,20 @@ func (c *Cluster) NextMaster(shardNum int) ShardInstance {
}

// NextMaster выбирает следующий доступный инстанс реплики в шарде shardNum
func (c *Cluster) NextReplica(shardNum int) ShardInstance {
func (c *Cluster) NextReplica(shardNum int) (ShardInstance, bool) {
c.m.RLock()
defer c.m.RUnlock()

return c.shards[shardNum].NextReplica()
}
for _, replica := range c.shards[shardNum].Replicas {
if replica.Offline {
continue
}

// HasReplicas наличие доступных инстансов реплик в шарде shardNum
func (c *Cluster) HasReplicas(shard int) bool {
c.m.RLock()
defer c.m.RUnlock()
return c.shards[shardNum].NextReplica(), true
}

return ShardInstance{}, false

return len(c.shards[shard].Replicas) > 0
}

// Append добавляет новый шард в кластер
Expand Down Expand Up @@ -246,13 +271,7 @@ func (c *Cluster) setShardInstances(shardNum int, instances []ShardInstance) {
shard := c.shards[shardNum]
shard.Masters = shard.Masters[:0]
shard.Replicas = shard.Replicas[:0]
shard.Offlines = shard.Offlines[:0]
for _, shardInstance := range instances {
if shardInstance.Offline {
shard.Offlines = append(shard.Offlines, shardInstance)
continue
}

switch shardInstance.Config.Mode {
case ModeMaster:
shard.Masters = append(shard.Masters, shardInstance)
Expand Down Expand Up @@ -478,65 +497,12 @@ func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, path string,
return conf, nil
}

// Актуализирует конфигурацию кластера path, синхронизируя состояние конфигурации узлов кластера с серверной стороной (тип узла и его доступность)
// Проверка типа и доступности узлов выполняется с помощью функции instanceChecker
func (cc *DefaultConfigCacher) Actualize(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error) {
clusterConfig, err := cc.Get(ctx, path, params.globs, params.optionCreator)
if err != nil {
return nil, fmt.Errorf("can't load cluster info: %w", err)
}

for i := 0; i < clusterConfig.Shards(); i++ {
var actualShard Shard

eg := &errgroup.Group{}

instancesCh := make(chan ShardInstance)

curInstances := clusterConfig.ShardInstances(i)

for _, si := range curInstances {
si := si
eg.Go(func() error {
opts, connErr := params.optionChecker(ctx, si)

if opts != nil {
si.Config.Mode = opts.InstanceMode()
}

instancesCh <- ShardInstance{
ParamsID: si.ParamsID,
Config: si.Config,
Options: si.Options,
Offline: connErr != nil,
}

return nil
})
}

egAcc := &errgroup.Group{}

egAcc.Go(func() error {
for instance := range instancesCh {
actualShard.Append(instance)
}

return nil
})

_ = eg.Wait()
close(instancesCh)
_ = egAcc.Wait()

if !actualShard.Equal(curInstances) {
clusterConfig.setShardInstances(i, actualShard.Instances())
}
}

// UpdateClusterConfig Актуализирует конфигурацию кластера path
func (cc *DefaultConfigCacher) UpdateConfig(ctx context.Context, path string, clusterConfig *Cluster) error {
cc.lock.Lock()
defer cc.lock.Unlock()

cc.container[path] = clusterConfig
cc.lock.Unlock()

return clusterConfig, nil
return nil
}
75 changes: 71 additions & 4 deletions pkg/activerecord/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func WithPingInterval(interval time.Duration) OptionPinger {
})
}

func WithStart() OptionPinger {
func WithStart(ctx context.Context) OptionPinger {
return OptionPingerFunc(func(p *Pinger) {
p.StartWatch(p.ctx)
p.StartWatch(ctx)
})
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func (p *Pinger) StartWatch(ctx context.Context) {
p.logger.Info(p.ctx, "starting ping")

for cfgPath, params := range p.clusterParams {
clusterConf, e := p.clusterConfig().Actualize(p.ctx, cfgPath, params)
clusterConf, e := p.actualize(p.ctx, cfgPath, params)
if e != nil {
p.logger.Error(p.ctx, fmt.Errorf("can't actualize '%s' configuration: %w", cfgPath, e))
}
Expand All @@ -121,6 +121,69 @@ func (p *Pinger) StartWatch(ctx context.Context) {
p.started = true
}

// Актуализирует конфигурацию кластера path, синхронизируя состояние конфигурации узлов кластера с серверной стороной (тип узла и его доступность)
// Проверка типа и доступности узлов выполняется с помощью функции instanceChecker
func (p *Pinger) actualize(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error) {
clusterConfig, err := p.clusterConfig().Get(ctx, path, params.globs, params.optionCreator)
if err != nil {
return nil, fmt.Errorf("can't load cluster info: %w", err)
}

for i := 0; i < clusterConfig.Shards(); i++ {
var actualShard Shard

eg := &errgroup.Group{}

instancesCh := make(chan ShardInstance)

curInstances := clusterConfig.ShardInstances(i)

for _, si := range curInstances {
si := si
eg.Go(func() error {
opts, connErr := params.optionChecker(ctx, si)

if opts != nil {
si.Config.Mode = opts.InstanceMode()
}

instancesCh <- ShardInstance{
ParamsID: si.ParamsID,
Config: si.Config,
Options: si.Options,
Offline: connErr != nil,
}

return nil
})
}

egAcc := &errgroup.Group{}

egAcc.Go(func() error {
for instance := range instancesCh {
actualShard.Append(instance)
}

return nil
})

_ = eg.Wait()
close(instancesCh)
_ = egAcc.Wait()

if !actualShard.Equal(curInstances) {
clusterConfig.setShardInstances(i, actualShard.Instances())
}
}

if err := p.clusterConfig().UpdateConfig(ctx, path, clusterConfig); err != nil {
return nil, err
}

return clusterConfig, nil
}

func (p *Pinger) StopWatch() error {
if !p.isStarted() {
return nil
Expand Down Expand Up @@ -155,11 +218,15 @@ func (p *Pinger) AddClusterChecker(ctx context.Context, path string, params Clus

// если пингер для конфигурации кластер не зарегистрировался ранее (конфигурация загружена впервые)
// актуализируем конфигурацию кластера
clusterConf, err := p.clusterConfig().Actualize(ctx, path, params)
clusterConf, err := p.actualize(ctx, path, params)
if err != nil {
return nil, err
}

if err := p.clusterConfig().UpdateConfig(ctx, path, clusterConf); err != nil {
return nil, err
}

p.collectInfo(ctx, path, clusterConf)

p.StartWatch(ctx)
Expand Down
Loading

0 comments on commit 204f1fb

Please sign in to comment.