Skip to content

Commit

Permalink
Modify incoming batch query to select only running workflows for cert…
Browse files Browse the repository at this point in the history
…ain batch operations (#7062)

## What changed?
<!-- Describe what has changed in this PR -->
Add "Status='Running'" to the query for certain batch operation types.
Note - if query is empty or already contains 'Status' cause - it will
not be added.


## Why?
<!-- Tell your future self why have you made these changes -->
There is no need to run those operations on not-running workflows, but
omitting this cause can significantly increase run time.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
add unit test

## Potential risks
N/A

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
May be...


## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No
  • Loading branch information
ychebotarev authored Jan 9, 2025
1 parent 1b4c7f4 commit 66aa23b
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 9 deletions.
30 changes: 23 additions & 7 deletions service/worker/batcher/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ import (
)

const (
pageSize = 1000
pageSize = 1000
statusRunningQueryFilter = "ExecutionStatus='Running'"
)

var (
Expand Down Expand Up @@ -108,11 +109,13 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams)
}
}

adjustedQuery := a.adjustQuery(batchParams)

if startOver {
estimateCount := int64(len(batchParams.Executions))
if len(batchParams.Query) > 0 {
if len(adjustedQuery) > 0 {
resp, err := sdkClient.CountWorkflow(ctx, &workflowservice.CountWorkflowExecutionsRequest{
Query: batchParams.Query,
Query: adjustedQuery,
})
if err != nil {
metrics.BatcherOperationFailures.With(metricsHandler).Record(1)
Expand All @@ -136,11 +139,11 @@ func (a *activities) BatchActivity(ctx context.Context, batchParams BatchParams)
for {
executions := batchParams.Executions
pageToken := hbd.PageToken
if len(batchParams.Query) > 0 {
if len(adjustedQuery) > 0 {
resp, err := sdkClient.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
PageSize: int32(pageSize),
NextPageToken: pageToken,
Query: batchParams.Query,
Query: adjustedQuery,
})
if err != nil {
metrics.BatcherOperationFailures.With(metricsHandler).Record(1)
Expand Down Expand Up @@ -212,6 +215,20 @@ func (a *activities) getActivityLogger(ctx context.Context) log.Logger {
)
}

func (a *activities) adjustQuery(batchParams BatchParams) string {
if len(batchParams.Query) == 0 {
// don't add anything if query is empty
return batchParams.Query
}

switch batchParams.BatchType {
case BatchTypeTerminate, BatchTypeSignal, BatchTypeCancel, BatchTypeUpdateOptions:
return fmt.Sprintf("(%s) AND (%s)", batchParams.Query, statusRunningQueryFilter)
default:
return batchParams.Query
}
}

func (a *activities) getOperationRPS(requestedRPS float64) float64 {
maxRPS := float64(a.rps(a.namespace.String()))
if requestedRPS <= 0 || requestedRPS > maxRPS {
Expand Down Expand Up @@ -426,7 +443,7 @@ func getResetEventIDByOptions(
case *commonpb.ResetOptions_WorkflowTaskId:
return target.WorkflowTaskId, nil
case *commonpb.ResetOptions_BuildId:
return getResetPoint(ctx, namespaceStr, workflowExecution, frontendClient, logger, target.BuildId, resetOptions.CurrentRunOnly)
return getResetPoint(ctx, namespaceStr, workflowExecution, frontendClient, target.BuildId, resetOptions.CurrentRunOnly)
default:
errorMsg := fmt.Sprintf("provided reset target (%+v) is not supported.", resetOptions.Target)
return 0, serviceerror.NewInvalidArgument(errorMsg)
Expand Down Expand Up @@ -519,7 +536,6 @@ func getResetPoint(
namespaceStr string,
execution *commonpb.WorkflowExecution,
frontendClient workflowservice.WorkflowServiceClient,
logger log.Logger,
buildId string,
currentRunOnly bool,
) (workflowTaskEventID int64, err error) {
Expand Down
64 changes: 62 additions & 2 deletions service/worker/batcher/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package batcher

import (
"context"
"fmt"
"slices"
"testing"
"time"
Expand Down Expand Up @@ -194,7 +195,6 @@ func (s *activitiesSuite) TestGetFirstWorkflowTaskEventID() {

func (s *activitiesSuite) TestGetResetPoint() {
ctx := context.Background()
logger := log.NewTestLogger()
ns := "namespacename"
tests := []struct {
name string
Expand Down Expand Up @@ -303,7 +303,7 @@ func (s *activitiesSuite) TestGetResetPoint() {
WorkflowId: "wfid",
RunId: "run1",
}
id, err := getResetPoint(ctx, ns, execution, s.mockFrontendClient, logger, tt.buildId, tt.currentRunOnly)
id, err := getResetPoint(ctx, ns, execution, s.mockFrontendClient, tt.buildId, tt.currentRunOnly)
s.Equal(tt.wantErr, err != nil)
s.Equal(tt.wantWorkflowTaskEventID, id)
if tt.wantSetRunId != "" {
Expand All @@ -312,3 +312,63 @@ func (s *activitiesSuite) TestGetResetPoint() {
})
}
}

func (s *activitiesSuite) TestAdjustQuery() {
tests := []struct {
name string
query string
expectedResult string
batchType string
}{
{
name: "Empty query",
query: "",
expectedResult: "",
batchType: BatchTypeTerminate,
},
{
name: "Acceptance",
query: "A=B",
expectedResult: fmt.Sprintf("(A=B) AND (%s)", statusRunningQueryFilter),
batchType: BatchTypeTerminate,
},
{
name: "Acceptance with parenthesis",
query: "(A=B)",
expectedResult: fmt.Sprintf("((A=B)) AND (%s)", statusRunningQueryFilter),
batchType: BatchTypeTerminate,
},
{
name: "Acceptance with multiple conditions",
query: "(A=B) OR C=D",
expectedResult: fmt.Sprintf("((A=B) OR C=D) AND (%s)", statusRunningQueryFilter),
batchType: BatchTypeTerminate,
},
{
name: "Contains status - 1",
query: "ExecutionStatus=Completed",
expectedResult: fmt.Sprintf("(ExecutionStatus=Completed) AND (%s)", statusRunningQueryFilter),
batchType: BatchTypeTerminate,
},
{
name: "Contains status - 2",
query: "A=B OR ExecutionStatus='Completed'",
expectedResult: fmt.Sprintf("(A=B OR ExecutionStatus='Completed') AND (%s)", statusRunningQueryFilter),
batchType: BatchTypeTerminate,
},
{
name: "Not supported batch type",
query: "A=B",
expectedResult: "A=B",
batchType: "NotSupported",
},
}
for _, testRun := range tests {
s.Run(testRun.name, func() {
a := activities{}
batchParams := BatchParams{Query: testRun.query, BatchType: testRun.batchType}
adjustedQuery := a.adjustQuery(batchParams)
s.Equal(testRun.expectedResult, adjustedQuery)
})
}
}

0 comments on commit 66aa23b

Please sign in to comment.