diff --git a/apis/milvus.io/v1beta1/milvus_types.go b/apis/milvus.io/v1beta1/milvus_types.go index b07d221e..553eec3b 100644 --- a/apis/milvus.io/v1beta1/milvus_types.go +++ b/apis/milvus.io/v1beta1/milvus_types.go @@ -61,6 +61,9 @@ type MilvusSpec struct { // IsStopping returns true if the MilvusSpec has replicas serving func (ms MilvusSpec) IsStopping() bool { + if ms.Com.EnableManualMode { + return false + } if *ms.Com.Standalone.Replicas != 0 { return false } diff --git a/pkg/controllers/deploy_ctrl.go b/pkg/controllers/deploy_ctrl.go index f9f674c8..472228f1 100644 --- a/pkg/controllers/deploy_ctrl.go +++ b/pkg/controllers/deploy_ctrl.go @@ -55,11 +55,6 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus, return errors.Wrap(err, "check deploy mode") } switch deployMode { - case v1beta1.TwoDeployMode: - err = biz.MarkDeployModeChanging(ctx, mc, false) - if err != nil { - return err - } case v1beta1.OneDeployMode: isUpdating, err := biz.IsUpdating(ctx, mc) if err != nil { @@ -79,6 +74,12 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus, if err != nil { return errors.Wrap(err, "change to two deployment mode") } + fallthrough + case v1beta1.TwoDeployMode: + err = biz.MarkDeployModeChanging(ctx, mc, false) + if err != nil { + return err + } default: err = errors.Errorf("unknown deploy mode: %d", deployMode) logger.Error(err, "switch case by deployMode") @@ -95,10 +96,6 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus, return nil } - if ReplicasValue(component.GetReplicas(mc.Spec)) == 0 { - return biz.HandleStop(ctx, mc) - } - err = biz.HandleRolling(ctx, mc) if err != nil { return errors.Wrap(err, "handle rolling") @@ -108,6 +105,10 @@ func (c *DeployControllerImpl) Reconcile(ctx context.Context, mc v1beta1.Milvus, return nil } + if ReplicasValue(component.GetReplicas(mc.Spec)) == 0 { + return biz.HandleStop(ctx, mc) + } + err = biz.HandleScaling(ctx, mc) if err != nil { return errors.Wrap(err, "handle scaling") @@ -166,9 +167,13 @@ func (c *DeployControllerBizImpl) CheckDeployMode(ctx context.Context, mc v1beta if c.component == QueryNode { return v1beta1.TwoDeployMode, nil } + fallthrough default: // check in cluster } + if v1beta1.Labels().IsChangingMode(mc, c.component.Name) { + return v1beta1.OneDeployMode, nil + } mode, err := c.checkDeployModeInCluster(ctx, mc) if err != nil { return mode, errors.Wrap(err, "check rolling mode in cluster") @@ -228,16 +233,16 @@ func (c *DeployControllerBizImpl) HandleCreate(ctx context.Context, mc v1beta1.M case err == ErrNotFound: err := c.util.MarkMilvusComponentGroupId(ctx, mc, c.component, 0) if err != nil { - return errors.Wrapf(err, "mark milvus querynode group id to %d", 0) + return errors.Wrapf(err, "mark milvus %s group id to %d", c.component.Name, 0) } err = c.util.CreateDeploy(ctx, mc, nil, 0) if err != nil { - return errors.Wrap(err, "create querynode deployment 0") + return errors.Wrapf(err, "create %s deployment 0", c.component.Name) } case err == ErrNoLastDeployment: return c.util.CreateDeploy(ctx, mc, nil, 1) default: - return errors.Wrap(err, "get querynode deploys") + return errors.Wrapf(err, "get %s deploys", c.component.Name) } return nil } diff --git a/pkg/controllers/deploy_ctrl_test.go b/pkg/controllers/deploy_ctrl_test.go index c19cc079..817c6425 100644 --- a/pkg/controllers/deploy_ctrl_test.go +++ b/pkg/controllers/deploy_ctrl_test.go @@ -22,96 +22,97 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) { mockRollingModeStatusUpdater := NewMockRollingModeStatusUpdater(mockCtrl) mc := v1beta1.Milvus{} DeployControllerImpl := NewDeployController(mockFactory, mockOneDeployModeController, mockRollingModeStatusUpdater) - t.Cleanup(func() { - mockCtrl.Finish() - }) t.Run("update status rolling mode failed", func(t *testing.T) { + defer mockCtrl.Finish() mockRollingModeStatusUpdater.EXPECT().Update(gomock.Any(), gomock.Any()).Return(errMock) err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) assert.Error(t, err) }) mockRollingModeStatusUpdater.EXPECT().Update(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() t.Run("check deploy mode failed", func(t *testing.T) { + defer mockCtrl.Finish() mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.DeployModeUnknown, errMock) err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) assert.Error(t, err) }) t.Run("oneDeploy mode updating, continue", func(t *testing.T) { + defer mockCtrl.Finish() + m := *mc.DeepCopy() mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil) - mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(true, nil) - mockOneDeployModeController.EXPECT().Reconcile(gomock.Any(), mc, QueryNode).Return(nil) - err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) + mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(true, nil) + mockOneDeployModeController.EXPECT().Reconcile(gomock.Any(), m, QueryNode).Return(nil) + err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) assert.NoError(t, err) }) t.Run("oneDeploy mode check updating failed", func(t *testing.T) { + defer mockCtrl.Finish() + m := *mc.DeepCopy() mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil) - mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, errMock) - err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) + mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, errMock) + err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) assert.Error(t, err) }) t.Run("oneDeploy mode change to v2, mark failed", func(t *testing.T) { + defer mockCtrl.Finish() + m := *mc.DeepCopy() mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil) - mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, nil) - mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, true).Return(errMock) - err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) + mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, nil) + mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, true).Return(errMock) + err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) assert.Error(t, err) }) t.Run("oneDeploy mode change to v2 failed", func(t *testing.T) { + defer mockCtrl.Finish() + m := *mc.DeepCopy() mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil) - mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, nil) - mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, true).Return(nil) - mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), mc).Return(errMock) - err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) + mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, nil) + mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, true).Return(nil) + mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), m).Return(errMock) + err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) assert.Error(t, err) }) t.Run("oneDeploy mode change to v2 ok, handle create requeue err", func(t *testing.T) { + defer mockCtrl.Finish() + m := *mc.DeepCopy() mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.OneDeployMode, nil) - mockBiz.EXPECT().IsUpdating(gomock.Any(), mc).Return(false, nil) - mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, true).Return(nil) - mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), mc).Return(nil) - mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(ErrRequeue) - err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) + mockBiz.EXPECT().IsUpdating(gomock.Any(), m).Return(false, nil) + mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, true).Return(nil) + mockBiz.EXPECT().ChangeToTwoDeployMode(gomock.Any(), m).Return(nil) + mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil) + mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(ErrRequeue) + err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) assert.True(t, errors.Is(err, ErrRequeue)) }) t.Run("TwoDeploy mode handle create error", func(t *testing.T) { - mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil) + defer mockCtrl.Finish() + m := *mc.DeepCopy() + mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil) mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil) - mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(errMock) - err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) + mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(errMock) + err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) assert.Error(t, err) }) t.Run("TwoDeploy mode is paused", func(t *testing.T) { - mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil) - mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil) - mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(nil) - mockBiz.EXPECT().IsPaused(gomock.Any(), mc).Return(true) - err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) - assert.NoError(t, err) - }) - - t.Run("TwoDeploy mode hanlde stop ok", func(t *testing.T) { + defer mockCtrl.Finish() m := *mc.DeepCopy() - m.Spec.Mode = v1beta1.MilvusModeCluster - m.Default() - m.Spec.Com.QueryNode.Replicas = int32Ptr(0) - mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil) mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil) + mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil) mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(nil) - mockBiz.EXPECT().IsPaused(gomock.Any(), m).Return(false) - mockBiz.EXPECT().HandleStop(gomock.Any(), m).Return(nil) + mockBiz.EXPECT().IsPaused(gomock.Any(), m).Return(true) err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) assert.NoError(t, err) }) t.Run("TwoDeploy mode rolling err", func(t *testing.T) { + defer mockCtrl.Finish() mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil) mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil) mockBiz.EXPECT().HandleCreate(gomock.Any(), mc).Return(nil) @@ -122,6 +123,7 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) { }) t.Run("TwoDeploy mode, manual mode skip scaling", func(t *testing.T) { + defer mockCtrl.Finish() mc := *mc.DeepCopy() mc.Spec.Com.EnableManualMode = true mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil) @@ -134,6 +136,7 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) { }) t.Run("TwoDeploy mode scaling err", func(t *testing.T) { + defer mockCtrl.Finish() mc = *mc.DeepCopy() mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil) mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil) @@ -145,7 +148,26 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) { assert.Error(t, err) }) + t.Run("TwoDeploy mode hanlde stop ok", func(t *testing.T) { + defer mockCtrl.Finish() + m := *mc.DeepCopy() + m.Spec.Mode = v1beta1.MilvusModeCluster + m.Default() + m.Spec.Com.QueryNode.Replicas = int32Ptr(0) + gomock.InOrder( + mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil), + mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), m, false).Return(nil), + mockBiz.EXPECT().HandleCreate(gomock.Any(), m).Return(nil), + mockBiz.EXPECT().IsPaused(gomock.Any(), m).Return(false), + mockBiz.EXPECT().HandleRolling(gomock.Any(), m).Return(nil), + mockBiz.EXPECT().HandleStop(gomock.Any(), m).Return(nil), + ) + err := DeployControllerImpl.Reconcile(ctx, m, QueryNode) + assert.NoError(t, err) + }) + t.Run("TwoDeploy mode all ok", func(t *testing.T) { + defer mockCtrl.Finish() gomock.InOrder( mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.TwoDeployMode, nil), mockBiz.EXPECT().MarkDeployModeChanging(gomock.Any(), mc, false).Return(nil), @@ -159,6 +181,7 @@ func TestDeployControllerImpl_Reconcile(t *testing.T) { }) t.Run("unknown mode err", func(t *testing.T) { + defer mockCtrl.Finish() mockBiz.EXPECT().CheckDeployMode(gomock.Any(), gomock.Any()).Return(v1beta1.DeployModeUnknown, nil) err := DeployControllerImpl.Reconcile(ctx, v1beta1.Milvus{}, QueryNode) assert.Error(t, err) diff --git a/pkg/controllers/deploy_ctrl_util.go b/pkg/controllers/deploy_ctrl_util.go index 9fb7d2f9..ef7c6307 100644 --- a/pkg/controllers/deploy_ctrl_util.go +++ b/pkg/controllers/deploy_ctrl_util.go @@ -92,7 +92,10 @@ func (c *DeployControllerBizUtilImpl) RenderPodTemplateWithoutGroupID(mc v1beta1 } updater := newMilvusDeploymentUpdater(mc, c.cli.Scheme(), component) appLabels := NewComponentAppLabels(updater.GetIntanceName(), updater.GetComponentName()) - updatePodTemplate(updater, ret, appLabels, currentTemplate == nil) + isCreating := currentTemplate == nil + isStopped := ReplicasValue(component.GetReplicas(mc.Spec)) == 0 + updateDefaults := isCreating || isStopped + updatePodTemplate(updater, ret, appLabels, updateDefaults) return ret } diff --git a/pkg/controllers/deploy_mode_changer.go b/pkg/controllers/deploy_mode_changer.go index ff2f9f69..5449b89a 100644 --- a/pkg/controllers/deploy_mode_changer.go +++ b/pkg/controllers/deploy_mode_changer.go @@ -34,6 +34,7 @@ func NewDeployModeChanger(component MilvusComponent, cli client.Client, util K8s newStep("update old pod labels", c.UpdateOldPodLabels), newStep("recover replica sets", c.RecoverReplicaSets), newStep("recover deploy", c.RecoverDeploy), + newStep("mark current deploy", c.MarkCurrentDeploy), } return c } @@ -154,7 +155,8 @@ func (c *DeployModeChangerImpl) RecoverReplicaSets(ctx context.Context, mc v1bet return errors.Errorf("invalid old replica set name: %s", rs.Name) } rsHash := splitedName[len(splitedName)-1] - rs.Name = fmt.Sprintf("%s-0-%s", rs.Name, rsHash) + deployName := strings.Join(splitedName[:len(splitedName)-2], "-") + rs.Name = fmt.Sprintf("%s-0-%s", deployName, rsHash) logger.Info("recovering old replica set", "new-name", rs.Name) err = c.util.CreateObject(ctx, &rs) if err != nil { @@ -188,6 +190,15 @@ func (c *DeployModeChangerImpl) RecoverDeploy(ctx context.Context, mc v1beta1.Mi return nil } +func (c *DeployModeChangerImpl) MarkCurrentDeploy(ctx context.Context, mc v1beta1.Milvus) error { + if v1beta1.Labels().GetCurrentGroupId(&mc, c.component.Name) == "0" { + return nil + } + v1beta1.Labels().SetCurrentGroupID(&mc, c.component.Name, 0) + err := c.util.UpdateAndRequeue(ctx, &mc) + return errors.Wrap(err, "mark current deploy") +} + func formatSaveOldDeployName(mc v1beta1.Milvus, component MilvusComponent) string { return fmt.Sprintf("%s-%s-old-deploy", component.Name, mc.Name) } diff --git a/pkg/controllers/deployment_updater.go b/pkg/controllers/deployment_updater.go index 25f54eb4..cfbebcd1 100644 --- a/pkg/controllers/deployment_updater.go +++ b/pkg/controllers/deployment_updater.go @@ -77,7 +77,9 @@ func updateDeployment(deployment *appsv1.Deployment, updater deploymentUpdater) if err != nil { return err } - updatePodTemplate(updater, &deployment.Spec.Template, appLabels, isCreating) + isStopped := getDeployReplicas(deployment) == 0 + updateDefaults := isCreating || isStopped + updatePodTemplate(updater, &deployment.Spec.Template, appLabels, updateDefaults) return nil } @@ -87,7 +89,7 @@ func updatePodTemplate( updater deploymentUpdater, template *corev1.PodTemplateSpec, appLabels map[string]string, - isCreating bool, + updateDefaults bool, ) { currentTemplate := template.DeepCopy() @@ -95,19 +97,25 @@ func updatePodTemplate( updateInitContainers(template, updater) updateUserDefinedVolumes(template, updater) updateScheduleSpec(template, updater) - updateMilvusContainer(template, updater, isCreating) + updateMilvusContainer(template, updater, updateDefaults) updateSidecars(template, updater) updateNetworkSettings(template, updater) - // no rolling update - if IsEqual(currentTemplate, template) { + var hasUpdates = !IsEqual(currentTemplate, template) + switch { + case hasUpdates: + podTemplateLogger.WithValues( + "namespace", updater.GetMilvus().Namespace, + "milvus", updater.GetMilvus().Name). + Info("pod template updated by crd", "diff", diff.ObjectDiff(currentTemplate, template)) + case updateDefaults: + default: + // no updates, no default changes return } - podTemplateLogger.WithValues( - "namespace", updater.GetMilvus().Namespace, - "milvus", updater.GetMilvus().Name). - Info("pod template changed", "diff", diff.ObjectDiff(currentTemplate, template)) - // some defaults change will cause rolling update, so we only perform when rolling update + + // some defaults change will cause rolling update + // so we only perform when rolling update or when caller explicitly ask for it updateSomeFieldsOnlyWhenRolling(template, updater) } diff --git a/pkg/controllers/group_runner.go b/pkg/controllers/group_runner.go index 2ad26b41..54041d2d 100644 --- a/pkg/controllers/group_runner.go +++ b/pkg/controllers/group_runner.go @@ -114,7 +114,7 @@ func (ParallelGroupRunner) RunDiffArgs(f MilvusReconcileFunc, ctx context.Contex } err := g.Wait() - return errors.Wrap(err, "run group failed") + return errors.Wrap(err, "run group") } func getDummyErr(err error) func() error {