Skip to content

Commit

Permalink
Make RunID empty by default again
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin committed Jan 9, 2025
1 parent 6f0213f commit 302fa66
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 46 deletions.
1 change: 1 addition & 0 deletions common/rpc/interceptor/logtags/workflow_tags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func TestExtract(t *testing.T) {
)

tv := testvars.New(t)
tv = tv.WithRunID(tv.Any().RunID())
taskToken := tokenspb.Task{
WorkflowId: tv.WorkflowID(),
RunId: tv.RunID(),
Expand Down
5 changes: 5 additions & 0 deletions common/testing/testvars/any.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package testvars
import (
"time"

"github.com/pborman/uuid"
commonpb "go.temporal.io/api/common/v1"
failurepb "go.temporal.io/api/failure/v1"
"go.temporal.io/server/common/payload"
Expand Down Expand Up @@ -85,3 +86,7 @@ func (a Any) ApplicationFailure() *failurepb.Failure {
func (a Any) InfiniteTimeout() *durationpb.Duration {
return durationpb.New(10 * time.Hour)
}

func (a Any) RunID() string {
return uuid.New()
}
9 changes: 8 additions & 1 deletion common/testing/testvars/test_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func (tv *TestVars) uuidString(_ string) string {
return uuid.New()
}

func (tv *TestVars) emptyString(_ string) string {
return ""
}

func (tv *TestVars) clone() *TestVars {
tv2 := newFromName(tv.testName)
tv.values.Range(func(key, value any) bool {
Expand Down Expand Up @@ -204,8 +208,11 @@ func (tv *TestVars) WithWorkflowIDNumber(n int) *TestVars {
return tv.cloneSetN("workflow_id", n)
}

// RunID is different from other getters. By default, it returns an empty string.
// This is to simplify the usage of WorkflowExecution() which most of the time
// doesn't need RunID. Otherwise, RunID can be set explicitly using WithRunID.
func (tv *TestVars) RunID() string {
return getOrCreate(tv, "run_id", tv.uuidString, tv.uuidNSetter)
return getOrCreate(tv, "run_id", tv.emptyString, tv.uuidNSetter)
}

func (tv *TestVars) WithRunID(runID string) *TestVars {
Expand Down
7 changes: 7 additions & 0 deletions service/history/api/respondworkflowtaskcompleted/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

s.Run("Accept Complete", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)
writtenHistoryCh := createWrittenHistoryCh(1)
Expand Down Expand Up @@ -217,6 +218,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

s.Run("Reject", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)

Expand All @@ -241,6 +243,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

s.Run("Write failed on normal task queue", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)

Expand All @@ -266,6 +269,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

s.Run("Write failed on sticky task queue", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)

Expand Down Expand Up @@ -295,6 +299,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

s.Run("GetHistory failed", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)
writtenHistoryCh := createWrittenHistoryCh(1)
Expand Down Expand Up @@ -335,6 +340,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

s.Run("Discard speculative WFT with events", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)
// Expect only 2 calls to UpdateWorkflowExecution: for timer started and timer fired events but not Update or WFT events.
Expand Down Expand Up @@ -392,6 +398,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) TestUpdateWorkflow() {

s.Run("Do not discard speculative WFT with more than 10 events", func() {
tv := testvars.New(s.T())
tv = tv.WithRunID(tv.Any().RunID())
s.mockNamespaceCache.EXPECT().GetNamespaceByID(tv.NamespaceID()).Return(tv.Namespace(), nil).AnyTimes()
wfContext := s.createStartedWorkflow(tv)
// Expect 2 calls to UpdateWorkflowExecution: for timer started and WFT events.
Expand Down
15 changes: 5 additions & 10 deletions tests/update_workflow_sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_TerminateWorkflowAfterUpdate
defer cancel()
tv := testvars.New(s.T()).
WithTaskQueue(s.TaskQueue()).
WithNamespaceName(namespace.Name(s.Namespace())).
WithRunID("")
WithNamespaceName(namespace.Name(s.Namespace()))

workflowFn := func(ctx workflow.Context) error {
s.NoError(workflow.SetUpdateHandler(ctx, tv.HandlerName(), func(ctx workflow.Context, arg string) error {
Expand Down Expand Up @@ -102,8 +101,7 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_TimeoutWorkflowAfterUpdateAc
defer cancel()
tv := testvars.New(s.T()).
WithTaskQueue(s.TaskQueue()).
WithNamespaceName(namespace.Name(s.Namespace())).
WithRunID("")
WithNamespaceName(namespace.Name(s.Namespace()))

workflowFn := func(ctx workflow.Context) error {
s.NoError(workflow.SetUpdateHandler(ctx, tv.HandlerName(), func(ctx workflow.Context, arg string) error {
Expand Down Expand Up @@ -167,8 +165,7 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_TerminateWorkflowAfterUpdate
defer cancel()
tv := testvars.New(s.T()).
WithTaskQueue(s.TaskQueue()).
WithNamespaceName(namespace.Name(s.Namespace())).
WithRunID("")
WithNamespaceName(namespace.Name(s.Namespace()))

workflowFn := func(ctx workflow.Context) error {
s.NoError(workflow.SetUpdateHandler(ctx, tv.HandlerName(), func(ctx workflow.Context, arg string) error {
Expand Down Expand Up @@ -232,8 +229,7 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_ContinueAsNewAfterUpdateAdmi

tv := testvars.New(s.T()).
WithTaskQueue(s.TaskQueue()).
WithNamespaceName(namespace.Name(s.Namespace())).
WithRunID("")
WithNamespaceName(namespace.Name(s.Namespace()))

rootCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down Expand Up @@ -320,8 +316,7 @@ func (s *UpdateWorkflowSdkSuite) TestUpdateWorkflow_TimeoutWithRetryAfterUpdateA

tv := testvars.New(s.T()).
WithTaskQueue(s.TaskQueue()).
WithNamespaceName(namespace.Name(s.Namespace())).
WithRunID("")
WithNamespaceName(namespace.Name(s.Namespace()))

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
Expand Down
20 changes: 10 additions & 10 deletions tests/update_workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4946,7 +4946,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
s.Run(fmt.Sprintf("start workflow and send update (with conflict policy %v)", p), func() {

s.Run("and accept", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

startReq := startWorkflowReq(tv)
startReq.WorkflowIdConflictPolicy = p
Expand Down Expand Up @@ -4985,7 +4985,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
})

s.Run("and reject", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

startReq := startWorkflowReq(tv)
updateReq := s.updateWorkflowRequest(tv,
Expand Down Expand Up @@ -5028,7 +5028,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
s.Run("workflow id conflict policy use-existing: only send update", func() {

s.Run("and accept", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

// start workflow
_, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv))
Expand Down Expand Up @@ -5078,7 +5078,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
})

s.Run("and reject", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

// start workflow
_, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv))
Expand Down Expand Up @@ -5125,7 +5125,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
})

s.Run("workflow id conflict policy terminate-existing: terminate workflow first, then start and update", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

// start workflow
firstWF, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv))
Expand Down Expand Up @@ -5175,7 +5175,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
})

s.Run("workflow id conflict policy fail: abort multi operation", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

_, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv))
s.NoError(err)
Expand Down Expand Up @@ -5208,7 +5208,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
enumspb.WORKFLOW_ID_CONFLICT_POLICY_FAIL,
} {
s.Run(fmt.Sprintf("for workflow id conflict policy %v", p), func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

startReq := startWorkflowReq(tv)
startReq.RequestId = "request_id"
Expand Down Expand Up @@ -5248,7 +5248,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
s.Run("workflow is closed", func() {

s.Run("workflow id reuse policy allow-duplicate", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

// start and terminate workflow
initialWorkflow, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv))
Expand Down Expand Up @@ -5302,7 +5302,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
})

s.Run("workflow id reuse policy reject-duplicate", func() {
tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

// start and terminate workflow
_, err := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), startWorkflowReq(tv))
Expand Down Expand Up @@ -5344,7 +5344,7 @@ func (s *UpdateWorkflowSuite) TestUpdateWithStart() {
cleanup := s.OverrideDynamicConfig(dynamicconfig.WorkflowExecutionMaxTotalUpdates, maxTotalUpdates)
defer cleanup()

tv := testvars.New(s.T()).WithRunID("")
tv := testvars.New(s.T())

startReq := startWorkflowReq(tv)
updateReq := s.updateWorkflowRequest(tv,
Expand Down
Loading

0 comments on commit 302fa66

Please sign in to comment.