From 9b20b4e405443efe2ac464052574684ea9f5472f Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Fri, 10 Jan 2025 14:16:27 -0800 Subject: [PATCH] Delete Namespace: fix validation of replicated namespace (#7069) ## What changed? Delete Namespace: fix validation of replicated namespace. ## Why? Namespace should be deletable when: 1. It is active in the current cluster (even if replicated), 2. Doesn't belong to current cluster (current cluster was removed from cluster list). ## How did you test it? Added new tests, existing tests, manual run. ## Potential risks No risks. ## Documentation No. ## Is hotfix candidate? No. --- service/worker/deletenamespace/activities.go | 22 +++-- service/worker/deletenamespace/fx.go | 5 ++ service/worker/deletenamespace/workflow.go | 30 ++++--- .../worker/deletenamespace/workflow_test.go | 84 ++++++++++++++++--- tests/xdc/failover_test.go | 8 +- 5 files changed, 111 insertions(+), 38 deletions(-) diff --git a/service/worker/deletenamespace/activities.go b/service/worker/deletenamespace/activities.go index 28c5d696c86..308a36b1c4f 100644 --- a/service/worker/deletenamespace/activities.go +++ b/service/worker/deletenamespace/activities.go @@ -33,6 +33,7 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" @@ -45,6 +46,7 @@ import ( type ( localActivities struct { metadataManager persistence.MetadataManager + clusterMetadata cluster.Metadata nexusEndpointManager persistence.NexusEndpointManager logger log.Logger @@ -54,14 +56,17 @@ type ( } getNamespaceInfoResult struct { - NamespaceID namespace.ID - Namespace namespace.Name - Clusters []string + NamespaceID namespace.ID + Namespace namespace.Name + Clusters []string + ActiveCluster string + CurrentCluster string } ) func newLocalActivities( metadataManager persistence.MetadataManager, + clusterMetadata cluster.Metadata, nexusEndpointManager persistence.NexusEndpointManager, logger log.Logger, protectedNamespaces dynamicconfig.TypedPropertyFn[[]string], @@ -70,6 +75,7 @@ func newLocalActivities( ) *localActivities { return &localActivities{ metadataManager: metadataManager, + clusterMetadata: clusterMetadata, nexusEndpointManager: nexusEndpointManager, logger: logger, protectedNamespaces: protectedNamespaces, @@ -104,9 +110,13 @@ func (a *localActivities) GetNamespaceInfoActivity(ctx context.Context, nsID nam } return getNamespaceInfoResult{ - NamespaceID: namespace.ID(getNamespaceResponse.Namespace.Info.Id), - Namespace: namespace.Name(getNamespaceResponse.Namespace.Info.Name), - Clusters: getNamespaceResponse.Namespace.ReplicationConfig.Clusters, + NamespaceID: namespace.ID(getNamespaceResponse.Namespace.Info.Id), + Namespace: namespace.Name(getNamespaceResponse.Namespace.Info.Name), + Clusters: getNamespaceResponse.Namespace.ReplicationConfig.Clusters, + ActiveCluster: getNamespaceResponse.Namespace.ReplicationConfig.ActiveClusterName, + // CurrentCluster is not technically a "namespace info", but since all cluster data is here, + // it is convenient to have the current cluster name here too. + CurrentCluster: a.clusterMetadata.GetCurrentClusterName(), }, nil } diff --git a/service/worker/deletenamespace/fx.go b/service/worker/deletenamespace/fx.go index 7d39075701c..0295fb2ca40 100644 --- a/service/worker/deletenamespace/fx.go +++ b/service/worker/deletenamespace/fx.go @@ -29,6 +29,7 @@ import ( sdkworker "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/cluster" "go.temporal.io/server/common/dynamicconfig" "go.temporal.io/server/common/headers" "go.temporal.io/server/common/log" @@ -49,6 +50,7 @@ type ( atWorkerCfg sdkworker.Options visibilityManager manager.VisibilityManager metadataManager persistence.MetadataManager + clusterMetadata cluster.Metadata nexusEndpointManager persistence.NexusEndpointManager historyClient resource.HistoryClient metricsHandler metrics.Handler @@ -63,6 +65,7 @@ type ( DynamicCollection *dynamicconfig.Collection VisibilityManager manager.VisibilityManager MetadataManager persistence.MetadataManager + ClusterMetadata cluster.Metadata NexusEndpointManager persistence.NexusEndpointManager HistoryClient resource.HistoryClient MetricsHandler metrics.Handler @@ -79,6 +82,7 @@ func newComponent( atWorkerCfg: dynamicconfig.WorkerDeleteNamespaceActivityLimits.Get(params.DynamicCollection)(), visibilityManager: params.VisibilityManager, metadataManager: params.MetadataManager, + clusterMetadata: params.ClusterMetadata, nexusEndpointManager: params.NexusEndpointManager, historyClient: params.HistoryClient, metricsHandler: params.MetricsHandler, @@ -126,6 +130,7 @@ func (wc *deleteNamespaceComponent) DedicatedActivityWorkerOptions() *workercomm func (wc *deleteNamespaceComponent) deleteNamespaceLocalActivities() *localActivities { return newLocalActivities( wc.metadataManager, + wc.clusterMetadata, wc.nexusEndpointManager, wc.logger, wc.protectedNamespaces, diff --git a/service/worker/deletenamespace/workflow.go b/service/worker/deletenamespace/workflow.go index 001c6078231..113d1188f61 100644 --- a/service/worker/deletenamespace/workflow.go +++ b/service/worker/deletenamespace/workflow.go @@ -26,7 +26,7 @@ package deletenamespace import ( "fmt" - "strings" + "slices" "time" enumspb "go.temporal.io/api/enums/v1" @@ -104,20 +104,17 @@ func validateParams(params *DeleteNamespaceWorkflowParams) error { return nil } -func validateNamespace(ctx workflow.Context, nsID namespace.ID, nsName namespace.Name, nsClusters []string) error { +func validateNamespace(ctx workflow.Context, nsInfo getNamespaceInfoResult) error { - if nsName == primitives.SystemLocalNamespace || nsID == primitives.SystemNamespaceID { + if nsInfo.Namespace == primitives.SystemLocalNamespace || nsInfo.NamespaceID == primitives.SystemNamespaceID { return errors.NewFailedPrecondition("unable to delete system namespace", nil) } - // Disable namespace deletion if namespace is replicate because: - // - If namespace is passive in the current cluster, then WF executions will keep coming from - // the active cluster and namespace will never be deleted (ReclaimResourcesWorkflow will fail). - // - If namespace is active in the current cluster, then it technically can be deleted (and - // in this case it will be deleted from this cluster only because delete operation is not replicated), - // but this is confusing for the users, as they might expect that namespace is deleted from all clusters. - if len(nsClusters) > 1 { - return errors.NewFailedPrecondition(fmt.Sprintf("namespace %s is replicated in several clusters [%s]: remove all other cluster and retry", nsName, strings.Join(nsClusters, ",")), nil) + // Prevent namespace deletion if namespace is passive in the current cluster, + // because then WF executions will keep coming from the active cluster and + // namespace will never be deleted (ReclaimResourcesWorkflow will fail). + if slices.Contains(nsInfo.Clusters, nsInfo.CurrentCluster) && nsInfo.ActiveCluster != nsInfo.CurrentCluster { + return errors.NewFailedPrecondition(fmt.Sprintf("namespace %[1]s is passive in current cluster %[2]s: remove cluster %[2]s from cluster list or make namespace active in this cluster and retry", nsInfo.Namespace, nsInfo.CurrentCluster), nil) } // NOTE: there is very little chance that another cluster is added after the check above, @@ -126,13 +123,13 @@ func validateNamespace(ctx workflow.Context, nsID namespace.ID, nsName namespace var la *localActivities ctx1 := workflow.WithLocalActivityOptions(ctx, localActivityOptions) - err := workflow.ExecuteLocalActivity(ctx1, la.ValidateProtectedNamespacesActivity, nsName).Get(ctx, nil) + err := workflow.ExecuteLocalActivity(ctx1, la.ValidateProtectedNamespacesActivity, nsInfo.Namespace).Get(ctx, nil) if err != nil { return err } ctx2 := workflow.WithLocalActivityOptions(ctx, localActivityOptions) - err = workflow.ExecuteLocalActivity(ctx2, la.ValidateNexusEndpointsActivity, nsID, nsName).Get(ctx, nil) + err = workflow.ExecuteLocalActivity(ctx2, la.ValidateNexusEndpointsActivity, nsInfo.NamespaceID, nsInfo.Namespace).Get(ctx, nil) if err != nil { return err } @@ -166,14 +163,15 @@ func DeleteNamespaceWorkflow(ctx workflow.Context, params DeleteNamespaceWorkflo if err != nil { return result, err } - params.Namespace = namespaceInfo.Namespace - params.NamespaceID = namespaceInfo.NamespaceID // Step 1.1. Validate namespace. - if err = validateNamespace(ctx, params.NamespaceID, params.Namespace, namespaceInfo.Clusters); err != nil { + if err = validateNamespace(ctx, namespaceInfo); err != nil { return result, err } + params.Namespace = namespaceInfo.Namespace + params.NamespaceID = namespaceInfo.NamespaceID + // Step 2. Mark namespace as deleted. ctx2 := workflow.WithLocalActivityOptions(ctx, localActivityOptions) err = workflow.ExecuteLocalActivity(ctx2, la.MarkNamespaceDeletedActivity, params.Namespace).Get(ctx, nil) diff --git a/service/worker/deletenamespace/workflow_test.go b/service/worker/deletenamespace/workflow_test.go index 09298f2099e..bc4aff07f0a 100644 --- a/service/worker/deletenamespace/workflow_test.go +++ b/service/worker/deletenamespace/workflow_test.go @@ -150,24 +150,84 @@ func Test_DeleteNamespaceWorkflow_ByNameAndID(t *testing.T) { func Test_DeleteReplicatedNamespace(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} - env := testSuite.NewTestWorkflowEnvironment() var la *localActivities - env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return( - getNamespaceInfoResult{ + t.Run("namespace that active in the current cluster should be deleted", func(t *testing.T) { + env := testSuite.NewTestWorkflowEnvironment() + env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return( + getNamespaceInfoResult{ + NamespaceID: "namespace-id", + Namespace: "namespace", + Clusters: []string{"active", "passive"}, + ActiveCluster: "active", + CurrentCluster: "active", + }, nil).Once() + + env.OnActivity(la.ValidateProtectedNamespacesActivity, mock.Anything, mock.Anything).Return(nil).Once() + env.OnActivity(la.ValidateNexusEndpointsActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + env.OnActivity(la.MarkNamespaceDeletedActivity, mock.Anything, mock.Anything).Return(nil).Once() + env.OnActivity(la.GenerateDeletedNamespaceNameActivity, mock.Anything, mock.Anything, mock.Anything).Return(namespace.EmptyName, nil).Once() + env.OnActivity(la.RenameNamespaceActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + env.RegisterWorkflow(reclaimresources.ReclaimResourcesWorkflow) + env.OnWorkflow(reclaimresources.ReclaimResourcesWorkflow, mock.Anything, mock.Anything).Return(reclaimresources.ReclaimResourcesResult{}, nil).Once() + + env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{ NamespaceID: "namespace-id", - Namespace: "namespace", - Clusters: []string{"active", "passive"}, - }, nil).Once() + }) - env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{ - NamespaceID: "namespace-id", + require.True(t, env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + require.NoError(t, err) }) - require.True(t, env.IsWorkflowCompleted()) - err := env.GetWorkflowError() - require.Error(t, err) - require.Contains(t, err.Error(), "namespace namespace is replicated in several clusters [active,passive]: remove all other cluster and retry") + t.Run("namespace that passive in the current cluster should NOT be deleted", func(t *testing.T) { + env := testSuite.NewTestWorkflowEnvironment() + env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return( + getNamespaceInfoResult{ + NamespaceID: "namespace-id", + Namespace: "namespace", + Clusters: []string{"active", "passive"}, + ActiveCluster: "active", + CurrentCluster: "passive", + }, nil).Once() + + env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{ + NamespaceID: "namespace-id", + }) + + require.True(t, env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + require.Error(t, err) + require.Contains(t, err.Error(), "namespace namespace is passive in current cluster passive: remove cluster passive from cluster list or make namespace active in this cluster and retry") + }) + + t.Run("namespace that doesn't have current cluster in the cluster list should be deleted", func(t *testing.T) { + env := testSuite.NewTestWorkflowEnvironment() + env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return( + getNamespaceInfoResult{ + NamespaceID: "namespace-id", + Namespace: "namespace", + Clusters: []string{"active", "passive"}, + ActiveCluster: "active", + CurrentCluster: "another-cluster", + }, nil).Once() + + env.OnActivity(la.ValidateProtectedNamespacesActivity, mock.Anything, mock.Anything).Return(nil).Once() + env.OnActivity(la.ValidateNexusEndpointsActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + env.OnActivity(la.MarkNamespaceDeletedActivity, mock.Anything, mock.Anything).Return(nil).Once() + env.OnActivity(la.GenerateDeletedNamespaceNameActivity, mock.Anything, mock.Anything, mock.Anything).Return(namespace.EmptyName, nil).Once() + env.OnActivity(la.RenameNamespaceActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once() + env.RegisterWorkflow(reclaimresources.ReclaimResourcesWorkflow) + env.OnWorkflow(reclaimresources.ReclaimResourcesWorkflow, mock.Anything, mock.Anything).Return(reclaimresources.ReclaimResourcesResult{}, nil).Once() + + env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{ + NamespaceID: "namespace-id", + }) + + require.True(t, env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + require.NoError(t, err) + }) } func Test_DeleteSystemNamespace(t *testing.T) { diff --git a/tests/xdc/failover_test.go b/tests/xdc/failover_test.go index 2a29d5c59e8..be12250d3e2 100644 --- a/tests/xdc/failover_test.go +++ b/tests/xdc/failover_test.go @@ -2923,14 +2923,14 @@ func (s *FunctionalClustersTestSuite) TestForceMigration_ResetWorkflow() { verifyHistory(workflowID, resp.GetRunId()) } -func (s *FunctionalClustersTestSuite) TestDeleteNamespace() { +func (s *FunctionalClustersTestSuite) TestBlockNamespaceDeleteInPassiveCluster() { namespace := "test-namespace-for-fail-over-" + common.GenerateRandomString(5) client1 := s.cluster1.FrontendClient() // active regReq := &workflowservice.RegisterNamespaceRequest{ Namespace: namespace, IsGlobalNamespace: true, Clusters: s.clusterReplicationConfig(), - ActiveClusterName: s.clusterNames[0], + ActiveClusterName: s.clusterNames[1], // [0] is active, [1] is passive WorkflowExecutionRetentionPeriod: durationpb.New(7 * time.Hour * 24), } _, err := client1.RegisterNamespace(testcore.NewContext(), regReq) @@ -2945,8 +2945,8 @@ func (s *FunctionalClustersTestSuite) TestDeleteNamespace() { }) s.Error(err) s.Nil(resp) - s.Contains(err.Error(), "is replicated in several clusters") - s.Contains(err.Error(), "remove all other cluster and retry") + s.Contains(err.Error(), "is passive in current cluster") + s.Contains(err.Error(), "make namespace active in this cluster and retry") } func (s *FunctionalClustersTestSuite) getHistory(client workflowservice.WorkflowServiceClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {