diff --git a/pkg/activerecord/cluster.go b/pkg/activerecord/cluster.go index 5399da3..bdac67f 100644 --- a/pkg/activerecord/cluster.go +++ b/pkg/activerecord/cluster.go @@ -51,74 +51,50 @@ type ShardInstance struct { Offline bool } -func (s *ShardInstance) IsOffline() bool { - return s.Offline -} - // Структура описывающая конкретный шард. Каждый шард может состоять из набора мастеров и реплик type Shard struct { Masters []ShardInstance Replicas []ShardInstance + Offlines []ShardInstance curMaster int32 curReplica int32 } // Функция выбирающая следующий инстанс мастера в конкретном шарде func (s *Shard) NextMaster() ShardInstance { - masters := Online(s.Masters) - length := len(masters) + length := len(s.Masters) switch length { case 0: panic("no master configured") case 1: - return masters[0] + return s.Masters[0] } newVal := atomic.AddInt32(&s.curMaster, 1) - newValMod := newVal % int32(len(masters)) + newValMod := newVal % int32(len(s.Masters)) if newValMod != newVal { atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod) } - return masters[newValMod] -} - -func Online(shards []ShardInstance) []ShardInstance { - ret := make([]ShardInstance, 0, len(shards)) - for _, replica := range shards { - if replica.Offline { - continue - } - - ret = append(ret, replica) - } - - return ret + return s.Masters[newValMod] } // Инстанс выбирающий конкретный инстанс реплики в конкретном шарде func (s *Shard) NextReplica() ShardInstance { - replicas := Online(s.Replicas) - length := len(replicas) + length := len(s.Replicas) if length == 1 { - return replicas[0] + return s.Replicas[0] } newVal := atomic.AddInt32(&s.curReplica, 1) - newValMod := newVal % int32(len(replicas)) + newValMod := newVal % int32(len(s.Replicas)) if newValMod != newVal { atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod) } - return replicas[newValMod] -} - -// Тип описывающий кластер. Сейчас кластер - это набор шардов. -type Cluster struct { - m sync.RWMutex - shards []Shard + return s.Replicas[newValMod] } type ClusterConfigParameters struct { @@ -143,6 +119,12 @@ func (c ClusterConfigParameters) Validate() bool { return c.optionCreator != nil && c.optionChecker != nil && c.globs.PoolSize > 0 } +// Тип описывающий кластер. Сейчас кластер - это набор шардов. +type Cluster struct { + m sync.RWMutex + shards []Shard +} + func NewCluster(shardCnt int) *Cluster { return &Cluster{ m: sync.RWMutex{}, @@ -150,6 +132,7 @@ func NewCluster(shardCnt int) *Cluster { } } +// NextMaster выбирает следующий доступный инстанс мастера в шарде shardNum func (c *Cluster) NextMaster(shardNum int) ShardInstance { c.m.RLock() defer c.m.RUnlock() @@ -157,6 +140,7 @@ func (c *Cluster) NextMaster(shardNum int) ShardInstance { return c.shards[shardNum].NextMaster() } +// NextMaster выбирает следующий доступный инстанс реплики в шарде shardNum func (c *Cluster) NextReplica(shardNum int) ShardInstance { c.m.RLock() defer c.m.RUnlock() @@ -164,13 +148,15 @@ func (c *Cluster) NextReplica(shardNum int) ShardInstance { return c.shards[shardNum].NextReplica() } -func (c *Cluster) OnlineReplicas(i int) []ShardInstance { +// HasReplicas наличие доступных инстансов реплик в шарде shardNum +func (c *Cluster) HasReplicas(shard int) bool { c.m.RLock() defer c.m.RUnlock() - return Online(c.shards[i].Replicas) + return len(c.shards[shard].Replicas) > 0 } +// append добавляет новый шард в кластер func (c *Cluster) append(shard Shard) { c.m.Lock() defer c.m.Unlock() @@ -178,14 +164,22 @@ func (c *Cluster) append(shard Shard) { c.shards = append(c.shards, shard) } -func (c *Cluster) Shard(i int) *Shard { - c.m.RLock() - defer c.m.RUnlock() +// ShardInstances копия всех инстансов в шарде shardNum +func (c *Cluster) ShardInstances(shardNum int) []ShardInstance { + c.m.Lock() + defer c.m.Unlock() + + shard := c.shards[shardNum] + instances := make([]ShardInstance, 0, len(shard.Offlines)+len(shard.Masters)+len(shard.Replicas)) + instances = append(instances, shard.Offlines...) + instances = append(instances, shard.Masters...) + instances = append(instances, shard.Replicas...) - return &c.shards[i] + return instances } -func (c *Cluster) Len() int { +// Shards кол-во доступных шард кластера +func (c *Cluster) Shards() int { if c == nil { return 0 } @@ -193,6 +187,7 @@ func (c *Cluster) Len() int { return len(c.shards) } +// setShardInstances заменяет инстансы кластера в шарде shardNum на инстансы из instances func (c *Cluster) setShardInstances(shardNum int, instances []ShardInstance) { c.m.Lock() defer c.m.Unlock() @@ -200,7 +195,13 @@ func (c *Cluster) setShardInstances(shardNum int, instances []ShardInstance) { shard := c.shards[shardNum] shard.Masters = shard.Masters[:0] shard.Replicas = shard.Replicas[:0] + shard.Offlines = shard.Offlines[:0] for _, shardInstance := range instances { + if shardInstance.Offline { + shard.Offlines = append(shard.Offlines, shardInstance) + continue + } + switch shardInstance.Config.Mode { case ModeMaster: shard.Masters = append(shard.Masters, shardInstance) @@ -223,11 +224,13 @@ type MapGlobParam struct { // сколько шардов, столько и опций. Используется в случаях, когда информация по кластеру прописана // непосредственно в декларации модели, а не в конфиге. // Так же используется при тестировании. -func NewClusterInfo(opts ...clusterOption) Cluster { - cl := Cluster{} +func NewClusterInfo(opts ...clusterOption) *Cluster { + cl := &Cluster{ + m: sync.RWMutex{}, + } for _, opt := range opts { - opt.apply(&cl) + opt.apply(cl) } return cl @@ -432,16 +435,14 @@ func (cc *DefaultConfigCacher) Actualize(ctx context.Context, path string, param return nil, fmt.Errorf("can't load cluster info: %w", err) } - for i := 0; i < clusterConfig.Len(); i++ { - shard := clusterConfig.Shard(i) - + for i := 0; i < clusterConfig.Shards(); i++ { var instances []ShardInstance eg := &errgroup.Group{} instancesCh := make(chan ShardInstance) - for _, si := range append(shard.Masters, shard.Replicas...) { + for _, si := range clusterConfig.ShardInstances(i) { si := si eg.Go(func() error { opts, connErr := params.optionChecker(ctx, si) diff --git a/pkg/activerecord/connection_w_test.go b/pkg/activerecord/connection_w_test.go index ff98621..026af1f 100644 --- a/pkg/activerecord/connection_w_test.go +++ b/pkg/activerecord/connection_w_test.go @@ -61,7 +61,7 @@ func Test_connectionPool_Add(t *testing.T) { { name: "first connection", args: args{ - shard: clusterInfo.Shard(0).NextMaster(), + shard: clusterInfo.NextMaster(0), connector: connectorFunc, }, wantErr: false, @@ -71,7 +71,7 @@ func Test_connectionPool_Add(t *testing.T) { { name: "again first connection", args: args{ - shard: clusterInfo.Shard(0).NextMaster(), + shard: clusterInfo.NextMaster(0), connector: connectorFunc, }, wantErr: true, diff --git a/pkg/activerecord/pinger.go b/pkg/activerecord/pinger.go index bccb0f3..8a64643 100644 --- a/pkg/activerecord/pinger.go +++ b/pkg/activerecord/pinger.go @@ -105,9 +105,9 @@ func (p *Pinger) StartWatch(ctx context.Context) { p.logger.Info(ctx, "starting ping") for cfgPath, params := range p.clusterParams { - clusterConf, err := p.clusterConfig().Actualize(ctx, cfgPath, params) - if err != nil { - p.logger.Error(p.ctx, fmt.Errorf("can't actualize '%s' configuration: %w", cfgPath, err)) + clusterConf, e := p.clusterConfig().Actualize(ctx, cfgPath, params) + if e != nil { + p.logger.Error(p.ctx, fmt.Errorf("can't actualize '%s' configuration: %w", cfgPath, e)) } p.collectInfo(ctx, cfgPath, clusterConf) @@ -186,9 +186,8 @@ func (p *Pinger) collectInfo(ctx context.Context, path string, clusterConf *Clus shardInstances := make([]ShardInstance, 0, len(p.instances[path])) - for i := 0; i < clusterConf.Len(); i++ { - shard := clusterConf.Shard(i) - instances := append(shard.Masters, shard.Replicas...) + for i := 0; i < clusterConf.Shards(); i++ { + instances := clusterConf.ShardInstances(i) for _, instance := range instances { if !instance.Offline { continue diff --git a/pkg/octopus/box.go b/pkg/octopus/box.go index ee5c0e8..222324d 100644 --- a/pkg/octopus/box.go +++ b/pkg/octopus/box.go @@ -46,21 +46,21 @@ func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType return nil, fmt.Errorf("can't get cluster %s info: %w", configPath, err) } - if clusterInfo.Len() < shard { - return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, clusterInfo.Len()) + if clusterInfo.Shards() < shard { + return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, clusterInfo.Shards()) } var configBox activerecord.ShardInstance switch instType { case activerecord.ReplicaInstanceType: - if len(clusterInfo.OnlineReplicas(shard)) == 0 { + if !clusterInfo.HasReplicas(shard) { return nil, fmt.Errorf("replicas not set") } configBox = clusterInfo.NextReplica(shard) case activerecord.ReplicaOrMasterInstanceType: - if len(clusterInfo.OnlineReplicas(shard)) != 0 { + if clusterInfo.HasReplicas(shard) { configBox = clusterInfo.NextReplica(shard) break }