Skip to content

Commit

Permalink
Fix some bugs when change to two deploy mode (#149)
Browse files Browse the repository at this point in the history
deploy_ctrl: fix changing deployMode



podtemplate defaults should be updated when stopped



Fix manual mode

Signed-off-by: haorenfsa <[email protected]>
  • Loading branch information
haorenfsa authored Jul 17, 2024
1 parent 9b5cad7 commit b820944
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 63 deletions.
3 changes: 3 additions & 0 deletions apis/milvus.io/v1beta1/milvus_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ type MilvusSpec struct {

// IsStopping returns true if the MilvusSpec has replicas serving
func (ms MilvusSpec) IsStopping() bool {
if ms.Com.EnableManualMode {
return false
}
if *ms.Com.Standalone.Replicas != 0 {
return false
}
Expand Down
29 changes: 17 additions & 12 deletions pkg/controllers/deploy_ctrl.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus,
return errors.Wrap(err, "check deploy mode")
}
switch deployMode {
case v1beta1.TwoDeployMode:
err = biz.MarkDeployModeChanging(ctx, mc, false)
if err != nil {
return err
}
case v1beta1.OneDeployMode:
isUpdating, err := biz.IsUpdating(ctx, mc)
if err != nil {
Expand All @@ -79,6 +74,12 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus,
if err != nil {
return errors.Wrap(err, "change to two deployment mode")
}
fallthrough
case v1beta1.TwoDeployMode:
err = biz.MarkDeployModeChanging(ctx, mc, false)
if err != nil {
return err
}
default:
err = errors.Errorf("unknown deploy mode: %d", deployMode)
logger.Error(err, "switch case by deployMode")
Expand All @@ -95,10 +96,6 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus,
return nil
}

if ReplicasValue(component.GetReplicas(mc.Spec)) == 0 {
return biz.HandleStop(ctx, mc)
}

err = biz.HandleRolling(ctx, mc)
if err != nil {
return errors.Wrap(err, "handle rolling")
Expand All @@ -108,6 +105,10 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus,
return nil
}

if ReplicasValue(component.GetReplicas(mc.Spec)) == 0 {
return biz.HandleStop(ctx, mc)
}

err = biz.HandleScaling(ctx, mc)
if err != nil {
return errors.Wrap(err, "handle scaling")
Expand Down Expand Up @@ -166,9 +167,13 @@ func (c *DeployControllerBizImpl) CheckDeployMode(ctx context.Context, mc v1beta
if c.component == QueryNode {
return v1beta1.TwoDeployMode, nil
}
fallthrough
default:
// check in cluster
}
if v1beta1.Labels().IsChangingMode(mc, c.component.Name) {
return v1beta1.OneDeployMode, nil
}
mode, err := c.checkDeployModeInCluster(ctx, mc)
if err != nil {
return mode, errors.Wrap(err, "check rolling mode in cluster")
Expand Down Expand Up @@ -228,16 +233,16 @@ func (c *DeployControllerBizImpl) HandleCreate(ctx context.Context, mc v1beta1.M
case err == ErrNotFound:
err := c.util.MarkMilvusComponentGroupId(ctx, mc, c.component, 0)
if err != nil {
return errors.Wrapf(err, "mark milvus querynode group id to %d", 0)
return errors.Wrapf(err, "mark milvus %s group id to %d", c.component.Name, 0)
}
err = c.util.CreateDeploy(ctx, mc, nil, 0)
if err != nil {
return errors.Wrap(err, "create querynode deployment 0")
return errors.Wrapf(err, "create %s deployment 0", c.component.Name)
}
case err == ErrNoLastDeployment:
return c.util.CreateDeploy(ctx, mc, nil, 1)
default:
return errors.Wrap(err, "get querynode deploys")
return errors.Wrapf(err, "get %s deploys", c.component.Name)
}
return nil
}
Expand Down
99 changes: 61 additions & 38 deletions pkg/controllers/deploy_ctrl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,96 +22,97 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) {
mockRollingModeStatusUpdater := NewMockRollingModeStatusUpdater(mockCtrl)
mc := v1beta1.Milvus{}
DeployControllerImpl := NewDeployController(mockFactory, mockOneDeployModeController, mockRollingModeStatusUpdater)
t.Cleanup(func() {
mockCtrl.Finish()
})

t.Run("update status rolling mode failed", func(t *testing.T) {
defer mockCtrl.Finish()
mockRollingModeStatusUpdater.EXPECT().Update(gomock.Any(), gomock.Any()).Return(errMock)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
assert.Error(t, err)
})
mockRollingModeStatusUpdater.EXPECT().Update(gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
t.Run("check deploy mode failed", func(t *testing.T) {
defer mockCtrl.Finish()
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.DeployModeUnknown, errMock)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
assert.Error(t, err)
})

t.Run("oneDeploy mode updating, continue", func(t *testing.T) {
defer mockCtrl.Finish()
m := *mc.DeepCopy()
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil)
mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(true, nil)
mockOneDeployModeController.EXPECT().Reconcile(gomock.Any(), mc, QueryNode).Return(nil)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(true, nil)
mockOneDeployModeController.EXPECT().Reconcile(gomock.Any(), m, QueryNode).Return(nil)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.NoError(t, err)
})

t.Run("oneDeploy mode check updating failed", func(t *testing.T) {
defer mockCtrl.Finish()
m := *mc.DeepCopy()
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil)
mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, errMock)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, errMock)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.Error(t, err)
})

