Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Functional Tests and Fix Bugs #6889

Merged
merged 7 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,9 @@ Deleted Redirect Rules will be kept in the DB (with DeleteTimestamp). After this
"matching.wv.ReachabilityBuildIdVisibilityGracePeriod",
3*time.Minute,
`ReachabilityBuildIdVisibilityGracePeriod is the time period for which deleted versioning rules are still considered active
to account for the delay in updating the build id field in visibility.`,
to account for the delay in updating the build id field in visibility. Not yet supported for GetDeploymentReachability. We recommend waiting
carlydf marked this conversation as resolved.
Show resolved Hide resolved
at least 2 minutes between changing the current deployment and calling GetDeployment, so that newly started workflow executions using the
recently-current deployment can arrive in visibility.`,
)
ReachabilityTaskQueueScanLimit = NewGlobalIntSetting(
"limit.reachabilityTaskQueueScan",
Expand Down
6 changes: 6 additions & 0 deletions common/rpc/interceptor/redirection.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,12 @@ var (
"DescribeBatchOperation": func() any { return &workflowservice.DescribeBatchOperationResponse{} },
"ListBatchOperations": func() any { return &workflowservice.ListBatchOperationsResponse{} },
"UpdateActivityOptionsById": func() any { return &workflowservice.UpdateActivityOptionsByIdResponse{} },

"DescribeDeployment": func() any { return &workflowservice.DescribeDeploymentResponse{} },
"ListDeployments": func() any { return &workflowservice.ListDeploymentsResponse{} },
"GetDeploymentReachability": func() any { return &workflowservice.GetDeploymentReachabilityResponse{} },
"GetCurrentDeployment": func() any { return &workflowservice.GetCurrentDeploymentResponse{} },
"SetCurrentDeployment": func() any { return &workflowservice.SetCurrentDeploymentResponse{} },
}
)

Expand Down
10 changes: 3 additions & 7 deletions common/worker_versioning/worker_versioning.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ const (
UnversionedSearchAttribute = buildIdSearchAttributePrefixUnversioned
)

// TODO (carly): fix delimiter
// escapeBuildIdSearchAttributeDelimiter is a helper which escapes the BuildIdSearchAttributeDelimiter character in the input string
func escapeBuildIdSearchAttributeDelimiter(s string) string {
s = strings.Replace(s, BuildIdSearchAttributeDelimiter, `|`+BuildIdSearchAttributeDelimiter, -1)
Expand All @@ -63,16 +62,13 @@ func escapeBuildIdSearchAttributeDelimiter(s string) string {
// BuildIds KeywordList in this format. If the workflow becomes unpinned or unversioned, this entry will be removed from
// that list.
func PinnedBuildIdSearchAttribute(deployment *deploymentpb.Deployment) string {
escapedDeployment := fmt.Sprintf("%s%s%s",
return fmt.Sprintf("%s%s%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
escapeBuildIdSearchAttributeDelimiter(deployment.GetSeriesName()),
BuildIdSearchAttributeDelimiter,
escapeBuildIdSearchAttributeDelimiter(deployment.GetBuildId()),
)
return sqlparser.String(sqlparser.NewStrVal([]byte(fmt.Sprintf("%s%s%s",
BuildIdSearchAttributePrefixPinned,
BuildIdSearchAttributeDelimiter,
escapedDeployment,
))))
}

// AssignedBuildIdSearchAttribute returns the search attribute value for the currently assigned build ID
Expand Down
17 changes: 7 additions & 10 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ package updateworkflowoptions
import (
"context"
"fmt"

"google.golang.org/protobuf/types/known/fieldmaskpb"

"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -64,24 +63,22 @@ func Invoke(
req.GetWorkflowExecution().GetWorkflowId(),
req.GetWorkflowExecution().GetRunId(),
),
func(workflowLease api.WorkflowLease) (_ *api.UpdateWorkflowAction, updateError error) {
func(workflowLease api.WorkflowLease) (*api.UpdateWorkflowAction, error) {
mutableState := workflowLease.GetMutableState()
defer func() { workflowLease.GetReleaseFn()(updateError) }()

if !mutableState.IsWorkflowExecutionRunning() {
// in-memory mutable state is still clean, let updateError=nil in the defer func()
// to prevent clearing and reloading mutable state while releasing the lock
return nil, consts.ErrWorkflowCompleted
}

// Merge the requested options mentioned in the field mask with the current options in the mutable state
mergedOpts, updateError := applyWorkflowExecutionOptions(
mergedOpts, err := applyWorkflowExecutionOptions(
getOptionsFromMutableState(mutableState),
req.GetWorkflowExecutionOptions(),
req.GetUpdateMask(),
)
if updateError != nil {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error parsing update_options: %v", updateError))
if err != nil {
return nil, serviceerror.NewInvalidArgument(fmt.Sprintf("error applying update_options: %v", err))
}

// Set options for gRPC response
Expand All @@ -95,9 +92,9 @@ func Invoke(
}, nil
}

_, updateError = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(req.GetWorkflowExecutionOptions().GetVersioningOverride())
if updateError != nil {
return nil, updateError
_, err = mutableState.AddWorkflowExecutionOptionsUpdatedEvent(mergedOpts.GetVersioningOverride())
if err != nil {
return nil, err
}

// TODO (carly) part 2: handle safe deployment change --> CreateWorkflowTask=true
Expand Down
5 changes: 4 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2740,7 +2740,10 @@ func (ms *MutableStateImpl) saveBuildIds(buildIds []string, maxSearchAttributeVa
ms.executionInfo.SearchAttributes = searchAttributes
}

hasUnversionedOrAssigned := worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0])
hasUnversionedOrAssigned := false
if len(buildIds) > 0 { // len is 0 if we are removing the pinned search attribute and the workflow was never unversioned or assigned
hasUnversionedOrAssigned = worker_versioning.IsUnversionedOrAssignedBuildIdSearchAttribute(buildIds[0])
}
for {
saPayload, err := searchattribute.EncodeValue(buildIds, enumspb.INDEXED_VALUE_TYPE_KEYWORD_LIST)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions service/worker/deployment/deployment_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,7 @@ type DeploymentClientImpl struct {
VisibilityManager manager.VisibilityManager
MaxIDLengthLimit dynamicconfig.IntPropertyFn
VisibilityMaxPageSize dynamicconfig.IntPropertyFnWithNamespaceFilter

reachabilityCache reachabilityCache
reachabilityCache reachabilityCache
}

var _ DeploymentStoreClient = (*DeploymentClientImpl)(nil)
Expand Down
6 changes: 3 additions & 3 deletions service/worker/deployment/deployment_reachability.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func getDeploymentReachability(
) (enumspb.DeploymentReachability, time.Time, error) {
// 1a. Reachable by new unpinned workflows
if isCurrent {
// TODO (carly): still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability
// TODO (carly) part 2: still return reachable if the deployment just became not current, but workflows started on it are not yet in reachability
return enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, time.Now(), nil
}

Expand Down Expand Up @@ -99,10 +99,10 @@ func makeCountRequest(

func makeDeploymentQuery(seriesName, buildID string, open bool) string {
var statusFilter string
deploymentFilter := "= " + worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{
deploymentFilter := fmt.Sprintf("= '%s'", worker_versioning.PinnedBuildIdSearchAttribute(&deploymentpb.Deployment{
SeriesName: seriesName,
BuildId: buildID,
})
}))
if open {
statusFilter = "= 'Running'"
} else {
Expand Down
7 changes: 3 additions & 4 deletions service/worker/deployment/deployment_reachability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ func TestMakeDeploymentQuery(t *testing.T) {
buildId := "A"

query := makeDeploymentQuery(seriesName, buildId, true)
expectedQuery := "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus = 'Running'"
expectedQuery := "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus = 'Running'"
assert.Equal(t, expectedQuery, query)

query = makeDeploymentQuery(seriesName, buildId, false)
expectedQuery = "BuildIds = 'reachability:Pinned:test-deployment:A' AND ExecutionStatus != 'Running'"
expectedQuery = "BuildIds = 'pinned:test-deployment:A' AND ExecutionStatus != 'Running'"
assert.Equal(t, expectedQuery, query)
}

Expand All @@ -63,9 +63,8 @@ func TestReachable_CurrentDeployment(t *testing.T) {
vm := manager.NewMockVisibilityManager(gomock.NewController(t)) // won't receive any calls
testCache := newReachabilityCache(metrics.NoopMetricsHandler, vm, testReachabilityCacheOpenWFsTTL, testReachabilityCacheClosedWFsTTL)

reach, reachValidTime, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache)
reach, _, err := getDeploymentReachability(ctx, "", "", seriesName, buildId, true, testCache)
assert.Nil(t, err)
assert.Greater(t, time.Now(), reachValidTime)
assert.Equal(t, enumspb.DEPLOYMENT_REACHABILITY_REACHABLE, reach)
}

Expand Down
4 changes: 2 additions & 2 deletions service/worker/deployment/fx.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func DeploymentStoreClientProvider(historyClient resource.HistoryClient, visibil
reachabilityCache: newReachabilityCache(
metrics.NoopMetricsHandler,
visibilityManager,
reachabilityCacheOpenWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheOpenWFsTTL)
reachabilityCacheClosedWFsTTL, // TODO (carly) use dc (ie. config.ReachabilityCacheClosedWFsTTL)
dynamicconfig.ReachabilityCacheOpenWFsTTL.Get(dc)(),
dynamicconfig.ReachabilityCacheClosedWFsTTL.Get(dc)(),
),
}
}
Expand Down
Loading
Loading