Skip to content

Commit

Permalink
Delete Namespace: fix validation of replicated namespace (#7069)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Delete Namespace: fix validation of replicated namespace.

## Why?
<!-- Tell your future self why have you made these changes -->
Namespace should be deletable when:
1. It is active in the current cluster (even if replicated),
2. Doesn't belong to current cluster (current cluster was removed from
cluster list).

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Added new tests, existing tests, manual run.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
No risks.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
No.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
alexshtin authored Jan 10, 2025
1 parent c49995f commit 35f9654
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 38 deletions.
22 changes: 16 additions & 6 deletions service/worker/deletenamespace/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand All @@ -45,6 +46,7 @@ import (
type (
localActivities struct {
metadataManager persistence.MetadataManager
clusterMetadata cluster.Metadata
nexusEndpointManager persistence.NexusEndpointManager
logger log.Logger

Expand All @@ -54,14 +56,17 @@ type (
}

getNamespaceInfoResult struct {
NamespaceID namespace.ID
Namespace namespace.Name
Clusters []string
NamespaceID namespace.ID
Namespace namespace.Name
Clusters []string
ActiveCluster string
CurrentCluster string
}
)

func newLocalActivities(
metadataManager persistence.MetadataManager,
clusterMetadata cluster.Metadata,
nexusEndpointManager persistence.NexusEndpointManager,
logger log.Logger,
protectedNamespaces dynamicconfig.TypedPropertyFn[[]string],
Expand All @@ -70,6 +75,7 @@ func newLocalActivities(
) *localActivities {
return &localActivities{
metadataManager: metadataManager,
clusterMetadata: clusterMetadata,
nexusEndpointManager: nexusEndpointManager,
logger: logger,
protectedNamespaces: protectedNamespaces,
Expand Down Expand Up @@ -104,9 +110,13 @@ func (a *localActivities) GetNamespaceInfoActivity(ctx context.Context, nsID nam
}

return getNamespaceInfoResult{
NamespaceID: namespace.ID(getNamespaceResponse.Namespace.Info.Id),
Namespace: namespace.Name(getNamespaceResponse.Namespace.Info.Name),
Clusters: getNamespaceResponse.Namespace.ReplicationConfig.Clusters,
NamespaceID: namespace.ID(getNamespaceResponse.Namespace.Info.Id),
Namespace: namespace.Name(getNamespaceResponse.Namespace.Info.Name),
Clusters: getNamespaceResponse.Namespace.ReplicationConfig.Clusters,
ActiveCluster: getNamespaceResponse.Namespace.ReplicationConfig.ActiveClusterName,
// CurrentCluster is not technically a "namespace info", but since all cluster data is here,
// it is convenient to have the current cluster name here too.
CurrentCluster: a.clusterMetadata.GetCurrentClusterName(),
}, nil
}

Expand Down
5 changes: 5 additions & 0 deletions service/worker/deletenamespace/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

sdkworker "go.temporal.io/sdk/worker"
"go.temporal.io/sdk/workflow"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/log"
Expand All @@ -49,6 +50,7 @@ type (
atWorkerCfg sdkworker.Options
visibilityManager manager.VisibilityManager
metadataManager persistence.MetadataManager
clusterMetadata cluster.Metadata
nexusEndpointManager persistence.NexusEndpointManager
historyClient resource.HistoryClient
metricsHandler metrics.Handler
Expand All @@ -63,6 +65,7 @@ type (
DynamicCollection *dynamicconfig.Collection
VisibilityManager manager.VisibilityManager
MetadataManager persistence.MetadataManager
ClusterMetadata cluster.Metadata
NexusEndpointManager persistence.NexusEndpointManager
HistoryClient resource.HistoryClient
MetricsHandler metrics.Handler
Expand All @@ -79,6 +82,7 @@ func newComponent(
atWorkerCfg: dynamicconfig.WorkerDeleteNamespaceActivityLimits.Get(params.DynamicCollection)(),
visibilityManager: params.VisibilityManager,
metadataManager: params.MetadataManager,
clusterMetadata: params.ClusterMetadata,
nexusEndpointManager: params.NexusEndpointManager,
historyClient: params.HistoryClient,
metricsHandler: params.MetricsHandler,
Expand Down Expand Up @@ -126,6 +130,7 @@ func (wc *deleteNamespaceComponent) DedicatedActivityWorkerOptions() *workercomm
func (wc *deleteNamespaceComponent) deleteNamespaceLocalActivities() *localActivities {
return newLocalActivities(
wc.metadataManager,
wc.clusterMetadata,
wc.nexusEndpointManager,
wc.logger,
wc.protectedNamespaces,
Expand Down
30 changes: 14 additions & 16 deletions service/worker/deletenamespace/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package deletenamespace

import (
"fmt"
"strings"
"slices"
"time"

enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -104,20 +104,17 @@ func validateParams(params *DeleteNamespaceWorkflowParams) error {
return nil
}

func validateNamespace(ctx workflow.Context, nsID namespace.ID, nsName namespace.Name, nsClusters []string) error {
func validateNamespace(ctx workflow.Context, nsInfo getNamespaceInfoResult) error {

if nsName == primitives.SystemLocalNamespace || nsID == primitives.SystemNamespaceID {
if nsInfo.Namespace == primitives.SystemLocalNamespace || nsInfo.NamespaceID == primitives.SystemNamespaceID {
return errors.NewFailedPrecondition("unable to delete system namespace", nil)
}

// Disable namespace deletion if namespace is replicate because:
// - If namespace is passive in the current cluster, then WF executions will keep coming from
// the active cluster and namespace will never be deleted (ReclaimResourcesWorkflow will fail).
// - If namespace is active in the current cluster, then it technically can be deleted (and
// in this case it will be deleted from this cluster only because delete operation is not replicated),
// but this is confusing for the users, as they might expect that namespace is deleted from all clusters.
if len(nsClusters) > 1 {
return errors.NewFailedPrecondition(fmt.Sprintf("namespace %s is replicated in several clusters [%s]: remove all other cluster and retry", nsName, strings.Join(nsClusters, ",")), nil)
// Prevent namespace deletion if namespace is passive in the current cluster,
// because then WF executions will keep coming from the active cluster and
// namespace will never be deleted (ReclaimResourcesWorkflow will fail).
if slices.Contains(nsInfo.Clusters, nsInfo.CurrentCluster) && nsInfo.ActiveCluster != nsInfo.CurrentCluster {
return errors.NewFailedPrecondition(fmt.Sprintf("namespace %[1]s is passive in current cluster %[2]s: remove cluster %[2]s from cluster list or make namespace active in this cluster and retry", nsInfo.Namespace, nsInfo.CurrentCluster), nil)
}

// NOTE: there is very little chance that another cluster is added after the check above,
Expand All @@ -126,13 +123,13 @@ func validateNamespace(ctx workflow.Context, nsID namespace.ID, nsName namespace
var la *localActivities

ctx1 := workflow.WithLocalActivityOptions(ctx, localActivityOptions)
err := workflow.ExecuteLocalActivity(ctx1, la.ValidateProtectedNamespacesActivity, nsName).Get(ctx, nil)
err := workflow.ExecuteLocalActivity(ctx1, la.ValidateProtectedNamespacesActivity, nsInfo.Namespace).Get(ctx, nil)
if err != nil {
return err
}

ctx2 := workflow.WithLocalActivityOptions(ctx, localActivityOptions)
err = workflow.ExecuteLocalActivity(ctx2, la.ValidateNexusEndpointsActivity, nsID, nsName).Get(ctx, nil)
err = workflow.ExecuteLocalActivity(ctx2, la.ValidateNexusEndpointsActivity, nsInfo.NamespaceID, nsInfo.Namespace).Get(ctx, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -166,14 +163,15 @@ func DeleteNamespaceWorkflow(ctx workflow.Context, params DeleteNamespaceWorkflo
if err != nil {
return result, err
}
params.Namespace = namespaceInfo.Namespace
params.NamespaceID = namespaceInfo.NamespaceID

// Step 1.1. Validate namespace.
if err = validateNamespace(ctx, params.NamespaceID, params.Namespace, namespaceInfo.Clusters); err != nil {
if err = validateNamespace(ctx, namespaceInfo); err != nil {
return result, err
}

params.Namespace = namespaceInfo.Namespace
params.NamespaceID = namespaceInfo.NamespaceID

// Step 2. Mark namespace as deleted.
ctx2 := workflow.WithLocalActivityOptions(ctx, localActivityOptions)
err = workflow.ExecuteLocalActivity(ctx2, la.MarkNamespaceDeletedActivity, params.Namespace).Get(ctx, nil)
Expand Down
84 changes: 72 additions & 12 deletions service/worker/deletenamespace/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,84 @@ func Test_DeleteNamespaceWorkflow_ByNameAndID(t *testing.T) {

func Test_DeleteReplicatedNamespace(t *testing.T) {
testSuite := &testsuite.WorkflowTestSuite{}
env := testSuite.NewTestWorkflowEnvironment()
var la *localActivities

env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return(
getNamespaceInfoResult{
t.Run("namespace that active in the current cluster should be deleted", func(t *testing.T) {
env := testSuite.NewTestWorkflowEnvironment()
env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return(
getNamespaceInfoResult{
NamespaceID: "namespace-id",
Namespace: "namespace",
Clusters: []string{"active", "passive"},
ActiveCluster: "active",
CurrentCluster: "active",
}, nil).Once()

env.OnActivity(la.ValidateProtectedNamespacesActivity, mock.Anything, mock.Anything).Return(nil).Once()
env.OnActivity(la.ValidateNexusEndpointsActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
env.OnActivity(la.MarkNamespaceDeletedActivity, mock.Anything, mock.Anything).Return(nil).Once()
env.OnActivity(la.GenerateDeletedNamespaceNameActivity, mock.Anything, mock.Anything, mock.Anything).Return(namespace.EmptyName, nil).Once()
env.OnActivity(la.RenameNamespaceActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
env.RegisterWorkflow(reclaimresources.ReclaimResourcesWorkflow)
env.OnWorkflow(reclaimresources.ReclaimResourcesWorkflow, mock.Anything, mock.Anything).Return(reclaimresources.ReclaimResourcesResult{}, nil).Once()

env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{
NamespaceID: "namespace-id",
Namespace: "namespace",
Clusters: []string{"active", "passive"},
}, nil).Once()
})

env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{
NamespaceID: "namespace-id",
require.True(t, env.IsWorkflowCompleted())
err := env.GetWorkflowError()
require.NoError(t, err)
})

require.True(t, env.IsWorkflowCompleted())
err := env.GetWorkflowError()
require.Error(t, err)
require.Contains(t, err.Error(), "namespace namespace is replicated in several clusters [active,passive]: remove all other cluster and retry")
t.Run("namespace that passive in the current cluster should NOT be deleted", func(t *testing.T) {
env := testSuite.NewTestWorkflowEnvironment()
env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return(
getNamespaceInfoResult{
NamespaceID: "namespace-id",
Namespace: "namespace",
Clusters: []string{"active", "passive"},
ActiveCluster: "active",
CurrentCluster: "passive",
}, nil).Once()

env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{
NamespaceID: "namespace-id",
})

require.True(t, env.IsWorkflowCompleted())
err := env.GetWorkflowError()
require.Error(t, err)
require.Contains(t, err.Error(), "namespace namespace is passive in current cluster passive: remove cluster passive from cluster list or make namespace active in this cluster and retry")
})

t.Run("namespace that doesn't have current cluster in the cluster list should be deleted", func(t *testing.T) {
env := testSuite.NewTestWorkflowEnvironment()
env.OnActivity(la.GetNamespaceInfoActivity, mock.Anything, namespace.ID("namespace-id"), namespace.EmptyName).Return(
getNamespaceInfoResult{
NamespaceID: "namespace-id",
Namespace: "namespace",
Clusters: []string{"active", "passive"},
ActiveCluster: "active",
CurrentCluster: "another-cluster",
}, nil).Once()

env.OnActivity(la.ValidateProtectedNamespacesActivity, mock.Anything, mock.Anything).Return(nil).Once()
env.OnActivity(la.ValidateNexusEndpointsActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
env.OnActivity(la.MarkNamespaceDeletedActivity, mock.Anything, mock.Anything).Return(nil).Once()
env.OnActivity(la.GenerateDeletedNamespaceNameActivity, mock.Anything, mock.Anything, mock.Anything).Return(namespace.EmptyName, nil).Once()
env.OnActivity(la.RenameNamespaceActivity, mock.Anything, mock.Anything, mock.Anything).Return(nil).Once()
env.RegisterWorkflow(reclaimresources.ReclaimResourcesWorkflow)
env.OnWorkflow(reclaimresources.ReclaimResourcesWorkflow, mock.Anything, mock.Anything).Return(reclaimresources.ReclaimResourcesResult{}, nil).Once()

env.ExecuteWorkflow(DeleteNamespaceWorkflow, DeleteNamespaceWorkflowParams{
NamespaceID: "namespace-id",
})

require.True(t, env.IsWorkflowCompleted())
err := env.GetWorkflowError()
require.NoError(t, err)
})
}

func Test_DeleteSystemNamespace(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions tests/xdc/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2923,14 +2923,14 @@ func (s *FunctionalClustersTestSuite) TestForceMigration_ResetWorkflow() {
verifyHistory(workflowID, resp.GetRunId())
}

func (s *FunctionalClustersTestSuite) TestDeleteNamespace() {
func (s *FunctionalClustersTestSuite) TestBlockNamespaceDeleteInPassiveCluster() {
namespace := "test-namespace-for-fail-over-" + common.GenerateRandomString(5)
client1 := s.cluster1.FrontendClient() // active
regReq := &workflowservice.RegisterNamespaceRequest{
Namespace: namespace,
IsGlobalNamespace: true,
Clusters: s.clusterReplicationConfig(),
ActiveClusterName: s.clusterNames[0],
ActiveClusterName: s.clusterNames[1], // [0] is active, [1] is passive
WorkflowExecutionRetentionPeriod: durationpb.New(7 * time.Hour * 24),
}
_, err := client1.RegisterNamespace(testcore.NewContext(), regReq)
Expand All @@ -2945,8 +2945,8 @@ func (s *FunctionalClustersTestSuite) TestDeleteNamespace() {
})
s.Error(err)
s.Nil(resp)
s.Contains(err.Error(), "is replicated in several clusters")
s.Contains(err.Error(), "remove all other cluster and retry")
s.Contains(err.Error(), "is passive in current cluster")
s.Contains(err.Error(), "make namespace active in this cluster and retry")
}

func (s *FunctionalClustersTestSuite) getHistory(client workflowservice.WorkflowServiceClient, namespace string, execution *commonpb.WorkflowExecution) []*historypb.HistoryEvent {
Expand Down

0 comments on commit 35f9654

Please sign in to comment.