t.Run("oneDeploy mode change to v2, mark failed", func(t *testing.T) {
defer mockCtrl.Finish()
m := *mc.DeepCopy()
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil)
mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, true).Return(errMock)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, true).Return(errMock)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.Error(t, err)
})

t.Run("oneDeploy mode change to v2 failed", func(t *testing.T) {
defer mockCtrl.Finish()
m := *mc.DeepCopy()
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil)
mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, true).Return(nil)
mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), mc).Return(errMock)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, true).Return(nil)
mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), m).Return(errMock)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.Error(t, err)
})

t.Run("oneDeploy mode change to v2 ok, handle create requeue err", func(t *testing.T) {
defer mockCtrl.Finish()
m := *mc.DeepCopy()
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil)
mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, true).Return(nil)
mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), mc).Return(nil)
mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(ErrRequeue)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, true).Return(nil)
mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), m).Return(nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil)
mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(ErrRequeue)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.True(t, errors.Is(err, ErrRequeue))
})

t.Run("TwoDeploy mode handle create error", func(t *testing.T) {
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil)
defer mockCtrl.Finish()
m := *mc.DeepCopy()
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil)
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil)
mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(errMock)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(errMock)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.Error(t, err)
})

t.Run("TwoDeploy mode is paused", func(t *testing.T) {
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil)
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil)
mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(nil)
mockBiz.EXPECT().IsPaused(gomock.Any(), mc).Return(true)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
assert.NoError(t, err)
})

t.Run("TwoDeploy mode hanlde stop ok", func(t *testing.T) {
defer mockCtrl.Finish()
m := *mc.DeepCopy()
m.Spec.Mode = v1beta1.MilvusModeCluster
m.Default()
m.Spec.Com.QueryNode.Replicas = int32Ptr(0)
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil)
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil)
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil)
mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(nil)
mockBiz.EXPECT().IsPaused(gomock.Any(), m).Return(false)
mockBiz.EXPECT().HandleStop(gomock.Any(), m).Return(nil)
mockBiz.EXPECT().IsPaused(gomock.Any(), m).Return(true)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.NoError(t, err)
})

t.Run("TwoDeploy mode rolling err", func(t *testing.T) {
defer mockCtrl.Finish()
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil)
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil)
mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(nil)
Expand All @@ -122,6 +123,7 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) {
})

t.Run("TwoDeploy mode, manual mode skip scaling", func(t *testing.T) {
defer mockCtrl.Finish()
mc := *mc.DeepCopy()
mc.Spec.Com.EnableManualMode = true
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil)
Expand All @@ -134,6 +136,7 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) {
})

t.Run("TwoDeploy mode scaling err", func(t *testing.T) {
defer mockCtrl.Finish()
mc = *mc.DeepCopy()
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil)
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil)
Expand All @@ -145,7 +148,26 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) {
assert.Error(t, err)
})

t.Run("TwoDeploy mode hanlde stop ok", func(t *testing.T) {
defer mockCtrl.Finish()
m := *mc.DeepCopy()
m.Spec.Mode = v1beta1.MilvusModeCluster
m.Default()
m.Spec.Com.QueryNode.Replicas = int32Ptr(0)
gomock.InOrder(
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil),
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil),
mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(nil),
mockBiz.EXPECT().IsPaused(gomock.Any(), m).Return(false),
mockBiz.EXPECT().HandleRolling(gomock.Any(), m).Return(nil),
mockBiz.EXPECT().HandleStop(gomock.Any(), m).Return(nil),
)
err := DeployControllerImpl.Reconcile(ctx, m, QueryNode)
assert.NoError(t, err)
})

t.Run("TwoDeploy mode all ok", func(t *testing.T) {
defer mockCtrl.Finish()
gomock.InOrder(
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil),
mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil),
Expand All @@ -159,6 +181,7 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) {
})

