fix: support leave member by using any pods (#8031)
cjc7373 authored Sep 3, 2024
1 parent 3328596 commit 8e6f092
Showing 7 changed files with 52 additions and 27 deletions.
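In short: lorry.Client.LeaveMember now takes the name of the member that should leave, so the leave request can be issued through any pod's Lorry service instead of only the pod that is leaving. A minimal sketch of the new call shape, assuming the repository's lorry client package; anyHealthyPod and leavingPodName are illustrative values, not code from this commit:

	// Old signature: cli.LeaveMember(ctx); the client had to point at the leaving pod itself.
	// New signature: the member to remove is named explicitly, so any reachable pod will do.
	cli, err := lorry.NewClient(anyHealthyPod) // anyHealthyPod is a hypothetical corev1.Pod
	if err != nil {
		return err
	}
	if err := cli.LeaveMember(ctx, leavingPodName); err != nil { // leavingPodName is hypothetical
		return err
	}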
2 changes: 1 addition & 1 deletion controllers/apps/component_controller_test.go
@@ -100,7 +100,7 @@ var mockLorryClientDefault = func() {
 var mockLorryClient4HScale = func(clusterKey types.NamespacedName, compName string, replicas int) {
 	mockLorryClient(func(recorder *lorry.MockClientMockRecorder) {
 		recorder.JoinMember(gomock.Any()).Return(nil).AnyTimes()
-		recorder.LeaveMember(gomock.Any()).DoAndReturn(func(ctx context.Context) error {
+		recorder.LeaveMember(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, memberName string) error {
 			var podList corev1.PodList
 			labels := client.MatchingLabels{
 				constant.AppInstanceLabelKey: clusterKey.Name,
23 changes: 19 additions & 4 deletions controllers/apps/transformer_component_workload.go
@@ -558,6 +558,16 @@ func (r *componentWorkloadOps) scaleOut(itsObj *workloads.InstanceSet) error {
 	}
 }
 
+func getHealthyLorryClient(pods []*corev1.Pod) (lorry.Client, error) {
+	for _, pod := range pods {
+		lorryCli, err := lorry.NewClient(*pod)
+		if err == nil {
+			return lorryCli, nil
+		}
+	}
+	return nil, fmt.Errorf("no health lorry client found")
+}
+
 func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
 	labels := constant.GetComponentWellKnownLabels(r.synthesizeComp.ClusterName, r.synthesizeComp.Name)
 	pods, err := component.ListPodOwnedByComponent(r.reqCtx.Ctx, r.cli, r.synthesizeComp.Namespace, labels, inDataContext4C())
@@ -613,12 +623,17 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
 		podsToMemberLeave = append(podsToMemberLeave, pod)
 	}
 	for _, pod := range podsToMemberLeave {
+		// try the pod to leave first
 		lorryCli, err1 := lorry.NewClient(*pod)
 		if err1 != nil {
-			if err == nil {
-				err = err1
+			// try another pod
+			lorryCli, err1 = getHealthyLorryClient(pods)
+			if err1 != nil {
+				if err == nil {
+					err = err1
+				}
+				continue
 			}
-			continue
 		}
 
 		if intctrlutil.IsNil(lorryCli) {
@@ -631,7 +646,7 @@ func (r *componentWorkloadOps) leaveMember4ScaleIn() error {
 			return switchoverErr
 		}
 
-		if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx); err2 != nil {
+		if err2 := lorryCli.LeaveMember(r.reqCtx.Ctx, pod.Name); err2 != nil {
 			// For the purpose of upgrade compatibility, if the version of Lorry is 0.7 and
 			// the version of KB is upgraded to 0.8 or newer, lorry client will return an NotImplemented error,
 			// in this case, here just ignore it.
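Taken together, the hunks above implement a simple fallback: prefer a Lorry client built for the pod that is leaving, and if that fails, use any pod that can serve one. A condensed sketch of the flow, with the switchover and error-accumulation details of the real code omitted; the wrapper function itself is illustrative, while getHealthyLorryClient, lorry.NewClient, and LeaveMember come from this diff:

	// Illustrative wrapper, not repository code.
	func leaveViaAnyPod(ctx context.Context, leaving *corev1.Pod, pods []*corev1.Pod) error {
		cli, err := lorry.NewClient(*leaving) // try the leaving pod first
		if err != nil {
			cli, err = getHealthyLorryClient(pods) // fall back to any pod with a working client
			if err != nil {
				return err
			}
		}
		return cli.LeaveMember(ctx, leaving.Name) // the member name now travels with the request
	}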
7 changes: 5 additions & 2 deletions pkg/lorry/client/client.go
@@ -210,8 +210,11 @@ func (cli *lorryClient) JoinMember(ctx context.Context) error {
 }
 
 // LeaveMember sends a Leave member operation request to Lorry, located on the target pod that is about to leave.
-func (cli *lorryClient) LeaveMember(ctx context.Context) error {
-	_, err := cli.Request(ctx, string(LeaveMemberOperation), http.MethodPost, nil)
+func (cli *lorryClient) LeaveMember(ctx context.Context, memberName string) error {
+	req := map[string]any{
+		"memberName": memberName,
+	}
+	_, err := cli.Request(ctx, string(LeaveMemberOperation), http.MethodPost, req)
 	return err
 }
 
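Note the compatibility story: when the caller passes an empty string, as the updated tests below do, the body still carries "memberName": "", and the Lorry operation falls back to its own member name (see the pkg/lorry/operations/replica/leave.go hunk later in this diff). A small sketch, assuming cli is a lorry.Client built for the pod that should leave:

	// Equivalent to the pre-change behavior: the receiving pod resolves the empty
	// name to its own member name and removes itself from the cluster.
	if err := cli.LeaveMember(ctx, ""); err != nil {
		return err
	}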
8 changes: 4 additions & 4 deletions pkg/lorry/client/client_mock.go

Some generated files are not rendered by default.

16 changes: 8 additions & 8 deletions pkg/lorry/client/httpclient_test.go
@@ -447,33 +447,33 @@ var _ = Describe("Lorry HTTP Client", func() {
 		})
 
 		It("success if leave once", func() {
-			mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(2)
+			mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(1)
 			mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
 			mockDCSStore.EXPECT().GetCluster().Return(cluster, nil)
 			mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(2)
-			Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
+			Expect(lorryClient.LeaveMember(context.TODO(), "")).Should(Succeed())
 			Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
 		})
 
 		It("success if leave twice", func() {
-			mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(4)
+			mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(2)
 			mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(2)
 			mockDCSStore.EXPECT().GetCluster().Return(cluster, nil).Times(2)
 			mockDCSStore.EXPECT().UpdateHaConfig().Return(nil).Times(3)
 			// first leave
-			Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
+			Expect(lorryClient.LeaveMember(context.TODO(), "")).Should(Succeed())
 			Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
 			// second leave
-			Expect(lorryClient.LeaveMember(context.TODO())).Should(Succeed())
+			Expect(lorryClient.LeaveMember(context.TODO(), "")).Should(Succeed())
 			Expect(cluster.HaConfig.DeleteMembers).Should(HaveLen(1))
 		})
 
 		It("not implemented", func() {
-			mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(2)
+			mockDBManager.EXPECT().GetCurrentMemberName().Return(podName).Times(1)
 			mockDBManager.EXPECT().LeaveMemberFromCluster(gomock.Any(), gomock.Any(), gomock.Any()).Return(fmt.Errorf(msg))
 			mockDCSStore.EXPECT().GetCluster().Return(cluster, nil)
 			mockDCSStore.EXPECT().UpdateHaConfig().Return(nil)
-			err := lorryClient.LeaveMember(context.TODO())
+			err := lorryClient.LeaveMember(context.TODO(), "")
 			Expect(err).Should(HaveOccurred())
 			Expect(err.Error()).Should(ContainSubstring(msg))
 		})
@@ -509,7 +509,7 @@ var _ = Describe("Lorry HTTP Client", func() {
 			_ = ops[strings.ToLower(string(util.LeaveMemberOperation))].Init(context.TODO())
 			customManager, _ := custom.NewManager(engines.Properties{})
 			register.SetCustomManager(customManager)
-			err := lorryClient.LeaveMember(context.TODO())
+			err := lorryClient.LeaveMember(context.TODO(), "")
 			Expect(err).Should(HaveOccurred())
 			Expect(err.Error()).Should(ContainSubstring("executable file not found"))
 		})
2 changes: 1 addition & 1 deletion pkg/lorry/client/interface.go
@@ -38,7 +38,7 @@ type Client interface {
 	JoinMember(ctx context.Context) error
 
 	// LeaveMember sends a Leave member operation request to Lorry, located on the target pod that is about to leave.
-	LeaveMember(ctx context.Context) error
+	LeaveMember(ctx context.Context, memberName string) error
 
 	Switchover(ctx context.Context, primary, candidate string, force bool) error
 	Lock(ctx context.Context) error
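Since the interface, the HTTP client, and the generated mock must change in lockstep, a compile-time assertion is a cheap guard against signature drift; this is a sketch only, and whether the package already carries such an assertion is not visible in this diff:

	// Fails to compile if *lorryClient no longer satisfies Client,
	// e.g. after a signature change like the one in this commit.
	var _ Client = (*lorryClient)(nil)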
21 changes: 14 additions & 7 deletions pkg/lorry/operations/replica/leave.go
@@ -86,21 +86,28 @@ func (s *Leave) Do(ctx context.Context, req *operations.OpsRequest) (*operations
 		return nil, err
 	}
 
-	currentMember := cluster.GetMemberWithName(manager.GetCurrentMemberName())
-	if !cluster.HaConfig.IsDeleting(currentMember) {
-		cluster.HaConfig.AddMemberToDelete(currentMember)
+	var memberToLeaveName string
+	if req.GetString("memberName") == "" {
+		memberToLeaveName = manager.GetCurrentMemberName()
+	} else {
+		memberToLeaveName = req.GetString("memberName")
+	}
+
+	memberToLeave := cluster.GetMemberWithName(memberToLeaveName)
+	if !cluster.HaConfig.IsDeleting(memberToLeave) {
+		cluster.HaConfig.AddMemberToDelete(memberToLeave)
 		_ = s.dcsStore.UpdateHaConfig()
 	}
 
-	// remove current member from db cluster
-	err = manager.LeaveMemberFromCluster(ctx, cluster, manager.GetCurrentMemberName())
+	// remove member from db cluster
+	err = manager.LeaveMemberFromCluster(ctx, cluster, memberToLeaveName)
 	if err != nil {
 		s.logger.Error(err, "Leave member from cluster failed")
 		return nil, err
 	}
 
-	if cluster.HaConfig.IsDeleting(currentMember) {
-		cluster.HaConfig.FinishDeleted(currentMember)
+	if cluster.HaConfig.IsDeleting(memberToLeave) {
+		cluster.HaConfig.FinishDeleted(memberToLeave)
 		_ = s.dcsStore.UpdateHaConfig()
 	}
 
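On the Lorry side, the member to remove is now taken from the request body, with the pod's own member name as the fallback, so old clients that send no memberName keep the previous behavior. The same resolution, condensed; req.GetString, manager.GetCurrentMemberName, and cluster.GetMemberWithName are from the hunk above, while the surrounding shape is illustrative:

	// Condensed restatement of the lookup added to Leave.Do.
	memberName := req.GetString("memberName")
	if memberName == "" {
		// no name supplied: keep the old behavior and remove the local member
		memberName = manager.GetCurrentMemberName()
	}
	memberToLeave := cluster.GetMemberWithName(memberName)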
