Skip to content

Commit

Permalink
fix connection leak (#31)
Browse files Browse the repository at this point in the history
* fix connection leak and race
  • Loading branch information
ebirukov authored Feb 2, 2024
1 parent f621efb commit 2df3eea
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 107 deletions.
54 changes: 16 additions & 38 deletions pkg/activerecord/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"hash"
"hash/crc32"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -138,18 +137,8 @@ func (s *Shard) NextReplica() ShardInstance {
func (c *Shard) Instances() []ShardInstance {
instances := make([]ShardInstance, 0, len(c.Masters)+len(c.Replicas))
instances = append(instances, c.Masters...)
// сортировка в подсписках чтобы не зависет от порядка в котором инстансы добавлялись в конфигурацию
sort.Slice(instances, func(i, j int) bool {
return instances[i].ParamsID < instances[j].ParamsID
})

instances = append(instances, c.Replicas...)

replicas := instances[len(c.Masters):]
sort.Slice(replicas, func(i, j int) bool {
return replicas[i].ParamsID < replicas[j].ParamsID
})

return instances
}

Expand Down Expand Up @@ -421,41 +410,30 @@ func NewConfigCacher() *DefaultConfigCacher {
// Получение конфигурации. Если есть в кеше и он еще валидный, то конфигурация берётся из кешаб
// если в кеше нет, то достаём из конфига и кешируем.
func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
curConf := cc.container[path]
if cc.lock.TryLock() {
if cc.updateTime.Sub(Config().GetLastUpdateTime()) < 0 {
// Очищаем кеш если поменялся конфиг
cc.container = make(map[string]*Cluster)
cc.updateTime = time.Now()
}
cc.lock.Unlock()
}

cc.lock.RLock()
conf, ex := cc.container[path]
confUpdateTime := cc.updateTime
cc.lock.RUnlock()

if !ex {
return cc.loadClusterInfo(ctx, curConf, path, globs, optionCreator)
}

return conf, nil
}
// Если конфигурация не найдена в кеше или конфигурация была обновлена, то перегружаем конфигурацию
if !ex || confUpdateTime.Sub(Config().GetLastUpdateTime()) < 0 {
cc.lock.Lock()
newConf, err := GetClusterInfoFromCfg(ctx, path, globs, optionCreator)
if err != nil {
cc.lock.Unlock()

func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, curConf *Cluster, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) {
cc.lock.Lock()
defer cc.lock.Unlock()
return nil, fmt.Errorf("can't get config: %w", err)
}

conf, err := GetClusterInfoFromCfg(ctx, path, globs, optionCreator)
if err != nil {
return nil, fmt.Errorf("can't get config: %w", err)
}
// если конфигурация поменялась, то обновляем её в кеше
if !newConf.Equal(conf) {
conf = newConf
cc.container[path] = conf
cc.updateTime = time.Now()
}

if conf.Equal(curConf) {
conf = curConf
cc.lock.Unlock()
}

cc.container[path] = conf

return conf, nil
}
56 changes: 0 additions & 56 deletions pkg/activerecord/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,59 +184,3 @@ func TestGetClusterInfoFromCfg(t *testing.T) {
})
}
}

func TestShard_Instances(t *testing.T) {
tests := []struct {
name string
shard Shard
want []ShardInstance
}{
{
name: "unordered sequence",
shard: Shard{
Masters: []ShardInstance{
{
ParamsID: "Master2",
},
{
ParamsID: "Master1",
},
},
Replicas: []ShardInstance{
{
ParamsID: "Replica2",
},
{
ParamsID: "Replica3",
},
{
ParamsID: "Replica1",
},
},
},
want: []ShardInstance{
{
ParamsID: "Master1",
},
{
ParamsID: "Master2",
},
{
ParamsID: "Replica1",
},
{
ParamsID: "Replica2",
},
{
ParamsID: "Replica3",
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := tt.shard.Instances()
assert.Check(t, cmp.DeepEqual(tt.want, got))
})
}
}
26 changes: 13 additions & 13 deletions pkg/octopus/box.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,16 @@ func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType
}

func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance) (activerecord.OptionInterface, error) {
octopusOpt, ok := instance.Options.(*ConnectionOptions)
if !ok {
return nil, fmt.Errorf("invalit type of options %T, want Options", instance.Options)
}

var err error
c := activerecord.ConnectionCacher().Get(instance)
if c == nil {
c, err = GetConnection(ctx, octopusOpt)
if err != nil {
return nil, fmt.Errorf("error from connectionCacher: %w", err)
c, err := activerecord.ConnectionCacher().GetOrAdd(instance, func(options interface{}) (activerecord.ConnectionInterface, error) {
octopusOpt, ok := options.(*ConnectionOptions)
if !ok {
return nil, fmt.Errorf("invalit type of options %T, want Options", options)
}

return GetConnection(ctx, octopusOpt)
})
if err != nil {
return nil, fmt.Errorf("error from connectionCacher: %w", err)
}

conn, ok := c.(*Connection)
Expand All @@ -123,10 +121,12 @@ func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance
ret := td[0]
switch string(ret.Data[0]) {
case "primary":
return NewOptions(octopusOpt.server, ModeMaster)
instance.Config.Mode = activerecord.ServerModeType(ModeMaster)
default:
return NewOptions(octopusOpt.server, ModeReplica)
instance.Config.Mode = activerecord.ServerModeType(ModeReplica)
}

return DefaultOptionCreator(instance.Config)
}

return nil, fmt.Errorf("can't parse status: %w", err)
Expand Down

0 comments on commit 2df3eea

Please sign in to comment.