#28 extract offline cluster instances
ebirukov committed Jan 11, 2024
1 parent e1dde12 commit 6934fae
Showing 4 changed files with 59 additions and 59 deletions.
95 changes: 48 additions & 47 deletions pkg/activerecord/cluster.go
@@ -51,74 +51,50 @@ type ShardInstance struct {
 	Offline bool
 }
 
-func (s *ShardInstance) IsOffline() bool {
-	return s.Offline
-}
-
 // Structure describing a specific shard. Each shard can consist of a set of masters and replicas
 type Shard struct {
 	Masters    []ShardInstance
 	Replicas   []ShardInstance
+	Offlines   []ShardInstance
 	curMaster  int32
 	curReplica int32
 }
 
 // Function selecting the next master instance in a specific shard
 func (s *Shard) NextMaster() ShardInstance {
-	masters := Online(s.Masters)
-	length := len(masters)
+	length := len(s.Masters)
 	switch length {
 	case 0:
 		panic("no master configured")
 	case 1:
-		return masters[0]
+		return s.Masters[0]
 	}
 
 	newVal := atomic.AddInt32(&s.curMaster, 1)
-	newValMod := newVal % int32(len(masters))
+	newValMod := newVal % int32(len(s.Masters))
 
 	if newValMod != newVal {
 		atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod)
 	}
 
-	return masters[newValMod]
-}
-
-func Online(shards []ShardInstance) []ShardInstance {
-	ret := make([]ShardInstance, 0, len(shards))
-	for _, replica := range shards {
-		if replica.Offline {
-			continue
-		}
-
-		ret = append(ret, replica)
-	}
-
-	return ret
+	return s.Masters[newValMod]
 }
 
 // Function selecting the next replica instance in a specific shard
 func (s *Shard) NextReplica() ShardInstance {
-	replicas := Online(s.Replicas)
-	length := len(replicas)
+	length := len(s.Replicas)
 	if length == 1 {
-		return replicas[0]
+		return s.Replicas[0]
 	}
 
 	newVal := atomic.AddInt32(&s.curReplica, 1)
-	newValMod := newVal % int32(len(replicas))
+	newValMod := newVal % int32(len(s.Replicas))
 
 	if newValMod != newVal {
 		atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod)
 	}
 
-	return replicas[newValMod]
-}
-
-// Type describing a cluster. Currently a cluster is a set of shards.
-type Cluster struct {
-	m      sync.RWMutex
-	shards []Shard
+	return s.Replicas[newValMod]
 }
 
 type ClusterConfigParameters struct {
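
Editor's note: with offline instances pulled out into the new Offlines slice, NextMaster and NextReplica no longer build a filtered copy through Online() on every call; they index Masters and Replicas directly, so the hot path stops allocating. The selection pattern itself — an atomic counter reduced modulo the slice length, plus a best-effort CAS that folds the counter back so it cannot grow without bound — is easier to see in isolation. A minimal, runnable sketch of the same pattern (the roundRobin helper and the string slice are illustrative, not code from this repository):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // roundRobin mirrors NextMaster/NextReplica: atomically advance a counter,
    // reduce it modulo len(items), and opportunistically swap the reduced
    // value back so the counter stays bounded.
    func roundRobin(cur *int32, items []string) string {
        if len(items) == 1 {
            return items[0]
        }

        newVal := atomic.AddInt32(cur, 1)
        newValMod := newVal % int32(len(items))

        if newValMod != newVal {
            // Best effort: if another goroutine advanced the counter first,
            // the swap fails and the next AddInt32 still makes progress.
            atomic.CompareAndSwapInt32(cur, newVal, newValMod)
        }

        return items[newValMod]
    }

    func main() {
        var cur int32
        replicas := []string{"replica-1", "replica-2", "replica-3"}
        for i := 0; i < 5; i++ {
            fmt.Println(roundRobin(&cur, replicas))
        }
    }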
@@ -143,64 +119,89 @@ func (c ClusterConfigParameters) Validate() bool {
 	return c.optionCreator != nil && c.optionChecker != nil && c.globs.PoolSize > 0
 }
 
+// Type describing a cluster. Currently a cluster is a set of shards.
+type Cluster struct {
+	m      sync.RWMutex
+	shards []Shard
+}
+
 func NewCluster(shardCnt int) *Cluster {
 	return &Cluster{
 		m:      sync.RWMutex{},
 		shards: make([]Shard, 0, shardCnt),
 	}
 }
 
+// NextMaster selects the next available master instance in shard shardNum
 func (c *Cluster) NextMaster(shardNum int) ShardInstance {
 	c.m.RLock()
 	defer c.m.RUnlock()
 
 	return c.shards[shardNum].NextMaster()
 }
 
+// NextReplica selects the next available replica instance in shard shardNum
 func (c *Cluster) NextReplica(shardNum int) ShardInstance {
 	c.m.RLock()
 	defer c.m.RUnlock()
 
 	return c.shards[shardNum].NextReplica()
 }
 
-func (c *Cluster) OnlineReplicas(i int) []ShardInstance {
+// HasReplicas reports whether shard shardNum has available replica instances
+func (c *Cluster) HasReplicas(shard int) bool {
 	c.m.RLock()
 	defer c.m.RUnlock()
 
-	return Online(c.shards[i].Replicas)
+	return len(c.shards[shard].Replicas) > 0
 }
 
+// append adds a new shard to the cluster
 func (c *Cluster) append(shard Shard) {
 	c.m.Lock()
 	defer c.m.Unlock()
 
 	c.shards = append(c.shards, shard)
 }
 
-func (c *Cluster) Shard(i int) *Shard {
-	c.m.RLock()
-	defer c.m.RUnlock()
+// ShardInstances returns a copy of all instances in shard shardNum
+func (c *Cluster) ShardInstances(shardNum int) []ShardInstance {
+	c.m.Lock()
+	defer c.m.Unlock()
+
+	shard := c.shards[shardNum]
+	instances := make([]ShardInstance, 0, len(shard.Offlines)+len(shard.Masters)+len(shard.Replicas))
+	instances = append(instances, shard.Offlines...)
+	instances = append(instances, shard.Masters...)
+	instances = append(instances, shard.Replicas...)
 
-	return &c.shards[i]
+	return instances
 }
 
-func (c *Cluster) Len() int {
+// Shards returns the number of available shards in the cluster
+func (c *Cluster) Shards() int {
 	if c == nil {
 		return 0
 	}
 
 	return len(c.shards)
 }
 
+// setShardInstances replaces the cluster's instances in shard shardNum with the given instances
 func (c *Cluster) setShardInstances(shardNum int, instances []ShardInstance) {
 	c.m.Lock()
 	defer c.m.Unlock()
 
 	shard := c.shards[shardNum]
 	shard.Masters = shard.Masters[:0]
 	shard.Replicas = shard.Replicas[:0]
+	shard.Offlines = shard.Offlines[:0]
 	for _, shardInstance := range instances {
+		if shardInstance.Offline {
+			shard.Offlines = append(shard.Offlines, shardInstance)
+			continue
+		}
+
 		switch shardInstance.Config.Mode {
 		case ModeMaster:
 			shard.Masters = append(shard.Masters, shardInstance)
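
Editor's note: setShardInstances is now the single point where instances are partitioned (the hunk is truncated here; presumably the elided lines write shard back to c.shards[shardNum]). A standalone sketch of the three-way split — the simplified Instance type and its Mode constants stand in for the real ShardInstance and Config.Mode:

    package main

    import "fmt"

    type Mode int

    const (
        ModeMaster Mode = iota
        ModeReplica
    )

    type Instance struct {
        Name    string
        Offline bool
        Mode    Mode
    }

    // partition splits instances the way setShardInstances does: offline
    // instances are extracted first, and only online ones are classified
    // as masters or replicas.
    func partition(instances []Instance) (masters, replicas, offlines []Instance) {
        for _, in := range instances {
            if in.Offline {
                offlines = append(offlines, in)
                continue
            }
            switch in.Mode {
            case ModeMaster:
                masters = append(masters, in)
            case ModeReplica:
                replicas = append(replicas, in)
            }
        }
        return masters, replicas, offlines
    }

    func main() {
        m, r, o := partition([]Instance{
            {Name: "m1", Mode: ModeMaster},
            {Name: "r1", Mode: ModeReplica, Offline: true},
            {Name: "r2", Mode: ModeReplica},
        })
        fmt.Println(len(m), len(r), len(o)) // 1 1 1
    }

Extracting offline entries before the master/replica classification is what keeps Masters and Replicas free of offline instances for the selection methods above.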
@@ -223,11 +224,13 @@ type MapGlobParam struct {
 // as many shards as there are options. Used when the cluster information is written
 // directly in the model declaration rather than in the config.
 // Also used in testing.
-func NewClusterInfo(opts ...clusterOption) Cluster {
-	cl := Cluster{}
+func NewClusterInfo(opts ...clusterOption) *Cluster {
+	cl := &Cluster{
+		m: sync.RWMutex{},
+	}
 
 	for _, opt := range opts {
-		opt.apply(&cl)
+		opt.apply(cl)
 	}
 
 	return cl
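
Editor's note: NewClusterInfo switching from Cluster to *Cluster is more than style. Cluster embeds a sync.RWMutex, and returning the struct by value copies the mutex, so each caller would lock a private copy instead of the shared one. A small illustration of the difference (the trimmed Cluster type here is a stand-in):

    package main

    import "sync"

    type Cluster struct {
        m      sync.RWMutex
        shards []int
    }

    // byValue copies the struct, mutex included: callers locking the copy
    // do not exclude goroutines locking the original (go vet's copylocks
    // check exists to catch this kind of copy).
    func byValue(c *Cluster) Cluster { return *c }

    // byPointer shares one mutex among all callers.
    func byPointer(c *Cluster) *Cluster { return c }

    func main() {
        orig := &Cluster{}
        cp := byValue(orig) // cp.m is an independent lock from orig.m
        _ = cp
        _ = byPointer(orig)
    }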
@@ -432,16 +435,14 @@ func (cc *DefaultConfigCacher) Actualize(ctx context.Context, path string, params
 		return nil, fmt.Errorf("can't load cluster info: %w", err)
 	}
 
-	for i := 0; i < clusterConfig.Len(); i++ {
-		shard := clusterConfig.Shard(i)
-
+	for i := 0; i < clusterConfig.Shards(); i++ {
 		var instances []ShardInstance
 
 		eg := &errgroup.Group{}
 
 		instancesCh := make(chan ShardInstance)
 
-		for _, si := range append(shard.Masters, shard.Replicas...) {
+		for _, si := range clusterConfig.ShardInstances(i) {
 			si := si
 			eg.Go(func() error {
 				opts, connErr := params.optionChecker(ctx, si)
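
Editor's note: the truncated Actualize loop above fans out one goroutine per shard instance through errgroup and streams checked instances back over a channel; because it now iterates ShardInstances, offline instances get re-checked too, not just Masters and Replicas. A self-contained sketch of that fan-out/collect shape — checkInstance stands in for params.optionChecker, whose real signature also returns connection options:

    package main

    import (
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    type ShardInstance struct {
        ParamsID string
        Offline  bool
    }

    // checkInstance stands in for the real health/option check.
    func checkInstance(si ShardInstance) (ShardInstance, error) {
        si.Offline = si.ParamsID == "down"
        return si, nil
    }

    func main() {
        shard := []ShardInstance{{ParamsID: "a"}, {ParamsID: "down"}, {ParamsID: "b"}}

        eg := &errgroup.Group{}
        instancesCh := make(chan ShardInstance)

        for _, si := range shard {
            si := si // capture the loop variable (pre-Go 1.22 semantics)
            eg.Go(func() error {
                checked, err := checkInstance(si)
                if err != nil {
                    return err
                }
                instancesCh <- checked
                return nil
            })
        }

        // Close the channel once all checks finish so the collector loop ends.
        go func() {
            _ = eg.Wait()
            close(instancesCh)
        }()

        var instances []ShardInstance
        for si := range instancesCh {
            instances = append(instances, si)
        }

        fmt.Printf("checked %d instances\n", len(instances))
    }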
4 changes: 2 additions & 2 deletions pkg/activerecord/connection_w_test.go
@@ -61,7 +61,7 @@ func Test_connectionPool_Add(t *testing.T) {
 		{
 			name: "first connection",
 			args: args{
-				shard:     clusterInfo.Shard(0).NextMaster(),
+				shard:     clusterInfo.NextMaster(0),
 				connector: connectorFunc,
 			},
 			wantErr: false,
@@ -71,7 +71,7 @@ func Test_connectionPool_Add(t *testing.T) {
 		{
 			name: "again first connection",
 			args: args{
-				shard:     clusterInfo.Shard(0).NextMaster(),
+				shard:     clusterInfo.NextMaster(0),
 				connector: connectorFunc,
 			},
 			wantErr: true,
11 changes: 5 additions & 6 deletions pkg/activerecord/pinger.go
@@ -105,9 +105,9 @@ func (p *Pinger) StartWatch(ctx context.Context) {
 			p.logger.Info(ctx, "starting ping")
 
 			for cfgPath, params := range p.clusterParams {
-				clusterConf, err := p.clusterConfig().Actualize(ctx, cfgPath, params)
-				if err != nil {
-					p.logger.Error(p.ctx, fmt.Errorf("can't actualize '%s' configuration: %w", cfgPath, err))
+				clusterConf, e := p.clusterConfig().Actualize(ctx, cfgPath, params)
+				if e != nil {
+					p.logger.Error(p.ctx, fmt.Errorf("can't actualize '%s' configuration: %w", cfgPath, e))
 				}
 
 				p.collectInfo(ctx, cfgPath, clusterConf)
@@ -186,9 +186,8 @@ func (p *Pinger) collectInfo(ctx context.Context, path string, clusterConf *Cluster) {
 
 	shardInstances := make([]ShardInstance, 0, len(p.instances[path]))
 
-	for i := 0; i < clusterConf.Len(); i++ {
-		shard := clusterConf.Shard(i)
-		instances := append(shard.Masters, shard.Replicas...)
+	for i := 0; i < clusterConf.Shards(); i++ {
+		instances := clusterConf.ShardInstances(i)
 		for _, instance := range instances {
 			if !instance.Offline {
 				continue
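
Editor's note: collectInfo applies the mirror-image filter: it walks the flattened per-shard instance list and keeps only offline entries (previously it scanned Masters and Replicas, which after this change contain no offline instances at all). A trimmed illustration with stand-in data:

    package main

    import "fmt"

    type ShardInstance struct {
        ParamsID string
        Offline  bool
    }

    func main() {
        instances := []ShardInstance{
            {ParamsID: "a"}, {ParamsID: "b", Offline: true}, {ParamsID: "c"},
        }

        // Keep only offline instances, presumably so the pinger can
        // re-check them and bring them back once they respond again.
        offline := make([]ShardInstance, 0, len(instances))
        for _, instance := range instances {
            if !instance.Offline {
                continue
            }
            offline = append(offline, instance)
        }

        fmt.Println(offline) // [{b true}]
    }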
8 changes: 4 additions & 4 deletions pkg/octopus/box.go
@@ -46,21 +46,21 @@ func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType
 		return nil, fmt.Errorf("can't get cluster %s info: %w", configPath, err)
 	}
 
-	if clusterInfo.Len() < shard {
-		return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, clusterInfo.Len())
+	if clusterInfo.Shards() < shard {
+		return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, clusterInfo.Shards())
 	}
 
 	var configBox activerecord.ShardInstance
 
 	switch instType {
 	case activerecord.ReplicaInstanceType:
-		if len(clusterInfo.OnlineReplicas(shard)) == 0 {
+		if !clusterInfo.HasReplicas(shard) {
 			return nil, fmt.Errorf("replicas not set")
 		}
 
 		configBox = clusterInfo.NextReplica(shard)
 	case activerecord.ReplicaOrMasterInstanceType:
-		if len(clusterInfo.OnlineReplicas(shard)) != 0 {
+		if clusterInfo.HasReplicas(shard) {
 			configBox = clusterInfo.NextReplica(shard)
 			break
 		}
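
Editor's note: the Box dispatch above now reads cleanly as "a replica is mandatory for ReplicaInstanceType, and preferred with a master fallback for ReplicaOrMasterInstanceType". A compact sketch of that selection logic with stand-in types (InstanceType, pickInstance, and the string instances are illustrative):

    package main

    import (
        "errors"
        "fmt"
    )

    type InstanceType int

    const (
        ReplicaInstanceType InstanceType = iota
        ReplicaOrMasterInstanceType
        MasterInstanceType
    )

    type Cluster struct {
        replicas []string
        masters  []string
    }

    func (c *Cluster) HasReplicas() bool   { return len(c.replicas) > 0 }
    func (c *Cluster) NextReplica() string { return c.replicas[0] }
    func (c *Cluster) NextMaster() string  { return c.masters[0] }

    // pickInstance mirrors the switch in Box: a replica is required for
    // ReplicaInstanceType, optional (falling back to a master) for
    // ReplicaOrMasterInstanceType.
    func pickInstance(c *Cluster, t InstanceType) (string, error) {
        switch t {
        case ReplicaInstanceType:
            if !c.HasReplicas() {
                return "", errors.New("replicas not set")
            }
            return c.NextReplica(), nil
        case ReplicaOrMasterInstanceType:
            if c.HasReplicas() {
                return c.NextReplica(), nil
            }
            fallthrough
        default:
            return c.NextMaster(), nil
        }
    }

    func main() {
        c := &Cluster{masters: []string{"master-1"}}
        inst, err := pickInstance(c, ReplicaOrMasterInstanceType)
        fmt.Println(inst, err) // master-1 <nil>
    }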