diff --git a/Makefile b/Makefile index ea219925..20ac3fa7 100644 --- a/Makefile +++ b/Makefile @@ -234,7 +234,7 @@ sit-prepare-images: sit-prepare-operator-images @echo "Preparing images" docker pull milvusdb/milvus:v2.4.6 - docker pull -q apachepulsar/pulsar:2.8.2 + # docker pull -q apachepulsar/pulsar:2.8.2 docker pull -q bitnami/kafka:3.1.0-debian-10-r52 docker pull -q milvusdb/etcd:3.5.5-r4 docker pull -q minio/minio:RELEASE.2023-03-20T20-16-18Z @@ -250,7 +250,7 @@ sit-load-operator-images: sit-load-images: sit-load-operator-images @echo "Loading images" kind load docker-image milvusdb/milvus:v2.4.6 - kind load docker-image apachepulsar/pulsar:2.8.2 --name ${KIND_CLUSTER} + # kind load docker-image apachepulsar/pulsar:2.8.2 --name ${KIND_CLUSTER} kind load docker-image bitnami/kafka:3.1.0-debian-10-r52 --name ${KIND_CLUSTER} kind load docker-image milvusdb/etcd:3.5.5-r4 --name ${KIND_CLUSTER} kind load docker-image minio/minio:RELEASE.2023-03-20T20-16-18Z --name ${KIND_CLUSTER} @@ -259,7 +259,7 @@ sit-load-images: sit-load-operator-images sit-load-and-cleanup-images: sit-load-images @echo "Clean up some big images to save disk space in github action" docker rmi milvusdb/milvus:v2.4.6 - docker rmi apachepulsar/pulsar:2.8.2 + # docker rmi apachepulsar/pulsar:2.8.2 docker rmi bitnami/kafka:3.1.0-debian-10-r52 docker rmi milvusdb/etcd:3.5.5-r4 docker rmi minio/minio:RELEASE.2023-03-20T20-16-18Z diff --git a/apis/milvus.io/v1beta1/milvus_types_test.go b/apis/milvus.io/v1beta1/milvus_types_test.go index 40aae9ec..169bd6a9 100644 --- a/apis/milvus.io/v1beta1/milvus_types_test.go +++ b/apis/milvus.io/v1beta1/milvus_types_test.go @@ -123,6 +123,7 @@ func TestMilvusSpec_IsStopping(t *testing.T) { }) com.MixCoord = nil + com.RootCoord = &MilvusRootCoord{} m.Default() t.Run("cluster not stopping", func(t *testing.T) { assert.False(t, m.Spec.IsStopping()) diff --git a/apis/milvus.io/v1beta1/milvus_webhook.go b/apis/milvus.io/v1beta1/milvus_webhook.go index 9a64b265..82ae5a2b 100644 --- a/apis/milvus.io/v1beta1/milvus_webhook.go +++ b/apis/milvus.io/v1beta1/milvus_webhook.go @@ -240,6 +240,13 @@ func (r *Milvus) DefaultMode() { } } +func (r *Milvus) noCoordSpecifiedByUser() bool { + return r.Spec.Com.RootCoord == nil && + r.Spec.Com.DataCoord == nil && + r.Spec.Com.IndexCoord == nil && + r.Spec.Com.QueryCoord == nil +} + func (r *Milvus) DefaultComponents() { spec := &r.Spec setDefaultStr(&spec.Com.Image, config.DefaultMilvusImage) @@ -257,17 +264,22 @@ func (r *Milvus) DefaultComponents() { spec.Com.Proxy = &MilvusProxy{} } if spec.Com.MixCoord == nil { - if spec.Com.RootCoord == nil { - spec.Com.RootCoord = &MilvusRootCoord{} - } - if spec.Com.DataCoord == nil { - spec.Com.DataCoord = &MilvusDataCoord{} - } - if spec.Com.IndexCoord == nil { - spec.Com.IndexCoord = &MilvusIndexCoord{} - } - if spec.Com.QueryCoord == nil { - spec.Com.QueryCoord = &MilvusQueryCoord{} + if r.noCoordSpecifiedByUser() { + // default to use mixcoord + spec.Com.MixCoord = &MilvusMixCoord{} + } else { + if spec.Com.RootCoord == nil { + spec.Com.RootCoord = &MilvusRootCoord{} + } + if spec.Com.DataCoord == nil { + spec.Com.DataCoord = &MilvusDataCoord{} + } + if spec.Com.IndexCoord == nil { + spec.Com.IndexCoord = &MilvusIndexCoord{} + } + if spec.Com.QueryCoord == nil { + spec.Com.QueryCoord = &MilvusQueryCoord{} + } } } if spec.Com.DataNode == nil { @@ -296,9 +308,6 @@ func (r *Milvus) defaultComponentsReplicas() { spec.Com.MixCoord.Replicas = &defaultReplicas } } else { - if spec.Com.Proxy.Replicas == nil { - spec.Com.Proxy.Replicas = &defaultReplicas - } if spec.Com.RootCoord.Replicas == nil { spec.Com.RootCoord.Replicas = &defaultReplicas } @@ -311,15 +320,18 @@ func (r *Milvus) defaultComponentsReplicas() { if spec.Com.QueryCoord.Replicas == nil { spec.Com.QueryCoord.Replicas = &defaultReplicas } - if spec.Com.DataNode.Replicas == nil { - spec.Com.DataNode.Replicas = &defaultReplicas - } - if spec.Com.IndexNode.Replicas == nil { - spec.Com.IndexNode.Replicas = &defaultReplicas - } - if spec.Com.QueryNode.Replicas == nil { - spec.Com.QueryNode.Replicas = &defaultReplicas - } + } + if spec.Com.Proxy.Replicas == nil { + spec.Com.Proxy.Replicas = &defaultReplicas + } + if spec.Com.DataNode.Replicas == nil { + spec.Com.DataNode.Replicas = &defaultReplicas + } + if spec.Com.IndexNode.Replicas == nil { + spec.Com.IndexNode.Replicas = &defaultReplicas + } + if spec.Com.QueryNode.Replicas == nil { + spec.Com.QueryNode.Replicas = &defaultReplicas } } else { if spec.Com.Standalone.Replicas == nil { @@ -470,14 +482,12 @@ func (r *Milvus) DefaultConf() { } if r.Spec.Com.EnableRollingUpdate == nil { - if r.isRollingUpdateEnabledByConfig() { - r.Spec.Com.EnableRollingUpdate = util.BoolPtr(true) - } + r.Spec.Com.EnableRollingUpdate = util.BoolPtr(true) } - if r.Spec.Com.EnableRollingUpdate != nil && - *r.Spec.Com.EnableRollingUpdate { - r.setRollingUpdate(true) + if !r.isRollingUpdateSupportedByConfig() { + r.Spec.Com.EnableRollingUpdate = util.BoolPtr(false) } + setEnableActiveStandby(&r.Spec, true) } var rollingUpdateConfigFields = []string{ @@ -490,24 +500,18 @@ var rollingUpdateConfigFields = []string{ // EnableActiveStandByConfig is a config in coordinators to determine whether a coordinator can be rolling updated const EnableActiveStandByConfig = "enableActiveStandby" -func (r *Milvus) isRollingUpdateEnabledByConfig() bool { +func (r *Milvus) isRollingUpdateSupportedByConfig() bool { if r.Spec.Mode != MilvusModeCluster { switch r.Spec.Dep.MsgStreamType { case MsgStreamTypeRocksMQ, MsgStreamTypeNatsMQ: return false } } - for _, configFieldName := range rollingUpdateConfigFields { - enableActiveStandBy, _ := util.GetBoolValue(r.Spec.Conf.Data, configFieldName, EnableActiveStandByConfig) - if !enableActiveStandBy { - return false - } - } return true } -func (r *Milvus) setRollingUpdate(enabled bool) { +func setEnableActiveStandby(spec *MilvusSpec, enabled bool) { for _, configFieldName := range rollingUpdateConfigFields { - util.SetValue(r.Spec.Conf.Data, enabled, configFieldName, EnableActiveStandByConfig) + util.SetValue(spec.Conf.Data, enabled, configFieldName, EnableActiveStandByConfig) } } diff --git a/apis/milvus.io/v1beta1/milvus_webhook_test.go b/apis/milvus.io/v1beta1/milvus_webhook_test.go index bbf4d6a6..551b8395 100644 --- a/apis/milvus.io/v1beta1/milvus_webhook_test.go +++ b/apis/milvus.io/v1beta1/milvus_webhook_test.go @@ -60,12 +60,14 @@ func TestMilvus_Default_NotExternal(t *testing.T) { Component: defaultComponent, }, }, - RollingMode: RollingModeV2, + EnableRollingUpdate: util.BoolPtr(false), + RollingMode: RollingModeV2, }, Conf: Values{ Data: map[string]interface{}{}, }, } + setEnableActiveStandby(&standaloneDefault, true) t.Run("standalone not external ok", func(t *testing.T) { mc := Milvus{ObjectMeta: metav1.ObjectMeta{Name: crName}} @@ -105,16 +107,8 @@ func TestMilvus_Default_NotExternal(t *testing.T) { Component: defaultComponent, }, }, - RootCoord: &MilvusRootCoord{ - Component: defaultComponent, - }, - DataCoord: &MilvusDataCoord{ - Component: defaultComponent, - }, - IndexCoord: &MilvusIndexCoord{ - Component: defaultComponent, - }, - QueryCoord: &MilvusQueryCoord{ + EnableRollingUpdate: util.BoolPtr(true), + MixCoord: &MilvusMixCoord{ Component: defaultComponent, }, DataNode: &MilvusDataNode{ @@ -132,7 +126,7 @@ func TestMilvus_Default_NotExternal(t *testing.T) { }, }, } - t.Run("cluster not external ok", func(t *testing.T) { + t.Run("cluster not external dep ok", func(t *testing.T) { mc := Milvus{ObjectMeta: metav1.ObjectMeta{Name: crName}} mc.Spec.Mode = MilvusModeCluster mc.Default() @@ -216,7 +210,7 @@ func TestMilvus_Default_DeleteUnSetableOK(t *testing.T) { }, } mc.Default() - assert.Equal(t, conf, mc.Spec.Conf) + assert.Equal(t, conf.Data["minio"], mc.Spec.Conf.Data["minio"]) } func TestMilvus_ValidateCreate_NoError(t *testing.T) { @@ -297,17 +291,8 @@ func Test_DefaultLabels_Legacy(t *testing.T) { } func Test_DefaultConf_EnableRollingUpdate(t *testing.T) { - - t.Run("default nil", func(t *testing.T) { - m := Milvus{} - m.DefaultConf() - assert.Nil(t, m.Spec.Com.EnableRollingUpdate) - }) - - t.Run("enable by config", func(t *testing.T) { + t.Run("default enable", func(t *testing.T) { m := Milvus{} - m.Spec.Conf.Data = map[string]interface{}{} - m.setRollingUpdate(true) m.DefaultConf() assert.True(t, *m.Spec.Com.EnableRollingUpdate) }) @@ -316,24 +301,23 @@ func Test_DefaultConf_EnableRollingUpdate(t *testing.T) { m := Milvus{} m.Spec.Com.EnableRollingUpdate = util.BoolPtr(true) m.DefaultConf() - assert.True(t, m.isRollingUpdateEnabledByConfig()) + assert.True(t, *m.Spec.Com.EnableRollingUpdate) }) t.Run("set false", func(t *testing.T) { m := Milvus{} m.Spec.Com.EnableRollingUpdate = util.BoolPtr(false) m.DefaultConf() - assert.False(t, m.isRollingUpdateEnabledByConfig()) + assert.False(t, *m.Spec.Com.EnableRollingUpdate) }) t.Run("rocksmq false", func(t *testing.T) { m := Milvus{} m.DefaultConf() - m.setRollingUpdate(true) m.Spec.Com.EnableRollingUpdate = util.BoolPtr(true) m.Spec.Dep.MsgStreamType = MsgStreamTypeRocksMQ m.DefaultConf() - assert.False(t, m.isRollingUpdateEnabledByConfig()) + assert.False(t, *m.Spec.Com.EnableRollingUpdate) }) } diff --git a/config/samples/milvus_cluster_4coords.yaml b/config/samples/milvus_cluster_4coords.yaml new file mode 100644 index 00000000..87c98cdb --- /dev/null +++ b/config/samples/milvus_cluster_4coords.yaml @@ -0,0 +1,16 @@ +# This is a sample to deploy a milvus cluster using +# rootCoord, indexCoord, dataCoord and queryCoord instead of mixCoord. + +apiVersion: milvus.io/v1beta1 +kind: Milvus +metadata: + name: my-release + labels: + app: milvus +spec: + mode: cluster + dependencies: {} + components: + rootCoord: + replicas: 1 + config: {} \ No newline at end of file diff --git a/config/samples/milvus_cluster_mixcoord.yaml b/config/samples/milvus_cluster_mixcoord.yaml index b9eaaaca..2398e766 100644 --- a/config/samples/milvus_cluster_mixcoord.yaml +++ b/config/samples/milvus_cluster_mixcoord.yaml @@ -10,6 +10,7 @@ spec: mode: cluster dependencies: {} components: + # now mixCoord is used by default # mixCoord deploys all coordinators in one 'mixture' deployment mixCoord: replicas: 1 diff --git a/pkg/controllers/component_condition_test.go b/pkg/controllers/component_condition_test.go index 8760337c..485a53a6 100644 --- a/pkg/controllers/component_condition_test.go +++ b/pkg/controllers/component_condition_test.go @@ -103,6 +103,7 @@ func TestComponentConditionGetter_GetMilvusInstanceCondition(t *testing.T) { }) milvus.Spec.Mode = v1beta1.MilvusModeCluster + milvus.Spec.Com.RootCoord = &v1beta1.MilvusRootCoord{} milvus.Default() t.Run(("cluster all ok"), func(t *testing.T) { mockClient.EXPECT().List(gomock.Any(), gomock.Any(), gomock.Any()). diff --git a/pkg/controllers/components_test.go b/pkg/controllers/components_test.go index de725e5b..6c297f69 100644 --- a/pkg/controllers/components_test.go +++ b/pkg/controllers/components_test.go @@ -583,6 +583,7 @@ func TestMilvusComponent_GetDependencies(t *testing.T) { m := v1beta1.Milvus{} m.Spec.Mode = v1beta1.MilvusModeCluster m.Spec.Com.Image = "milvusdb/milvus:v2.3.0" + m.Spec.Com.RootCoord = &v1beta1.MilvusRootCoord{} m.Default() assert.Len(t, IndexNode.GetDependencies(m.Spec), 0) assert.Equal(t, IndexNode, RootCoord.GetDependencies(m.Spec)[0]) @@ -597,6 +598,7 @@ func TestMilvusComponent_GetDependencies(t *testing.T) { t.Run("clusterModeDowngrade", func(t *testing.T) { m := v1beta1.Milvus{} m.Spec.Com.Image = "milvusdb/milvus:2.3.0" + m.Spec.Com.RootCoord = &v1beta1.MilvusRootCoord{} m.Spec.Mode = v1beta1.MilvusModeCluster m.Spec.Com.ImageUpdateMode = v1beta1.ImageUpdateModeRollingDowngrade m.Default() diff --git a/pkg/controllers/deployments_test.go b/pkg/controllers/deployments_test.go index 7683d6c1..54485c4d 100644 --- a/pkg/controllers/deployments_test.go +++ b/pkg/controllers/deployments_test.go @@ -47,11 +47,11 @@ func TestClusterReconciler_ReconcileDeployments_CreateIfNotFound(t *testing.T) { mockClient.EXPECT(). Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). Return(k8sErrors.NewNotFound(schema.GroupResource{}, "")). - Times(len(MilvusComponents) - 1) + Times(len(MixtureComponents) - 1) mockClient.EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). Return(nil). - Times(len(MilvusComponents) - 1) + Times(len(MixtureComponents) - 1) mockQnController.EXPECT().Reconcile(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) err := r.ReconcileDeployments(ctx, *mcDefault.DeepCopy()) @@ -74,11 +74,11 @@ func TestClusterReconciler_ReconcileDeployments_CreateIfNotFound(t *testing.T) { mockClient.EXPECT(). Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). Return(k8sErrors.NewNotFound(schema.GroupResource{}, "")). - Times(len(MilvusComponents) - 1) + Times(len(MixtureComponents) - 1) mockClient.EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). Return(nil). - Times(len(MilvusComponents) - 1) + Times(len(MixtureComponents) - 1) mockQnController.EXPECT().Reconcile(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) err := r.ReconcileDeployments(ctx, *mcDefault.DeepCopy()) @@ -97,11 +97,11 @@ func TestClusterReconciler_ReconcileDeployments_CreateIfNotFound(t *testing.T) { mockClient.EXPECT(). Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). Return(k8sErrors.NewNotFound(schema.GroupResource{}, "")). - Times(len(MilvusComponents) - 1) + Times(len(MixtureComponents) - 1) mockClient.EXPECT(). Create(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). Return(nil). - Times(len(MilvusComponents) - 1) + Times(len(MixtureComponents) - 1) mockQnController.EXPECT().Reconcile(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) err := r.ReconcileDeployments(ctx, mc) @@ -130,6 +130,7 @@ func TestClusterReconciler_ReconcileDeployments_Existed(t *testing.T) { CheckComponentHasTerminatingPod = bak }() t.Run("call client.Update if changed", func(t *testing.T) { + defer env.Ctrl.Finish() mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).Return(nil) mockClient.EXPECT(). Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). @@ -138,10 +139,10 @@ func TestClusterReconciler_ReconcileDeployments_Existed(t *testing.T) { cm.Namespace = "ns" cm.Name = "mc" return nil - }).Times(len(MilvusComponents) - 1) + }).Times(len(MixtureComponents) - 1) mockClient.EXPECT(). Update(gomock.Any(), gomock.Any()).Return(nil). - Times(len(MilvusComponents) - 1) + Times(len(MixtureComponents) - 1) mockQnController.EXPECT().Reconcile(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) err := r.ReconcileDeployments(ctx, m) @@ -149,6 +150,7 @@ func TestClusterReconciler_ReconcileDeployments_Existed(t *testing.T) { }) t.Run("not call client.Update if configmap not changed", func(t *testing.T) { + defer env.Ctrl.Finish() mockClient.EXPECT().List(gomock.Any(), gomock.AssignableToTypeOf(&appsv1.DeploymentList{}), gomock.Any()).Return(nil) mockClient.EXPECT(). Get(gomock.Any(), gomock.Any(), gomock.AssignableToTypeOf(&appsv1.Deployment{})). @@ -159,14 +161,8 @@ func TestClusterReconciler_ReconcileDeployments_Existed(t *testing.T) { switch key.Name { case "mc-milvus-proxy": r.updateDeployment(ctx, m, cm, Proxy) - case "mc-milvus-rootcoord": - r.updateDeployment(ctx, m, cm, RootCoord) - case "mc-milvus-datacoord": - r.updateDeployment(ctx, m, cm, DataCoord) - case "mc-milvus-querycoord": - r.updateDeployment(ctx, m, cm, QueryCoord) - case "mc-milvus-indexcoord": - r.updateDeployment(ctx, m, cm, IndexCoord) + case "mc-milvus-mixcoord": + r.updateDeployment(ctx, m, cm, MixCoord) case "mc-milvus-datanode": r.updateDeployment(ctx, m, cm, DataNode) case "mc-milvus-querynode": @@ -177,7 +173,7 @@ func TestClusterReconciler_ReconcileDeployments_Existed(t *testing.T) { r.updateDeployment(ctx, m, cm, MilvusStandalone) } return nil - }).Times(len(MilvusComponents) - 1) + }).Times(len(MixtureComponents) - 1) mockQnController.EXPECT().Reconcile(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) diff --git a/pkg/controllers/milvusupgrade_fsm_test.go b/pkg/controllers/milvusupgrade_fsm_test.go index c2e79a7b..fe347c2a 100644 --- a/pkg/controllers/milvusupgrade_fsm_test.go +++ b/pkg/controllers/milvusupgrade_fsm_test.go @@ -640,6 +640,7 @@ func Test_recordOldInfo_stopMiluvs(t *testing.T) { upgrade.Namespace = "ns" milvus := &v1beta1.Milvus{} milvus.Spec.Mode = "cluster" + milvus.Spec.Com.RootCoord = &v1beta1.MilvusRootCoord{} milvus.Default() recordOldInfo(ctx, mockClient, upgrade, milvus) diff --git a/pkg/controllers/status_cluster_test.go b/pkg/controllers/status_cluster_test.go index ff5ae80e..8e55e7d2 100644 --- a/pkg/controllers/status_cluster_test.go +++ b/pkg/controllers/status_cluster_test.go @@ -379,15 +379,17 @@ func TestStatusSyncer_UpdateReplicas(t *testing.T) { mockCli := NewMockK8sClient(ctrl) ctx := context.Background() - m := &v1beta1.Milvus{} - m.Spec.Mode = v1beta1.MilvusModeCluster - m.Default() + mc := &v1beta1.Milvus{} + mc.Spec.Mode = v1beta1.MilvusModeCluster + mc.Default() s := new(replicaUpdaterImpl) t.Run("all ok", func(t *testing.T) { + defer ctrl.Finish() + m := mc.DeepCopy() mockCli.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Do(func(_, _, deploy interface{}, opts ...any) { deploy.(*appsv1.Deployment).Status.UpdatedReplicas = 2 - }).Return(nil).Times(len(MilvusComponents)) + }).Return(nil).Times(len(MixtureComponents)) err := s.UpdateReplicas(ctx, m, mockCli) assert.NoError(t, err) assert.Equal(t, 2, m.Status.DeprecatedReplicas.Proxy) @@ -395,13 +397,16 @@ func TestStatusSyncer_UpdateReplicas(t *testing.T) { }) t.Run("components not found ok", func(t *testing.T) { - mockCli.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(kerrors.NewNotFound(schema.GroupResource{}, "")).Times(len(MilvusComponents)) + defer ctrl.Finish() + m := mc.DeepCopy() + mockCli.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(kerrors.NewNotFound(schema.GroupResource{}, "")).Times(len(MixtureComponents)) err := s.UpdateReplicas(ctx, m, mockCli) assert.NoError(t, err) assert.Equal(t, 0, m.Status.DeprecatedReplicas.Proxy) assert.Equal(t, 0, m.Status.DeprecatedReplicas.DataNode) }) t.Run("get deploy err", func(t *testing.T) { + m := mc.DeepCopy() mockCli.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(kerrors.NewServiceUnavailable("")).Times(1) err := s.UpdateReplicas(ctx, m, mockCli) assert.Error(t, err)