From b7d397b2bd51c04509afabbd9a744ba6beaa218c Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Thu, 19 Sep 2024 14:24:47 +0800 Subject: [PATCH] Reconnect to the etcd in the startup phase (#538) Signed-off-by: Gao Hongtao --- CHANGES.md | 1 + banyand/metadata/client.go | 76 +++++++++++++++++++++++++++++++------- scripts/push-release.sh | 1 + 3 files changed, 64 insertions(+), 14 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 20cd8b38f..22c7ee358 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -42,6 +42,7 @@ Release Notes. - Fix several "sync.Pool" leak issues by adding a tracker to the pool. - Fix panic when removing a expired segment. - Fix panic when reading a disorder block of measure. This block's versions are not sorted in descending order. +- Fix the bug that the etcd client doesn't reconnect when facing the context timeout in the startup phase. ### Documentation diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go index 61ba8164f..94a7f1bd8 100644 --- a/banyand/metadata/client.go +++ b/banyand/metadata/client.go @@ -19,6 +19,9 @@ package metadata import ( "context" + "os" + "os/signal" + "syscall" "time" "github.com/pkg/errors" @@ -68,6 +71,7 @@ type clientService struct { etcdTLSCertFile string etcdTLSKeyFile string endpoints []string + registryTimeout time.Duration forceRegisterNode bool } @@ -84,6 +88,7 @@ func (s *clientService) FlagSet() *run.FlagSet { fs.StringVar(&s.etcdTLSCAFile, flagEtcdTLSCAFile, "", "Trusted certificate authority") fs.StringVar(&s.etcdTLSCertFile, flagEtcdTLSCertFile, "", "Etcd client certificate") fs.StringVar(&s.etcdTLSKeyFile, flagEtcdTLSKeyFile, "", "Private key for the etcd client certificate.") + fs.DurationVar(&s.registryTimeout, "node-registry-timeout", 2*time.Minute, "The timeout for the node registry") return fs } @@ -95,17 +100,50 @@ func (s *clientService) Validate() error { } func (s *clientService) PreRun(ctx context.Context) error { - var err error - s.schemaRegistry, err = schema.NewEtcdSchemaRegistry( - schema.Namespace(s.namespace), - schema.ConfigureServerEndpoints(s.endpoints), - schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword), - schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile), - schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile), - ) - if err != nil { + stopCh := make(chan struct{}) + sn := make(chan os.Signal, 1) + l := logger.GetLogger(s.Name()) + signal.Notify(sn, + syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM) + go func() { + select { + case si := <-sn: + logger.GetLogger(s.Name()).Info().Msgf("signal received: %s", si) + close(stopCh) + case <-s.closer.CloseNotify(): + close(stopCh) + } + }() + + for { + var err error + s.schemaRegistry, err = schema.NewEtcdSchemaRegistry( + schema.Namespace(s.namespace), + schema.ConfigureServerEndpoints(s.endpoints), + schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword), + schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile), + schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile), + ) + if errors.Is(err, context.DeadlineExceeded) { + select { + case <-stopCh: + return errors.New("pre-run interrupted") + case <-time.After(s.registryTimeout): + return errors.New("pre-run timeout") + case <-s.closer.CloseNotify(): + return errors.New("pre-run interrupted") + default: + l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("the schema registry init timeout, retrying...") + time.Sleep(time.Second) + continue + } + } + if err == nil { + break + } return err } + val := ctx.Value(common.ContextNodeKey) if val == nil { return errors.New("node id is empty") @@ -116,7 +154,6 @@ func (s *clientService) PreRun(ctx context.Context) error { return errors.New("node roles is empty") } nodeRoles := val.([]databasev1.Role) - l := logger.GetLogger(s.Name()) nodeInfo := &databasev1.Node{ Metadata: &commonv1.Metadata{ Name: node.NodeID, @@ -126,15 +163,26 @@ func (s *clientService) PreRun(ctx context.Context) error { Roles: nodeRoles, CreatedAt: timestamppb.Now(), } + var cancel context.CancelFunc for { - ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10) - err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode) + ctx, cancel = context.WithTimeout(ctx, time.Second*10) + err := s.schemaRegistry.RegisterNode(ctx, nodeInfo, s.forceRegisterNode) cancel() if errors.Is(err, schema.ErrGRPCAlreadyExists) { return errors.Wrapf(err, "node[%s] already exists in etcd", node.NodeID) } else if errors.Is(err, context.DeadlineExceeded) { - l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...") - continue + select { + case <-stopCh: + return errors.New("register node interrupted") + case <-time.After(s.registryTimeout): + return errors.New("register node timeout") + case <-s.closer.CloseNotify(): + return errors.New("register node interrupted") + default: + l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("register node timeout, retrying...") + time.Sleep(time.Second) + continue + } } if err == nil { l.Info().Stringer("info", nodeInfo).Msg("register node successfully") diff --git a/scripts/push-release.sh b/scripts/push-release.sh index 0a0ed328e..4c41f32ec 100755 --- a/scripts/push-release.sh +++ b/scripts/push-release.sh @@ -45,6 +45,7 @@ cp ${PRODUCT_NAME}-*.tgz.asc skywalking/banyandb/"$VERSION" cp ${PRODUCT_NAME}-*.tgz.sha512 skywalking/banyandb/"$VERSION" cd skywalking/banyandb && svn add "$VERSION" && svn commit -m "Draft Apache SkyWalking BanyanDB release $VERSION" +cd "$VERSION" cat << EOF =========================================================================