diff --git a/go.mod b/go.mod index 7a7d820..14188f3 100644 --- a/go.mod +++ b/go.mod @@ -9,10 +9,12 @@ require ( github.com/stretchr/testify v1.8.4 golang.org/x/mod v0.7.0 golang.org/x/net v0.7.0 + golang.org/x/sync v0.1.0 golang.org/x/sys v0.5.0 golang.org/x/text v0.7.0 golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 golang.org/x/tools v0.5.0 + gopkg.in/yaml.v3 v3.0.1 gotest.tools v2.2.0+incompatible ) @@ -21,5 +23,4 @@ require ( github.com/google/go-cmp v0.5.9 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.5.0 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 72e735a..4d072af 100644 --- a/go.sum +++ b/go.sum @@ -23,6 +23,8 @@ golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA= golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= diff --git a/internal/pkg/generator/tmpl/octopus/main.tmpl b/internal/pkg/generator/tmpl/octopus/main.tmpl index 17f0135..5d454cb 100644 --- a/internal/pkg/generator/tmpl/octopus/main.tmpl +++ b/internal/pkg/generator/tmpl/octopus/main.tmpl @@ -824,8 +824,8 @@ func selectBox (ctx context.Context, indexnum uint32, keysPacked [][][]byte, lim logger.Warn(ctx, "Select limit reached. Result may less than db records.") } - mode, ok := connection.InstanceMode().(octopus.ServerModeType) - if !ok || activerecord.ServerModeType(mode) == activerecord.ModeReplica { + mode, ok := connection.InstanceMode().(activerecord.ServerModeType) + if !ok || mode == activerecord.ModeReplica { if !ok { logger.Error(ctx, "Invalid server mode type: %T", connection.InstanceMode()) } diff --git a/pkg/activerecord/activerecord.go b/pkg/activerecord/activerecord.go index bbfdf6b..6e47d74 100644 --- a/pkg/activerecord/activerecord.go +++ b/pkg/activerecord/activerecord.go @@ -55,6 +55,7 @@ func (l Limiter) String() string { return fmt.Sprintf("Limit: %d, Offset: %d, Is Threshold: %t", l.limit, l.offset, l.fullfillWarn) } +//go:generate mockery --name ConfigInterface --filename mock_config.go --structname MockConfig --with-expecter=true --inpackage type ConfigInterface interface { GetBool(ctx context.Context, confPath string, dfl ...bool) bool GetBoolIfExists(ctx context.Context, confPath string) (value bool, ok bool) @@ -90,8 +91,12 @@ type ConnectionCacherInterface interface { CloseConnection(context.Context) } +type ClusterCheckerInterface interface { + AddClusterChecker(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error) +} + type ConfigCacherInterface interface { - Get(ctx context.Context, path string, glob MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (Cluster, error) + Get(ctx context.Context, path string, glob MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) } type SerializerInterface interface { @@ -125,6 +130,7 @@ type ActiveRecord struct { metric MetricInterface connectionCacher ConnectionCacherInterface configCacher ConfigCacherInterface + pinger ClusterCheckerInterface } var instance *ActiveRecord @@ -157,7 +163,7 @@ func InitActiveRecord(opts ...Option) { config: NewDefaultConfig(), metric: NewDefaultNoopMetric(), connectionCacher: newConnectionPool(), - configCacher: newConfigCacher(), + configCacher: NewConfigCacher(), } for _, opt := range opts { @@ -192,3 +198,12 @@ func ConnectionCacher() ConnectionCacherInterface { func ConfigCacher() ConfigCacherInterface { return GetInstance().configCacher } + +// AddClusterChecker регистрирует конфигурацию кластера в локальном пингере +func AddClusterChecker(ctx context.Context, configPath string, params ClusterConfigParameters) (*Cluster, error) { + if GetInstance().pinger == nil { + return nil, fmt.Errorf("connection pinger is not configured. Configure it with function InitActiveRecord and WithConnectionPinger option ") + } + + return GetInstance().pinger.AddClusterChecker(ctx, configPath, params) +} diff --git a/pkg/activerecord/cluster.go b/pkg/activerecord/cluster.go index 3ab875c..0671b4a 100644 --- a/pkg/activerecord/cluster.go +++ b/pkg/activerecord/cluster.go @@ -3,6 +3,7 @@ package activerecord import ( "context" "fmt" + "math/rand" "strconv" "strings" "sync" @@ -13,7 +14,7 @@ import ( // Интерфейс которому должен соответствовать билдер опций подключения к конретному инстансу type OptionInterface interface { GetConnectionID() string - InstanceMode() any + InstanceMode() ServerModeType } // Тип и константы для выбора инстанса в шарде @@ -46,6 +47,7 @@ type ShardInstance struct { ParamsID string Config ShardInstanceConfig Options interface{} + Offline bool } // Структура описывающая конкретный шард. Каждый шард может состоять из набора мастеров и реплик @@ -56,45 +58,166 @@ type Shard struct { curReplica int32 } -// Функция выбирающая следующий инстанс мастера в конкретном шарде +// Функция выбирающая следующий доступный инстанс мастера в конкретном шарде func (s *Shard) NextMaster() ShardInstance { length := len(s.Masters) switch length { case 0: panic("no master configured") case 1: - return s.Masters[0] + master := s.Masters[0] + if master.Offline { + panic("no available master") + } + + return master } - newVal := atomic.AddInt32(&s.curMaster, 1) - newValMod := newVal % int32(len(s.Masters)) + // Из-за гонок при большом кол-ве недоступных инстансов может потребоватся много попыток найти доступный узел + attempt := 10 * length + + for i := 0; i < attempt; i++ { + newVal := atomic.AddInt32(&s.curMaster, 1) + newValMod := newVal % int32(len(s.Masters)) + + if newValMod != newVal { + atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod) + } + + master := s.Masters[newValMod] + if master.Offline { + continue + } - if newValMod != newVal { - atomic.CompareAndSwapInt32(&s.curMaster, newVal, newValMod) + return master } - return s.Masters[newValMod] + //nolint:gosec + // Есть небольшая вероятность при большой нагрузке и большом проценте недоступных инстансов можно залипнуть на доступном узле + // Чтобы не паниковать выбираем рандомный узел + return s.Masters[rand.Int()%length] } -// Инстанс выбирающий конкретный инстанс реплики в конкретном шарде +// Инстанс выбирающий следующий доступный инстанс реплики в конкретном шарде func (s *Shard) NextReplica() ShardInstance { length := len(s.Replicas) - if length == 1 { + if length == 1 && !s.Replicas[0].Offline { return s.Replicas[0] } - newVal := atomic.AddInt32(&s.curReplica, 1) - newValMod := newVal % int32(len(s.Replicas)) + // Из-за гонок при большом кол-ве недоступных инстансов может потребоватся много попыток найти доступный узел + attempt := 10 * length + + for i := 0; i < attempt; i++ { + newVal := atomic.AddInt32(&s.curReplica, 1) + newValMod := newVal % int32(len(s.Replicas)) - if newValMod != newVal { - atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod) + if newValMod != newVal { + atomic.CompareAndSwapInt32(&s.curReplica, newVal, newValMod) + } + + replica := s.Replicas[newValMod] + if replica.Offline { + continue + } + + return replica } - return s.Replicas[newValMod] + //nolint:gosec + // Есть небольшая вероятность при большой нагрузке и большом проценте недоступных инстансов поиск может залипнуть на недоступном узле + // Чтобы не паниковать выбираем рандомный узел + return s.Replicas[rand.Int()%length] +} + +// Instances копия всех инстансов шарды +func (c *Shard) Instances() []ShardInstance { + instances := make([]ShardInstance, 0, len(c.Masters)+len(c.Replicas)) + instances = append(instances, c.Masters...) + instances = append(instances, c.Replicas...) + + return instances } // Тип описывающий кластер. Сейчас кластер - это набор шардов. -type Cluster []Shard +type Cluster struct { + m sync.RWMutex + shards []Shard +} + +func NewCluster(shardCnt int) *Cluster { + return &Cluster{ + m: sync.RWMutex{}, + shards: make([]Shard, 0, shardCnt), + } +} + +// NextMaster выбирает следующий доступный инстанс мастера в шарде shardNum +func (c *Cluster) NextMaster(shardNum int) ShardInstance { + c.m.RLock() + defer c.m.RUnlock() + + return c.shards[shardNum].NextMaster() +} + +// NextMaster выбирает следующий доступный инстанс реплики в шарде shardNum +func (c *Cluster) NextReplica(shardNum int) (ShardInstance, bool) { + c.m.RLock() + defer c.m.RUnlock() + + for _, replica := range c.shards[shardNum].Replicas { + if replica.Offline { + continue + } + + return c.shards[shardNum].NextReplica(), true + } + + return ShardInstance{}, false + +} + +// Append добавляет новый шард в кластер +func (c *Cluster) Append(shard Shard) { + c.m.Lock() + defer c.m.Unlock() + + c.shards = append(c.shards, shard) +} + +// ShardInstances копия всех инстансов из шарды shardNum +func (c *Cluster) ShardInstances(shardNum int) []ShardInstance { + c.m.Lock() + defer c.m.Unlock() + + return c.shards[shardNum].Instances() +} + +// Shards кол-во доступных шард кластера +func (c *Cluster) Shards() int { + return len(c.shards) +} + +// SetShardInstances заменяет инстансы кластера в шарде shardNum на инстансы из instances +func (c *Cluster) SetShardInstances(shardNum int, instances []ShardInstance) { + c.m.Lock() + defer c.m.Unlock() + + shard := c.shards[shardNum] + shard.Masters = shard.Masters[:0] + shard.Replicas = shard.Replicas[:0] + for _, shardInstance := range instances { + switch shardInstance.Config.Mode { + case ModeMaster: + shard.Masters = append(shard.Masters, shardInstance) + case ModeReplica: + shard.Replicas = append(shard.Replicas, shardInstance) + } + } + + c.shards[shardNum] = shard + +} // Тип используемый для передачи набора значений по умолчанию для параметров type MapGlobParam struct { @@ -106,11 +229,13 @@ type MapGlobParam struct { // сколько шардов, столько и опций. Используется в случаях, когда информация по кластеру прописана // непосредственно в декларации модели, а не в конфиге. // Так же используется при тестировании. -func NewClusterInfo(opts ...clusterOption) Cluster { - cl := Cluster{} +func NewClusterInfo(opts ...clusterOption) *Cluster { + cl := &Cluster{ + m: sync.RWMutex{}, + } for _, opt := range opts { - opt.apply(&cl) + opt.apply(cl) } return cl @@ -119,7 +244,7 @@ func NewClusterInfo(opts ...clusterOption) Cluster { // Констркуктор позволяющий проинициализировать кластер их конфигурации. // На вход передаётся путь в конфиге, значения по умолчанию, и ссылка на функцию, которая // создаёт структуру опций и считает контрольную сумму, для того, что бы следить за их изменением в онлайне. -func GetClusterInfoFromCfg(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (Cluster, error) { +func GetClusterInfoFromCfg(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) { cfg := Config() shardCnt, exMaxShardOK := cfg.GetIntIfExists(ctx, path+"/max-shard") @@ -127,7 +252,7 @@ func GetClusterInfoFromCfg(ctx context.Context, path string, globs MapGlobParam, shardCnt = 1 } - cluster := make(Cluster, shardCnt) + cluster := NewCluster(shardCnt) globalTimeout, exGlobalTimeout := cfg.GetDurationIfExists(ctx, path+"/Timeout") if exGlobalTimeout { @@ -141,22 +266,24 @@ func GetClusterInfoFromCfg(ctx context.Context, path string, globs MapGlobParam, globs.PoolSize = globalPoolSize - var err error - if exMaxShardOK { // Если используется много шардов for f := 0; f < shardCnt; f++ { - cluster[f], err = getShardInfoFromCfg(ctx, path+"/"+strconv.Itoa(f), globs, optionCreator) + shard, err := getShardInfoFromCfg(ctx, path+"/"+strconv.Itoa(f), globs, optionCreator) if err != nil { return nil, fmt.Errorf("can't get shard %d info: %w", f, err) } + + cluster.Append(shard) } } else { // Когда только один шард - cluster[0], err = getShardInfoFromCfg(ctx, path, globs, optionCreator) + shard, err := getShardInfoFromCfg(ctx, path, globs, optionCreator) if err != nil { return nil, fmt.Errorf("can't get shard info: %w", err) } + + cluster.Append(shard) } return cluster, nil @@ -244,44 +371,53 @@ func getShardInfoFromCfg(ctx context.Context, path string, globParam MapGlobPara // Используется для шаринга конфигов между можелями если они используют одну и ту же // конфигурацию для подключений type DefaultConfigCacher struct { - lock sync.Mutex - container map[string]Cluster + lock sync.RWMutex + container map[string]*Cluster updateTime time.Time } // Конструктор для создания нового кешера конфигов -func newConfigCacher() *DefaultConfigCacher { +func NewConfigCacher() *DefaultConfigCacher { return &DefaultConfigCacher{ - lock: sync.Mutex{}, - container: make(map[string]Cluster), + lock: sync.RWMutex{}, + container: make(map[string]*Cluster), updateTime: time.Now(), } } // Получение конфигурации. Если есть в кеше и он еще валидный, то конфигурация берётся из кешаб // если в кеше нет, то достаём из конфига и кешируем. -func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (Cluster, error) { - cc.lock.Lock() - defer cc.lock.Unlock() - - if cc.updateTime.Sub(Config().GetLastUpdateTime()) < 0 { - // Очищаем кеш если поменялся конфиг - cc.container = make(map[string]Cluster) - cc.updateTime = time.Now() +func (cc *DefaultConfigCacher) Get(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) { + if cc.lock.TryLock() { + if cc.updateTime.Sub(Config().GetLastUpdateTime()) < 0 { + // Очищаем кеш если поменялся конфиг + cc.container = make(map[string]*Cluster) + cc.updateTime = time.Now() + } + cc.lock.Unlock() } + cc.lock.RLock() conf, ex := cc.container[path] + cc.lock.RUnlock() if !ex { - var err error + return cc.loadClusterInfo(ctx, path, globs, optionCreator) + } - conf, err = GetClusterInfoFromCfg(ctx, path, globs, optionCreator) - if err != nil { - return Cluster{}, fmt.Errorf("can't get config: %w", err) - } + return conf, nil +} - cc.container[path] = conf +func (cc *DefaultConfigCacher) loadClusterInfo(ctx context.Context, path string, globs MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error) { + cc.lock.Lock() + defer cc.lock.Unlock() + + conf, err := GetClusterInfoFromCfg(ctx, path, globs, optionCreator) + if err != nil { + return nil, fmt.Errorf("can't get config: %w", err) } + cc.container[path] = conf + return conf, nil } diff --git a/pkg/activerecord/cluster_test.go b/pkg/activerecord/cluster_test.go new file mode 100644 index 0000000..67e8082 --- /dev/null +++ b/pkg/activerecord/cluster_test.go @@ -0,0 +1,186 @@ +package activerecord + +import ( + "context" + "testing" + + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/mock" + "gotest.tools/assert" + "gotest.tools/assert/cmp" +) + +func TestGetClusterInfoFromCfg(t *testing.T) { + ctx := context.Background() + + type args struct { + ctx context.Context + path string + globs MapGlobParam + optionCreator func(ShardInstanceConfig) (OptionInterface, error) + } + tests := []struct { + name string + args args + mocks func(*testing.T, *MockConfig) + want *Cluster + wantErr bool + }{ + { + name: "cluster hosts from root path (no master or replica keys)", + mocks: func(t *testing.T, mockConfig *MockConfig) { + mockConfig.EXPECT().GetIntIfExists(mock.Anything, mock.Anything).Return(0, false) + mockConfig.EXPECT().GetDuration(mock.Anything, mock.Anything, mock.Anything).Return(0) + mockConfig.EXPECT().GetInt(mock.Anything, mock.Anything, mock.Anything).Return(0) + mockConfig.EXPECT().GetDurationIfExists(mock.Anything, mock.Anything).Return(0, false) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig/master").Return("", false) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig").Return("host1,host2", true) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig/replica").Return("", false) + }, + args: args{ + ctx: ctx, + path: "testconfig", + globs: MapGlobParam{}, + optionCreator: func(c ShardInstanceConfig) (OptionInterface, error) { + return &TestOptions{hash: c.Addr}, nil + }, + }, + want: &Cluster{ + shards: []Shard{ + { + Masters: []ShardInstance{ + { + ParamsID: "host1", + Config: ShardInstanceConfig{ + Mode: ModeMaster, + Addr: "host1", + }, + Options: &TestOptions{hash: "host1"}, + }, + { + ParamsID: "host2", + Config: ShardInstanceConfig{ + Mode: ModeMaster, + Addr: "host2", + }, + Options: &TestOptions{hash: "host2"}, + }, + }, + Replicas: []ShardInstance{}, + }, + }, + }, + }, + { + name: "cluster hosts from master and replica keys path", + mocks: func(t *testing.T, mockConfig *MockConfig) { + mockConfig.EXPECT().GetIntIfExists(mock.Anything, mock.Anything).Return(0, false) + mockConfig.EXPECT().GetDuration(mock.Anything, mock.Anything, mock.Anything).Return(0) + mockConfig.EXPECT().GetInt(mock.Anything, mock.Anything, mock.Anything).Return(0) + mockConfig.EXPECT().GetDurationIfExists(mock.Anything, mock.Anything).Return(0, false) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig/master").Return("host2", true) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig/replica").Return("host1", true) + }, + args: args{ + ctx: ctx, + path: "testconfig", + globs: MapGlobParam{}, + optionCreator: func(c ShardInstanceConfig) (OptionInterface, error) { + return &TestOptions{hash: c.Addr}, nil + }, + }, + want: &Cluster{ + shards: []Shard{ + { + Masters: []ShardInstance{ + { + ParamsID: "host2", + Config: ShardInstanceConfig{ + Mode: ModeMaster, + Addr: "host2", + }, + Options: &TestOptions{hash: "host2"}, + }, + }, + Replicas: []ShardInstance{ + { + ParamsID: "host1", + Config: ShardInstanceConfig{ + Mode: ModeReplica, + Addr: "host1", + }, + Options: &TestOptions{hash: "host1", mode: ModeReplica}, + }, + }, + }, + }, + }, + }, + { + name: "cluster hosts from root path and replica keys path", + mocks: func(t *testing.T, mockConfig *MockConfig) { + mockConfig.EXPECT().GetIntIfExists(mock.Anything, mock.Anything).Return(0, false) + mockConfig.EXPECT().GetDuration(mock.Anything, mock.Anything, mock.Anything).Return(0) + mockConfig.EXPECT().GetInt(mock.Anything, mock.Anything, mock.Anything).Return(0) + mockConfig.EXPECT().GetDurationIfExists(mock.Anything, mock.Anything).Return(0, false) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig/master").Return("", false) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig").Return("host1", true) + mockConfig.EXPECT().GetStringIfExists(mock.Anything, "testconfig/replica").Return("host2", true) + }, + args: args{ + ctx: ctx, + path: "testconfig", + globs: MapGlobParam{}, + optionCreator: func(c ShardInstanceConfig) (OptionInterface, error) { + return &TestOptions{hash: c.Addr}, nil + }, + }, + want: &Cluster{ + shards: []Shard{ + { + Masters: []ShardInstance{ + { + ParamsID: "host1", + Config: ShardInstanceConfig{ + Mode: ModeMaster, + Addr: "host1", + }, + Options: &TestOptions{hash: "host1"}, + }, + }, + Replicas: []ShardInstance{ + { + ParamsID: "host2", + Config: ShardInstanceConfig{ + Mode: ModeReplica, + Addr: "host2", + }, + Options: &TestOptions{hash: "host2", mode: ModeReplica}, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockConfig := NewMockConfig(t) + + ReinitActiveRecord( + WithConfig(mockConfig), + ) + + if tt.mocks != nil { + tt.mocks(t, mockConfig) + } + + got, err := GetClusterInfoFromCfg(tt.args.ctx, tt.args.path, tt.args.globs, tt.args.optionCreator) + if (err != nil) != tt.wantErr { + t.Errorf("GetClusterInfoFromCfg() error = %v, wantErr %v", err, tt.wantErr) + return + } + assert.Check(t, cmp.DeepEqual(got.ShardInstances(0), tt.want.ShardInstances(0), cmpopts.IgnoreUnexported(Shard{}, TestOptions{})), "GetClusterInfoFromCfg() got = %v, want %v", got, tt.want) + }) + } +} diff --git a/pkg/activerecord/connection_w_test.go b/pkg/activerecord/connection_w_test.go index b8d79af..4d6b0ae 100644 --- a/pkg/activerecord/connection_w_test.go +++ b/pkg/activerecord/connection_w_test.go @@ -8,10 +8,11 @@ import ( type TestOptions struct { hash string + mode ServerModeType } -func (to *TestOptions) InstanceMode() any { - return ModeMaster +func (to *TestOptions) InstanceMode() ServerModeType { + return to.mode } func (to *TestOptions) GetConnectionID() string { @@ -61,7 +62,7 @@ func Test_connectionPool_Add(t *testing.T) { { name: "first connection", args: args{ - shard: clusterInfo[0].NextMaster(), + shard: clusterInfo.NextMaster(0), connector: connectorFunc, }, wantErr: false, @@ -71,7 +72,7 @@ func Test_connectionPool_Add(t *testing.T) { { name: "again first connection", args: args{ - shard: clusterInfo[0].NextMaster(), + shard: clusterInfo.NextMaster(0), connector: connectorFunc, }, wantErr: true, diff --git a/pkg/activerecord/mock_config.go b/pkg/activerecord/mock_config.go new file mode 100644 index 0000000..771e744 --- /dev/null +++ b/pkg/activerecord/mock_config.go @@ -0,0 +1,623 @@ +// Code generated by mockery v2.20.0. DO NOT EDIT. + +package activerecord + +import ( + context "context" + time "time" + + mock "github.com/stretchr/testify/mock" +) + +// MockConfig is an autogenerated mock type for the ConfigInterface type +type MockConfig struct { + mock.Mock +} + +type MockConfig_Expecter struct { + mock *mock.Mock +} + +func (_m *MockConfig) EXPECT() *MockConfig_Expecter { + return &MockConfig_Expecter{mock: &_m.Mock} +} + +// GetBool provides a mock function with given fields: ctx, confPath, dfl +func (_m *MockConfig) GetBool(ctx context.Context, confPath string, dfl ...bool) bool { + _va := make([]interface{}, len(dfl)) + for _i := range dfl { + _va[_i] = dfl[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, confPath) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 bool + if rf, ok := ret.Get(0).(func(context.Context, string, ...bool) bool); ok { + r0 = rf(ctx, confPath, dfl...) + } else { + r0 = ret.Get(0).(bool) + } + + return r0 +} + +// MockConfig_GetBool_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBool' +type MockConfig_GetBool_Call struct { + *mock.Call +} + +// GetBool is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +// - dfl ...bool +func (_e *MockConfig_Expecter) GetBool(ctx interface{}, confPath interface{}, dfl ...interface{}) *MockConfig_GetBool_Call { + return &MockConfig_GetBool_Call{Call: _e.mock.On("GetBool", + append([]interface{}{ctx, confPath}, dfl...)...)} +} + +func (_c *MockConfig_GetBool_Call) Run(run func(ctx context.Context, confPath string, dfl ...bool)) *MockConfig_GetBool_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]bool, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(bool) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *MockConfig_GetBool_Call) Return(_a0 bool) *MockConfig_GetBool_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockConfig_GetBool_Call) RunAndReturn(run func(context.Context, string, ...bool) bool) *MockConfig_GetBool_Call { + _c.Call.Return(run) + return _c +} + +// GetBoolIfExists provides a mock function with given fields: ctx, confPath +func (_m *MockConfig) GetBoolIfExists(ctx context.Context, confPath string) (bool, bool) { + ret := _m.Called(ctx, confPath) + + var r0 bool + var r1 bool + if rf, ok := ret.Get(0).(func(context.Context, string) (bool, bool)); ok { + return rf(ctx, confPath) + } + if rf, ok := ret.Get(0).(func(context.Context, string) bool); ok { + r0 = rf(ctx, confPath) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) bool); ok { + r1 = rf(ctx, confPath) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// MockConfig_GetBoolIfExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetBoolIfExists' +type MockConfig_GetBoolIfExists_Call struct { + *mock.Call +} + +// GetBoolIfExists is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +func (_e *MockConfig_Expecter) GetBoolIfExists(ctx interface{}, confPath interface{}) *MockConfig_GetBoolIfExists_Call { + return &MockConfig_GetBoolIfExists_Call{Call: _e.mock.On("GetBoolIfExists", ctx, confPath)} +} + +func (_c *MockConfig_GetBoolIfExists_Call) Run(run func(ctx context.Context, confPath string)) *MockConfig_GetBoolIfExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockConfig_GetBoolIfExists_Call) Return(value bool, ok bool) *MockConfig_GetBoolIfExists_Call { + _c.Call.Return(value, ok) + return _c +} + +func (_c *MockConfig_GetBoolIfExists_Call) RunAndReturn(run func(context.Context, string) (bool, bool)) *MockConfig_GetBoolIfExists_Call { + _c.Call.Return(run) + return _c +} + +// GetDuration provides a mock function with given fields: ctx, confPath, dfl +func (_m *MockConfig) GetDuration(ctx context.Context, confPath string, dfl ...time.Duration) time.Duration { + _va := make([]interface{}, len(dfl)) + for _i := range dfl { + _va[_i] = dfl[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, confPath) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 time.Duration + if rf, ok := ret.Get(0).(func(context.Context, string, ...time.Duration) time.Duration); ok { + r0 = rf(ctx, confPath, dfl...) + } else { + r0 = ret.Get(0).(time.Duration) + } + + return r0 +} + +// MockConfig_GetDuration_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDuration' +type MockConfig_GetDuration_Call struct { + *mock.Call +} + +// GetDuration is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +// - dfl ...time.Duration +func (_e *MockConfig_Expecter) GetDuration(ctx interface{}, confPath interface{}, dfl ...interface{}) *MockConfig_GetDuration_Call { + return &MockConfig_GetDuration_Call{Call: _e.mock.On("GetDuration", + append([]interface{}{ctx, confPath}, dfl...)...)} +} + +func (_c *MockConfig_GetDuration_Call) Run(run func(ctx context.Context, confPath string, dfl ...time.Duration)) *MockConfig_GetDuration_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]time.Duration, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(time.Duration) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *MockConfig_GetDuration_Call) Return(_a0 time.Duration) *MockConfig_GetDuration_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockConfig_GetDuration_Call) RunAndReturn(run func(context.Context, string, ...time.Duration) time.Duration) *MockConfig_GetDuration_Call { + _c.Call.Return(run) + return _c +} + +// GetDurationIfExists provides a mock function with given fields: ctx, confPath +func (_m *MockConfig) GetDurationIfExists(ctx context.Context, confPath string) (time.Duration, bool) { + ret := _m.Called(ctx, confPath) + + var r0 time.Duration + var r1 bool + if rf, ok := ret.Get(0).(func(context.Context, string) (time.Duration, bool)); ok { + return rf(ctx, confPath) + } + if rf, ok := ret.Get(0).(func(context.Context, string) time.Duration); ok { + r0 = rf(ctx, confPath) + } else { + r0 = ret.Get(0).(time.Duration) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) bool); ok { + r1 = rf(ctx, confPath) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// MockConfig_GetDurationIfExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetDurationIfExists' +type MockConfig_GetDurationIfExists_Call struct { + *mock.Call +} + +// GetDurationIfExists is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +func (_e *MockConfig_Expecter) GetDurationIfExists(ctx interface{}, confPath interface{}) *MockConfig_GetDurationIfExists_Call { + return &MockConfig_GetDurationIfExists_Call{Call: _e.mock.On("GetDurationIfExists", ctx, confPath)} +} + +func (_c *MockConfig_GetDurationIfExists_Call) Run(run func(ctx context.Context, confPath string)) *MockConfig_GetDurationIfExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockConfig_GetDurationIfExists_Call) Return(_a0 time.Duration, _a1 bool) *MockConfig_GetDurationIfExists_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockConfig_GetDurationIfExists_Call) RunAndReturn(run func(context.Context, string) (time.Duration, bool)) *MockConfig_GetDurationIfExists_Call { + _c.Call.Return(run) + return _c +} + +// GetInt provides a mock function with given fields: ctx, confPath, dfl +func (_m *MockConfig) GetInt(ctx context.Context, confPath string, dfl ...int) int { + _va := make([]interface{}, len(dfl)) + for _i := range dfl { + _va[_i] = dfl[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, confPath) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 int + if rf, ok := ret.Get(0).(func(context.Context, string, ...int) int); ok { + r0 = rf(ctx, confPath, dfl...) + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MockConfig_GetInt_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetInt' +type MockConfig_GetInt_Call struct { + *mock.Call +} + +// GetInt is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +// - dfl ...int +func (_e *MockConfig_Expecter) GetInt(ctx interface{}, confPath interface{}, dfl ...interface{}) *MockConfig_GetInt_Call { + return &MockConfig_GetInt_Call{Call: _e.mock.On("GetInt", + append([]interface{}{ctx, confPath}, dfl...)...)} +} + +func (_c *MockConfig_GetInt_Call) Run(run func(ctx context.Context, confPath string, dfl ...int)) *MockConfig_GetInt_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]int, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(int) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *MockConfig_GetInt_Call) Return(_a0 int) *MockConfig_GetInt_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockConfig_GetInt_Call) RunAndReturn(run func(context.Context, string, ...int) int) *MockConfig_GetInt_Call { + _c.Call.Return(run) + return _c +} + +// GetIntIfExists provides a mock function with given fields: ctx, confPath +func (_m *MockConfig) GetIntIfExists(ctx context.Context, confPath string) (int, bool) { + ret := _m.Called(ctx, confPath) + + var r0 int + var r1 bool + if rf, ok := ret.Get(0).(func(context.Context, string) (int, bool)); ok { + return rf(ctx, confPath) + } + if rf, ok := ret.Get(0).(func(context.Context, string) int); ok { + r0 = rf(ctx, confPath) + } else { + r0 = ret.Get(0).(int) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) bool); ok { + r1 = rf(ctx, confPath) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// MockConfig_GetIntIfExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetIntIfExists' +type MockConfig_GetIntIfExists_Call struct { + *mock.Call +} + +// GetIntIfExists is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +func (_e *MockConfig_Expecter) GetIntIfExists(ctx interface{}, confPath interface{}) *MockConfig_GetIntIfExists_Call { + return &MockConfig_GetIntIfExists_Call{Call: _e.mock.On("GetIntIfExists", ctx, confPath)} +} + +func (_c *MockConfig_GetIntIfExists_Call) Run(run func(ctx context.Context, confPath string)) *MockConfig_GetIntIfExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockConfig_GetIntIfExists_Call) Return(_a0 int, _a1 bool) *MockConfig_GetIntIfExists_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockConfig_GetIntIfExists_Call) RunAndReturn(run func(context.Context, string) (int, bool)) *MockConfig_GetIntIfExists_Call { + _c.Call.Return(run) + return _c +} + +// GetLastUpdateTime provides a mock function with given fields: +func (_m *MockConfig) GetLastUpdateTime() time.Time { + ret := _m.Called() + + var r0 time.Time + if rf, ok := ret.Get(0).(func() time.Time); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(time.Time) + } + + return r0 +} + +// MockConfig_GetLastUpdateTime_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetLastUpdateTime' +type MockConfig_GetLastUpdateTime_Call struct { + *mock.Call +} + +// GetLastUpdateTime is a helper method to define mock.On call +func (_e *MockConfig_Expecter) GetLastUpdateTime() *MockConfig_GetLastUpdateTime_Call { + return &MockConfig_GetLastUpdateTime_Call{Call: _e.mock.On("GetLastUpdateTime")} +} + +func (_c *MockConfig_GetLastUpdateTime_Call) Run(run func()) *MockConfig_GetLastUpdateTime_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockConfig_GetLastUpdateTime_Call) Return(_a0 time.Time) *MockConfig_GetLastUpdateTime_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockConfig_GetLastUpdateTime_Call) RunAndReturn(run func() time.Time) *MockConfig_GetLastUpdateTime_Call { + _c.Call.Return(run) + return _c +} + +// GetString provides a mock function with given fields: ctx, confPath, dfl +func (_m *MockConfig) GetString(ctx context.Context, confPath string, dfl ...string) string { + _va := make([]interface{}, len(dfl)) + for _i := range dfl { + _va[_i] = dfl[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, confPath) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 string + if rf, ok := ret.Get(0).(func(context.Context, string, ...string) string); ok { + r0 = rf(ctx, confPath, dfl...) + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockConfig_GetString_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetString' +type MockConfig_GetString_Call struct { + *mock.Call +} + +// GetString is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +// - dfl ...string +func (_e *MockConfig_Expecter) GetString(ctx interface{}, confPath interface{}, dfl ...interface{}) *MockConfig_GetString_Call { + return &MockConfig_GetString_Call{Call: _e.mock.On("GetString", + append([]interface{}{ctx, confPath}, dfl...)...)} +} + +func (_c *MockConfig_GetString_Call) Run(run func(ctx context.Context, confPath string, dfl ...string)) *MockConfig_GetString_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]string, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(string) + } + } + run(args[0].(context.Context), args[1].(string), variadicArgs...) + }) + return _c +} + +func (_c *MockConfig_GetString_Call) Return(_a0 string) *MockConfig_GetString_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockConfig_GetString_Call) RunAndReturn(run func(context.Context, string, ...string) string) *MockConfig_GetString_Call { + _c.Call.Return(run) + return _c +} + +// GetStringIfExists provides a mock function with given fields: ctx, confPath +func (_m *MockConfig) GetStringIfExists(ctx context.Context, confPath string) (string, bool) { + ret := _m.Called(ctx, confPath) + + var r0 string + var r1 bool + if rf, ok := ret.Get(0).(func(context.Context, string) (string, bool)); ok { + return rf(ctx, confPath) + } + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, confPath) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) bool); ok { + r1 = rf(ctx, confPath) + } else { + r1 = ret.Get(1).(bool) + } + + return r0, r1 +} + +// MockConfig_GetStringIfExists_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStringIfExists' +type MockConfig_GetStringIfExists_Call struct { + *mock.Call +} + +// GetStringIfExists is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +func (_e *MockConfig_Expecter) GetStringIfExists(ctx interface{}, confPath interface{}) *MockConfig_GetStringIfExists_Call { + return &MockConfig_GetStringIfExists_Call{Call: _e.mock.On("GetStringIfExists", ctx, confPath)} +} + +func (_c *MockConfig_GetStringIfExists_Call) Run(run func(ctx context.Context, confPath string)) *MockConfig_GetStringIfExists_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *MockConfig_GetStringIfExists_Call) Return(_a0 string, _a1 bool) *MockConfig_GetStringIfExists_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockConfig_GetStringIfExists_Call) RunAndReturn(run func(context.Context, string) (string, bool)) *MockConfig_GetStringIfExists_Call { + _c.Call.Return(run) + return _c +} + +// GetStrings provides a mock function with given fields: ctx, confPath, dfl +func (_m *MockConfig) GetStrings(ctx context.Context, confPath string, dfl []string) []string { + ret := _m.Called(ctx, confPath, dfl) + + var r0 []string + if rf, ok := ret.Get(0).(func(context.Context, string, []string) []string); ok { + r0 = rf(ctx, confPath, dfl) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]string) + } + } + + return r0 +} + +// MockConfig_GetStrings_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStrings' +type MockConfig_GetStrings_Call struct { + *mock.Call +} + +// GetStrings is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +// - dfl []string +func (_e *MockConfig_Expecter) GetStrings(ctx interface{}, confPath interface{}, dfl interface{}) *MockConfig_GetStrings_Call { + return &MockConfig_GetStrings_Call{Call: _e.mock.On("GetStrings", ctx, confPath, dfl)} +} + +func (_c *MockConfig_GetStrings_Call) Run(run func(ctx context.Context, confPath string, dfl []string)) *MockConfig_GetStrings_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].([]string)) + }) + return _c +} + +func (_c *MockConfig_GetStrings_Call) Return(_a0 []string) *MockConfig_GetStrings_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockConfig_GetStrings_Call) RunAndReturn(run func(context.Context, string, []string) []string) *MockConfig_GetStrings_Call { + _c.Call.Return(run) + return _c +} + +// GetStruct provides a mock function with given fields: ctx, confPath, valuePtr +func (_m *MockConfig) GetStruct(ctx context.Context, confPath string, valuePtr interface{}) (bool, error) { + ret := _m.Called(ctx, confPath, valuePtr) + + var r0 bool + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, interface{}) (bool, error)); ok { + return rf(ctx, confPath, valuePtr) + } + if rf, ok := ret.Get(0).(func(context.Context, string, interface{}) bool); ok { + r0 = rf(ctx, confPath, valuePtr) + } else { + r0 = ret.Get(0).(bool) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, interface{}) error); ok { + r1 = rf(ctx, confPath, valuePtr) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockConfig_GetStruct_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStruct' +type MockConfig_GetStruct_Call struct { + *mock.Call +} + +// GetStruct is a helper method to define mock.On call +// - ctx context.Context +// - confPath string +// - valuePtr interface{} +func (_e *MockConfig_Expecter) GetStruct(ctx interface{}, confPath interface{}, valuePtr interface{}) *MockConfig_GetStruct_Call { + return &MockConfig_GetStruct_Call{Call: _e.mock.On("GetStruct", ctx, confPath, valuePtr)} +} + +func (_c *MockConfig_GetStruct_Call) Run(run func(ctx context.Context, confPath string, valuePtr interface{})) *MockConfig_GetStruct_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(interface{})) + }) + return _c +} + +func (_c *MockConfig_GetStruct_Call) Return(_a0 bool, _a1 error) *MockConfig_GetStruct_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockConfig_GetStruct_Call) RunAndReturn(run func(context.Context, string, interface{}) (bool, error)) *MockConfig_GetStruct_Call { + _c.Call.Return(run) + return _c +} + +type mockConstructorTestingTNewMockConfig interface { + mock.TestingT + Cleanup(func()) +} + +// NewMockConfig creates a new instance of MockConfig. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockConfig(t mockConstructorTestingTNewMockConfig) *MockConfig { + mock := &MockConfig{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/activerecord/option.go b/pkg/activerecord/option.go index d316660..e056eb7 100644 --- a/pkg/activerecord/option.go +++ b/pkg/activerecord/option.go @@ -22,12 +22,24 @@ func WithConfig(config ConfigInterface) Option { }) } +func WithConfigCacher(configCacher ConfigCacherInterface) Option { + return optionFunc(func(a *ActiveRecord) { + a.configCacher = configCacher + }) +} + func WithMetrics(metric MetricInterface) Option { return optionFunc(func(a *ActiveRecord) { a.metric = metric }) } +func WithConnectionPinger(pc ClusterCheckerInterface) Option { + return optionFunc(func(a *ActiveRecord) { + a.pinger = pc + }) +} + type clusterOption interface { apply(*Cluster) } @@ -50,6 +62,6 @@ func WithShard(masters []OptionInterface, replicas []OptionInterface) clusterOpt }) } - *c = append(*c, newShard) + c.Append(newShard) }) } diff --git a/pkg/activerecord/pinger.go b/pkg/activerecord/pinger.go new file mode 100644 index 0000000..fc1f21a --- /dev/null +++ b/pkg/activerecord/pinger.go @@ -0,0 +1,15 @@ +package activerecord + +import ( + "context" +) + +type ClusterConfigParameters struct { + Globs MapGlobParam + OptionCreator func(ShardInstanceConfig) (OptionInterface, error) + OptionChecker func(ctx context.Context, instance ShardInstance) (OptionInterface, error) +} + +func (c ClusterConfigParameters) Validate() bool { + return c.OptionCreator != nil && c.OptionChecker != nil && c.Globs.PoolSize > 0 +} diff --git a/pkg/octopus/box.go b/pkg/octopus/box.go index f585e5a..f81ff09 100644 --- a/pkg/octopus/box.go +++ b/pkg/octopus/box.go @@ -10,58 +10,64 @@ import ( "github.com/mailru/activerecord/pkg/iproto/iproto" ) +var DefaultOptionCreator = func(sic activerecord.ShardInstanceConfig) (activerecord.OptionInterface, error) { + return NewOptions( + sic.Addr, + ServerModeType(sic.Mode), + WithTimeout(sic.Timeout, sic.Timeout), + WithPoolSize(sic.PoolSize), + WithPoolLogger(activerecord.IprotoLogger{}), + ) +} + +var DefaultConnectionParams = activerecord.MapGlobParam{ + Timeout: DefaultConnectionTimeout, + PoolSize: DefaultPoolSize, +} + // Box - возвращает коннектор для БД // TODO // - сделать статистику по используемым инстансам // - прикрутить локальный пингер и исключать недоступные инстансы func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType, configPath string, optionCreator func(activerecord.ShardInstanceConfig) (activerecord.OptionInterface, error)) (*Connection, error) { if optionCreator == nil { - optionCreator = func(sic activerecord.ShardInstanceConfig) (activerecord.OptionInterface, error) { - return NewOptions( - sic.Addr, - ServerModeType(sic.Mode), - WithTimeout(sic.Timeout, sic.Timeout), - WithPoolSize(sic.PoolSize), - WithPoolLogger(activerecord.IprotoLogger{}), - ) - } + optionCreator = DefaultOptionCreator } clusterInfo, err := activerecord.ConfigCacher().Get( ctx, configPath, - activerecord.MapGlobParam{ - Timeout: DefaultConnectionTimeout, - PoolSize: DefaultPoolSize, - }, + DefaultConnectionParams, optionCreator, ) if err != nil { return nil, fmt.Errorf("can't get cluster %s info: %w", configPath, err) } - if len(clusterInfo) < shard { - return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, len(clusterInfo)) + if clusterInfo.Shards() < shard { + return nil, fmt.Errorf("invalid shard num %d, max = %d", shard, clusterInfo.Shards()) } - var configBox activerecord.ShardInstance + var ( + configBox activerecord.ShardInstance + ok bool + ) switch instType { case activerecord.ReplicaInstanceType: - if len(clusterInfo[shard].Replicas) == 0 { + configBox, ok = clusterInfo.NextReplica(shard) + if !ok { return nil, fmt.Errorf("replicas not set") } - - configBox = clusterInfo[shard].NextReplica() case activerecord.ReplicaOrMasterInstanceType: - if len(clusterInfo[shard].Replicas) != 0 { - configBox = clusterInfo[shard].NextReplica() + configBox, ok = clusterInfo.NextReplica(shard) + if ok { break } fallthrough case activerecord.MasterInstanceType: - configBox = clusterInfo[shard].NextMaster() + configBox = clusterInfo.NextMaster(shard) } conn, err := activerecord.ConnectionCacher().GetOrAdd(configBox, func(options interface{}) (activerecord.ConnectionInterface, error) { @@ -76,14 +82,56 @@ func Box(ctx context.Context, shard int, instType activerecord.ShardInstanceType return nil, fmt.Errorf("error from connectionCacher: %w", err) } - box, ok := conn.(*Connection) - if !ok { + box, ex := conn.(*Connection) + if !ex { return nil, fmt.Errorf("invalid connection type %T, want *octopus.Connection", conn) } return box, nil } +func CheckShardInstance(ctx context.Context, instance activerecord.ShardInstance) (activerecord.OptionInterface, error) { + octopusOpt, ok := instance.Options.(*ConnectionOptions) + if !ok { + return nil, fmt.Errorf("invalit type of options %T, want Options", instance.Options) + } + + var err error + c := activerecord.ConnectionCacher().Get(instance) + if c == nil { + c, err = GetConnection(ctx, octopusOpt) + if err != nil { + return nil, fmt.Errorf("error from connectionCacher: %w", err) + } + } + + conn, ok := c.(*Connection) + if !ok { + return nil, fmt.Errorf("invalid connection type %T, want *octopus.Connection", conn) + } + + if len(conn.pool.Online()) == 0 { + return nil, fmt.Errorf("no online channels") + } + + td, err := CallLua(ctx, conn, "box.dostring", "return box.info.status") + if err != nil { + return nil, fmt.Errorf("can't get status: %w", err) + } + + if len(td) == 1 { + ret := td[0] + switch string(ret.Data[0]) { + case "primary": + return NewOptions(octopusOpt.server, ModeMaster) + default: + return NewOptions(octopusOpt.server, ModeReplica) + } + } + + return nil, fmt.Errorf("can't parse status: %w", err) +} + func ProcessResp(respBytes []byte, cntFlag CountFlags) ([]TupleData, error) { tupleCnt, respData, errResp := UnpackResopnseStatus(respBytes) if errResp != nil { diff --git a/pkg/octopus/options.go b/pkg/octopus/options.go index 6031847..8580795 100644 --- a/pkg/octopus/options.go +++ b/pkg/octopus/options.go @@ -8,6 +8,7 @@ import ( "hash/crc32" "time" + "github.com/mailru/activerecord/pkg/activerecord" "github.com/mailru/activerecord/pkg/iproto/iproto" ) @@ -116,8 +117,8 @@ func (o *ConnectionOptions) GetConnectionID() string { } // InstanceMode - метод для получения режима аботы инстанса RO или RW -func (o *ConnectionOptions) InstanceMode() any { - return o.Mode +func (o *ConnectionOptions) InstanceMode() activerecord.ServerModeType { + return activerecord.ServerModeType(o.Mode) } // ConnectionOption - интерфейс которому должны соответствовать опции передаваемые в конструктор