t.Run("unknown mode err", func(t *testing.T) {
defer mockCtrl.Finish()
mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.DeployModeUnknown, nil)
err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode)
assert.Error(t, err)
Expand Down
5 changes: 4 additions & 1 deletion pkg/controllers/deploy_ctrl_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ func (c *DeployControllerBizUtilImpl) RenderPodTemplateWithoutGroupID(mc v1beta1
}
updater := newMilvusDeploymentUpdater(mc, c.cli.Scheme(), component)
appLabels := NewComponentAppLabels(updater.GetIntanceName(), updater.GetComponentName())
updatePodTemplate(updater, ret, appLabels, currentTemplate == nil)
isCreating := currentTemplate == nil
isStopped := ReplicasValue(component.GetReplicas(mc.Spec)) == 0
updateDefaults := isCreating || isStopped
updatePodTemplate(updater, ret, appLabels, updateDefaults)
return ret
}

Expand Down
13 changes: 12 additions & 1 deletion pkg/controllers/deploy_mode_changer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func NewDeployModeChanger(component MilvusComponent, cli client.Client, util K8s
newStep("update old pod labels", c.UpdateOldPodLabels),
newStep("recover replica sets", c.RecoverReplicaSets),
newStep("recover deploy", c.RecoverDeploy),
newStep("mark current deploy", c.MarkCurrentDeploy),
}
return c
}
Expand Down Expand Up @@ -154,7 +155,8 @@ func (c *DeployModeChangerImpl) RecoverReplicaSets(ctx context.Context, mc v1bet
return errors.Errorf("invalid old replica set name: %s", rs.Name)
}
rsHash := splitedName[len(splitedName)-1]
rs.Name = fmt.Sprintf("%s-0-%s", rs.Name, rsHash)
deployName := strings.Join(splitedName[:len(splitedName)-2], "-")
rs.Name = fmt.Sprintf("%s-0-%s", deployName, rsHash)
logger.Info("recovering old replica set", "new-name", rs.Name)
err = c.util.CreateObject(ctx, &rs)
if err != nil {
Expand Down Expand Up @@ -188,6 +190,15 @@ func (c *DeployModeChangerImpl) RecoverDeploy(ctx context.Context, mc v1beta1.Mi
return nil
}

func (c *DeployModeChangerImpl) MarkCurrentDeploy(ctx context.Context, mc v1beta1.Milvus) error {
if v1beta1.Labels().GetCurrentGroupId(&mc, c.component.Name) == "0" {
return nil
}
v1beta1.Labels().SetCurrentGroupID(&mc, c.component.Name, 0)
err := c.util.UpdateAndRequeue(ctx, &mc)
return errors.Wrap(err, "mark current deploy")
}

func formatSaveOldDeployName(mc v1beta1.Milvus, component MilvusComponent) string {
return fmt.Sprintf("%s-%s-old-deploy", component.Name, mc.Name)
}
Expand Down
28 changes: 18 additions & 10 deletions pkg/controllers/deployment_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func updateDeployment(deployment *appsv1.Deployment, updater deploymentUpdater)
if err != nil {
return err
}
updatePodTemplate(updater, &deployment.Spec.Template, appLabels, isCreating)
isStopped := getDeployReplicas(deployment) == 0
updateDefaults := isCreating || isStopped
updatePodTemplate(updater, &deployment.Spec.Template, appLabels, updateDefaults)
return nil
}

Expand All @@ -87,27 +89,33 @@ func updatePodTemplate(
updater deploymentUpdater,
template *corev1.PodTemplateSpec,
appLabels map[string]string,
isCreating bool,
updateDefaults bool,
) {
currentTemplate := template.DeepCopy()

updatePodMeta(template, appLabels, updater)
updateInitContainers(template, updater)
updateUserDefinedVolumes(template, updater)
updateScheduleSpec(template, updater)
updateMilvusContainer(template, updater, isCreating)
updateMilvusContainer(template, updater, updateDefaults)
updateSidecars(template, updater)
updateNetworkSettings(template, updater)

// no rolling update
if IsEqual(currentTemplate, template) {
var hasUpdates = !IsEqual(currentTemplate, template)
switch {
case hasUpdates:
podTemplateLogger.WithValues(
"namespace", updater.GetMilvus().Namespace,
"milvus", updater.GetMilvus().Name).
Info("pod template updated by crd", "diff", diff.ObjectDiff(currentTemplate, template))
case updateDefaults:
default:
// no updates, no default changes
return
}
podTemplateLogger.WithValues(
"namespace", updater.GetMilvus().Namespace,
"milvus", updater.GetMilvus().Name).
Info("pod template changed", "diff", diff.ObjectDiff(currentTemplate, template))
// some defaults change will cause rolling update, so we only perform when rolling update

// some defaults change will cause rolling update
// so we only perform when rolling update or when caller explicitly ask for it
updateSomeFieldsOnlyWhenRolling(template, updater)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/group_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (ParallelGroupRunner) RunDiffArgs(f MilvusReconcileFunc, ctx context.Contex
}

err := g.Wait()
return errors.Wrap(err, "run group failed")
return errors.Wrap(err, "run group")
}

func getDummyErr(err error) func() error {
Expand Down

0 comments on commit b820944

Please sign in to comment.