diff --git a/internal/query_builder.go b/internal/query_builder.go index c53cc4e9b..9519a0c32 100644 --- a/internal/query_builder.go +++ b/internal/query_builder.go @@ -71,10 +71,11 @@ var ( type ( // QueryBuilder builds visibility query. It's shadower's own Query builders that processes the shadow filter - // options into a query to pull t he required workflows. + // options into a query to pull the required workflows. QueryBuilder interface { WorkflowTypes([]string) QueryBuilder + ExcludeWorkflowTypes([]string) QueryBuilder WorkflowStatus([]WorkflowStatus) QueryBuilder StartTime(time.Time, time.Time) QueryBuilder CloseTime(time.Time, time.Time) QueryBuilder @@ -100,6 +101,18 @@ func (q *queryBuilderImpl) WorkflowTypes(types []string) QueryBuilder { return q } +func (q *queryBuilderImpl) ExcludeWorkflowTypes(types []string) QueryBuilder { + if len(types) == 0 { + return q + } + excludeTypeQueries := make([]string, 0, len(types)) + for _, workflowType := range types { + excludeTypeQueries = append(excludeTypeQueries, fmt.Sprintf(keyWorkflowType+` != "%v"`, workflowType)) + } + q.appendPartialQuery(strings.Join(excludeTypeQueries, " and ")) + return q +} + func (q *queryBuilderImpl) WorkflowStatus(statuses []WorkflowStatus) QueryBuilder { workflowStatusQueries := make([]string, 0, len(statuses)) for _, status := range statuses { diff --git a/internal/query_builder_test.go b/internal/query_builder_test.go index 9632ba382..1b005249c 100644 --- a/internal/query_builder_test.go +++ b/internal/query_builder_test.go @@ -153,6 +153,38 @@ func (s *queryBuilderSuite) TestStartTimeQuery() { } } +func (s *queryBuilderSuite) TestExcludeWorkflowTypesQuery() { + testCases := []struct { + msg string + excludeTypes []string + expectedQuery string + }{ + { + msg: "empty excludeTypes", + excludeTypes: nil, + expectedQuery: "", + }, + { + msg: "single excludeType", + excludeTypes: []string{"excludedWorkflowType"}, + expectedQuery: `(WorkflowType != "excludedWorkflowType")`, + }, + { + msg: "multiple excludeTypes", + excludeTypes: []string{"excludedWorkflowType1", "excludedWorkflowType2"}, + expectedQuery: `(WorkflowType != "excludedWorkflowType1" and WorkflowType != "excludedWorkflowType2")`, + }, + } + + for _, test := range testCases { + s.T().Run(test.msg, func(t *testing.T) { + builder := NewQueryBuilder() + builder.ExcludeWorkflowTypes(test.excludeTypes) + s.Equal(test.expectedQuery, builder.Build()) + }) + } +} + func (s *queryBuilderSuite) TestMultipleFilters() { maxStartTime := time.Now() minStartTime := maxStartTime.Add(-time.Hour) diff --git a/internal/workflow_shadower.go b/internal/workflow_shadower.go index 4ae17ff09..9c2257bc4 100644 --- a/internal/workflow_shadower.go +++ b/internal/workflow_shadower.go @@ -62,6 +62,11 @@ type ( // default: empty list, which matches all workflow types WorkflowTypes []string + // Optional: A list of workflow type names that need to be excluded in the query. + // The list will be used to construct WorkflowQuery.The listed workflow types will be excluded from replay. + // default: empty list, which matches all workflow types + ExcludeTypes []string + // Optional: A list of workflow status. // The list will be used to construct WorkflowQuery. Only workflows with status listed will be replayed. // accepted values (case-insensitive): OPEN, CLOSED, ALL, COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT @@ -310,7 +315,15 @@ func (o *ShadowOptions) validateAndPopulateFields() error { } if len(o.WorkflowQuery) == 0 { - queryBuilder := NewQueryBuilder().WorkflowTypes(o.WorkflowTypes) + queryBuilder := NewQueryBuilder() + + if len(o.WorkflowTypes) > 0 { + queryBuilder.WorkflowTypes(o.WorkflowTypes) + } + + if len(o.ExcludeTypes) > 0 { + queryBuilder.ExcludeWorkflowTypes(o.ExcludeTypes) + } statuses := make([]WorkflowStatus, 0, len(o.WorkflowStatus)) for _, statusString := range o.WorkflowStatus { diff --git a/internal/workflow_shadower_test.go b/internal/workflow_shadower_test.go index 865fd9a4b..849ad9e2b 100644 --- a/internal/workflow_shadower_test.go +++ b/internal/workflow_shadower_test.go @@ -21,6 +21,8 @@ package internal import ( + "context" + "fmt" "sync" "testing" "time" @@ -243,6 +245,30 @@ func (s *workflowShadowerSuite) TestShadowOptionsValidation() { } } +func (s *workflowShadowerSuite) TestShadowOptionsWithExcludeTypes() { + excludeTypes := []string{"excludedType1", "excludedType2"} + options := ShadowOptions{ + WorkflowTypes: []string{"includedType1", "includedType2"}, + ExcludeTypes: excludeTypes, + Mode: ShadowModeNormal, + } + expectedQuery := fmt.Sprintf( + `(WorkflowType = "includedType1" or WorkflowType = "includedType2") and (WorkflowType != "excludedType1" and WorkflowType != "excludedType2") and (CloseTime = missing)`, + ) + shadower, err := NewWorkflowShadower(s.mockService, "testDomain", options, ReplayOptions{}, nil) + s.NoError(err) + s.mockService.EXPECT(). + ScanWorkflowExecutions(gomock.Any(), gomock.Any(), gomock.Any()). + DoAndReturn(func(ctx context.Context, request *shared.ListWorkflowExecutionsRequest, opts ...interface{}) (*shared.ListWorkflowExecutionsResponse, error) { + s.Equal(expectedQuery, *request.Query) + return &shared.ListWorkflowExecutionsResponse{ + Executions: nil, + NextPageToken: nil, + }, nil + }).Times(1) + s.NoError(shadower.shadowWorker()) +} + func (s *workflowShadowerSuite) TestShadowWorkerExitCondition_ExpirationTime() { totalWorkflows := 50 timePerWorkflow := 7 * time.Second