Skip to content

Commit

Permalink
Fix non-deterministic error caused by upsert search attributes (#828)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu committed Aug 29, 2019
1 parent eb10667 commit 4d94977
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 23 deletions.
13 changes: 8 additions & 5 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,16 +1278,19 @@ func isDecisionMatchEvent(d *s.Decision, e *s.HistoryEvent, strictMode bool) boo
}
eventAttributes := e.UpsertWorkflowSearchAttributesEventAttributes
decisionAttributes := d.UpsertWorkflowSearchAttributesDecisionAttributes
if eventAttributes.SearchAttributes != decisionAttributes.SearchAttributes {
return false
}
return true

return isSearchAttributesMatched(eventAttributes.SearchAttributes, decisionAttributes.SearchAttributes)
}

return false
}

func isSearchAttributesMatched(attrFromEvent, attrFromDecision *s.SearchAttributes) bool {
if attrFromEvent != nil && attrFromDecision != nil {
return reflect.DeepEqual(attrFromEvent.IndexedFields, attrFromDecision.IndexedFields)
}
return attrFromEvent == nil && attrFromDecision == nil
}

// return true if the check fails:
// domain is not empty in decision
// and domain is not replayDomain
Expand Down
128 changes: 111 additions & 17 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -992,27 +992,121 @@ func Test_NonDeterministicCheck(t *testing.T) {

func Test_IsDecisionMatchEvent_UpsertWorkflowSearchAttributes(t *testing.T) {
diType := s.DecisionTypeUpsertWorkflowSearchAttributes
searchAttr := &s.SearchAttributes{}
decision := &s.Decision{
DecisionType: &diType,
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
SearchAttributes: searchAttr,
eType := s.EventTypeUpsertWorkflowSearchAttributes

testCases := []struct {
name string
decision *s.Decision
event *s.HistoryEvent
expected bool
}{
{
name: "event type not match",
decision: &s.Decision{
DecisionType: &diType,
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
SearchAttributes: &s.SearchAttributes{},
},
},
event: &s.HistoryEvent{},
expected: false,
},
{
name: "attributes not match",
decision: &s.Decision{
DecisionType: &diType,
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
SearchAttributes: &s.SearchAttributes{},
},
},
event: &s.HistoryEvent{
EventType: &eType,
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{},
},
expected: false,
},
{
name: "attributes match",
decision: &s.Decision{
DecisionType: &diType,
UpsertWorkflowSearchAttributesDecisionAttributes: &s.UpsertWorkflowSearchAttributesDecisionAttributes{
SearchAttributes: &s.SearchAttributes{},
},
},
event: &s.HistoryEvent{
EventType: &eType,
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{
SearchAttributes: &s.SearchAttributes{},
},
},
expected: true,
},
}
historyEvent := &s.HistoryEvent{}
ok := isDecisionMatchEvent(decision, historyEvent, false)

eType := s.EventTypeUpsertWorkflowSearchAttributes
historyEvent = &s.HistoryEvent{
EventType: &eType,
UpsertWorkflowSearchAttributesEventAttributes: &s.UpsertWorkflowSearchAttributesEventAttributes{},
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
require.Equal(t, testCase.expected, isDecisionMatchEvent(testCase.decision, testCase.event, false))
})
}
}

func Test_IsSearchAttributesMatched(t *testing.T) {
testCases := []struct {
name string
lhs *s.SearchAttributes
rhs *s.SearchAttributes
expected bool
}{
{
name: "both nil",
lhs: nil,
rhs: nil,
expected: true,
},
{
name: "left nil",
lhs: nil,
rhs: &s.SearchAttributes{},
expected: false,
},
{
name: "right nil",
lhs: &s.SearchAttributes{},
rhs: nil,
expected: false,
},
{
name: "not match",
lhs: &s.SearchAttributes{
IndexedFields: map[string][]byte{
"key1": []byte("1"),
"key2": []byte("abc"),
},
},
rhs: &s.SearchAttributes{},
expected: false,
},
{
name: "match",
lhs: &s.SearchAttributes{
IndexedFields: map[string][]byte{
"key1": []byte("1"),
"key2": []byte("abc"),
},
},
rhs: &s.SearchAttributes{
IndexedFields: map[string][]byte{
"key2": []byte("abc"),
"key1": []byte("1"),
},
},
expected: true,
},
}
ok = isDecisionMatchEvent(decision, historyEvent, false)
require.False(t, ok)

historyEvent.UpsertWorkflowSearchAttributesEventAttributes = &s.UpsertWorkflowSearchAttributesEventAttributes{
SearchAttributes: searchAttr,
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
require.Equal(t, testCase.expected, isSearchAttributesMatched(testCase.lhs, testCase.rhs))
})
}
ok = isDecisionMatchEvent(decision, historyEvent, false)
require.True(t, ok)
}
2 changes: 1 addition & 1 deletion test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (ts *IntegrationTestSuite) SetupSuite() {
func (ts *IntegrationTestSuite) TearDownSuite() {
// sleep for a while to allow the pollers to shutdown
// then assert that there are no lingering go routines
time.Sleep(11 * time.Second)
time.Sleep(20 * time.Second)
// https://github.com/uber-go/cadence-client/issues/739
goleak.VerifyNoLeaks(ts.T(), goleak.IgnoreTopFunction("go.uber.org/cadence/internal.(*coroutineState).initialYield"))
}
Expand Down

0 comments on commit 4d94977

Please sign in to comment.