From 4d949775a449ff4c62c45af5a0ba1431bcc1dc1c Mon Sep 17 00:00:00 2001 From: Bowei Xu Date: Wed, 28 Aug 2019 16:48:33 -0700 Subject: [PATCH] Fix non-deterministic error caused by upsert search attributes (#828) --- internal/internal_task_handlers.go | 13 ++- internal/internal_task_handlers_test.go | 128 ++++++++++++++++++++---- test/integration_test.go | 2 +- 3 files changed, 120 insertions(+), 23 deletions(-) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 0592f5f2d..48d7bbedb 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1278,16 +1278,19 @@ func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) boo } eventAttributes := e.UpsertWorkflowSearchAttributesEventAttributes decisionAttributes := d.UpsertWorkflowSearchAttributesDecisionAttributes - if eventAttributes.SearchAttributes != decisionAttributes.SearchAttributes { - return false - } - return true - + return isSearchAttributesMatched(eventAttributes.SearchAttributes, decisionAttributes.SearchAttributes) } return false } +func isSearchAttributesMatched(attrFromEvent, attrFromDecision *s.SearchAttributes) bool { + if attrFromEvent != nil && attrFromDecision != nil { + return reflect.DeepEqual(attrFromEvent.IndexedFields, attrFromDecision.IndexedFields) + } + return attrFromEvent == nil && attrFromDecision == nil +} + // return true if the check fails: // domain is not empty in decision // and domain is not replayDomain diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index 670822e68..ebc0b5047 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -992,27 +992,121 @@ func Test_NonDeterministicCheck(t *testing.T) { func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) { diType := s.DecisionTypeUpsertWorkflowSearchAttributes - searchAttr := &s.SearchAttributes{} - decision := &s.Decision{ - DecisionType: &diType, - UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{ - SearchAttributes: searchAttr, + eType := s.EventTypeUpsertWorkflowSearchAttributes + + testCases := []struct { + name string + decision *s.Decision + event *s.HistoryEvent + expected bool + }{ + { + name: "event type not match", + decision: &s.Decision{ + DecisionType: &diType, + UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{ + SearchAttributes: &s.SearchAttributes{}, + }, + }, + event: &s.HistoryEvent{}, + expected: false, + }, + { + name: "attributes not match", + decision: &s.Decision{ + DecisionType: &diType, + UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{ + SearchAttributes: &s.SearchAttributes{}, + }, + }, + event: &s.HistoryEvent{ + EventType: &eType, + UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{}, + }, + expected: false, + }, + { + name: "attributes match", + decision: &s.Decision{ + DecisionType: &diType, + UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{ + SearchAttributes: &s.SearchAttributes{}, + }, + }, + event: &s.HistoryEvent{ + EventType: &eType, + UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{ + SearchAttributes: &s.SearchAttributes{}, + }, + }, + expected: true, }, } - historyEvent := &s.HistoryEvent{} - ok := isDecisionMatchEvent(decision, historyEvent, false) - eType := s.EventTypeUpsertWorkflowSearchAttributes - historyEvent = &s.HistoryEvent{ - EventType: &eType, - UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{}, + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + require.Equal(t, testCase.expected, isDecisionMatchEvent(testCase.decision, testCase.event, false)) + }) + } +} + +func Test_IsSearchAttributesMatched(t *testing.T) { + testCases := []struct { + name string + lhs *s.SearchAttributes + rhs *s.SearchAttributes + expected bool + }{ + { + name: "both nil", + lhs: nil, + rhs: nil, + expected: true, + }, + { + name: "left nil", + lhs: nil, + rhs: &s.SearchAttributes{}, + expected: false, + }, + { + name: "right nil", + lhs: &s.SearchAttributes{}, + rhs: nil, + expected: false, + }, + { + name: "not match", + lhs: &s.SearchAttributes{ + IndexedFields: map[string][]byte{ + "key1": []byte("1"), + "key2": []byte("abc"), + }, + }, + rhs: &s.SearchAttributes{}, + expected: false, + }, + { + name: "match", + lhs: &s.SearchAttributes{ + IndexedFields: map[string][]byte{ + "key1": []byte("1"), + "key2": []byte("abc"), + }, + }, + rhs: &s.SearchAttributes{ + IndexedFields: map[string][]byte{ + "key2": []byte("abc"), + "key1": []byte("1"), + }, + }, + expected: true, + }, } - ok = isDecisionMatchEvent(decision, historyEvent, false) - require.False(t, ok) - historyEvent.UpsertWorkflowSearchAttributesEventAttributes = &s.UpsertWorkflowSearchAttributesEventAttributes{ - SearchAttributes: searchAttr, + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + require.Equal(t, testCase.expected, isSearchAttributesMatched(testCase.lhs, testCase.rhs)) + }) } - ok = isDecisionMatchEvent(decision, historyEvent, false) - require.True(t, ok) } diff --git a/test/integration_test.go b/test/integration_test.go index c962dab51..9be729e3d 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -73,7 +73,7 @@ func (ts *IntegrationTestSuite) SetupSuite() { func (ts *IntegrationTestSuite) TearDownSuite() { // sleep for a while to allow the pollers to shutdown // then assert that there are no lingering go routines - time.Sleep(11 * time.Second) + time.Sleep(20 * time.Second) // https://github.com/uber-go/cadence-client/issues/739 goleak.VerifyNoLeaks(ts.T(), goleak.IgnoreTopFunction("go.uber.org/cadence/internal.(*coroutineState).initialYield")) }