Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#28 connection pinger design #29

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ require (
github.com/stretchr/testify v1.8.4
golang.org/x/mod v0.7.0
golang.org/x/net v0.7.0
golang.org/x/sync v0.1.0
golang.org/x/sys v0.5.0
golang.org/x/text v0.7.0
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
golang.org/x/tools v0.5.0
gopkg.in/yaml.v3 v3.0.1
gotest.tools v2.2.0+incompatible
)

Expand All @@ -21,5 +23,4 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ golang.org/x/mod v0.7.0 h1:LapD9S96VoQRhi/GrNTqeBJFrUjs5UHCAtTlgwA5oZA=
golang.org/x/mod v0.7.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/generator/tmpl/octopus/main.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -824,8 +824,8 @@ func selectBox (ctx context.Context, indexnum uint32, keysPacked [][][]byte, lim
logger.Warn(ctx, "Select limit reached. Result may less than db records.")
}

mode, ok := connection.InstanceMode().(octopus.ServerModeType)
if !ok || activerecord.ServerModeType(mode) == activerecord.ModeReplica {
mode, ok := connection.InstanceMode().(activerecord.ServerModeType)
if !ok || mode == activerecord.ModeReplica {
if !ok {
logger.Error(ctx, "Invalid server mode type: %T", connection.InstanceMode())
}
Expand Down
19 changes: 17 additions & 2 deletions pkg/activerecord/activerecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (l Limiter) String() string {
return fmt.Sprintf("Limit: %d, Offset: %d, Is Threshold: %t", l.limit, l.offset, l.fullfillWarn)
}

//go:generate mockery --name ConfigInterface --filename mock_config.go --structname MockConfig --with-expecter=true --inpackage
type ConfigInterface interface {
GetBool(ctx context.Context, confPath string, dfl ...bool) bool
GetBoolIfExists(ctx context.Context, confPath string) (value bool, ok bool)
Expand Down Expand Up @@ -90,8 +91,12 @@ type ConnectionCacherInterface interface {
CloseConnection(context.Context)
}

type ClusterCheckerInterface interface {
AddClusterChecker(ctx context.Context, path string, params ClusterConfigParameters) (*Cluster, error)
}

type ConfigCacherInterface interface {
Get(ctx context.Context, path string, glob MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (Cluster, error)
Get(ctx context.Context, path string, glob MapGlobParam, optionCreator func(ShardInstanceConfig) (OptionInterface, error)) (*Cluster, error)
}

type SerializerInterface interface {
Expand Down Expand Up @@ -125,6 +130,7 @@ type ActiveRecord struct {
metric MetricInterface
connectionCacher ConnectionCacherInterface
configCacher ConfigCacherInterface
pinger ClusterCheckerInterface
}

var instance *ActiveRecord
Expand Down Expand Up @@ -157,7 +163,7 @@ func InitActiveRecord(opts ...Option) {
config: NewDefaultConfig(),
metric: NewDefaultNoopMetric(),
connectionCacher: newConnectionPool(),
configCacher: newConfigCacher(),
configCacher: NewConfigCacher(),
}

for _, opt := range opts {
Expand Down Expand Up @@ -192,3 +198,12 @@ func ConnectionCacher() ConnectionCacherInterface {
func ConfigCacher() ConfigCacherInterface {
return GetInstance().configCacher
}

// AddClusterChecker регистрирует конфигурацию кластера в локальном пингере
func AddClusterChecker(ctx context.Context, configPath string, params ClusterConfigParameters) (*Cluster, error) {
if GetInstance().pinger == nil {
return nil, fmt.Errorf("connection pinger is not configured. Configure it with function InitActiveRecord and WithConnectionPinger option ")
}

return GetInstance().pinger.AddClusterChecker(ctx, configPath, params)
}
Loading