diff --git a/internal/common/util/rsa_test.go b/internal/common/util/rsa_test.go new file mode 100644 index 000000000..7cb14e67e --- /dev/null +++ b/internal/common/util/rsa_test.go @@ -0,0 +1,80 @@ +// Copyright (c) 2017-2021 Uber Technologies Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package util + +import ( + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func loadTestKeyFile(t *testing.T, path string) []byte { + t.Helper() + key, err := os.ReadFile(path) + require.NoError(t, err) + return key +} + +func TestLoadRSAPublicKey(t *testing.T) { + publicKeyPEM := loadTestKeyFile(t, "./../auth/credentials/keytest.pub") + + t.Run("valid public key", func(t *testing.T) { + key, err := LoadRSAPublicKey(publicKeyPEM) + assert.NoError(t, err) + assert.NotNil(t, key) + assert.Equal(t, 2048, key.Size()*8) + }) + + t.Run("invalid public key", func(t *testing.T) { + _, err := LoadRSAPublicKey([]byte("invalid PEM data")) + assert.ErrorContains(t, err, "failed to parse PEM block containing the public key") + }) + + t.Run("incorrect PEM type for public key", func(t *testing.T) { + privateKeyPEM := loadTestKeyFile(t, "./../auth/credentials/keytest") + _, err := LoadRSAPublicKey(privateKeyPEM) + assert.ErrorContains(t, err, "failed to parse PEM block containing the public key") + }) +} + +func TestLoadRSAPrivateKey(t *testing.T) { + privateKeyPEM := loadTestKeyFile(t, "./../auth/credentials/keytest") + + t.Run("valid private key", func(t *testing.T) { + key, err := LoadRSAPrivateKey(privateKeyPEM) + assert.NoError(t, err) + assert.NotNil(t, key) + assert.Equal(t, 2048, key.Size()*8) + }) + + t.Run("invalid private key", func(t *testing.T) { + _, err := LoadRSAPrivateKey([]byte("invalid PEM data")) + assert.ErrorContains(t, err, "failed to parse PEM block containing the private key") + }) + + t.Run("incorrect PEM type for private key", func(t *testing.T) { + publicKeyPEM := loadTestKeyFile(t, "./../auth/credentials/keytest.pub") + _, err := LoadRSAPrivateKey(publicKeyPEM) + assert.ErrorContains(t, err, "failed to parse PEM block containing the private key") + }) +} diff --git a/internal/error_test.go b/internal/error_test.go index a3cbd0ffa..751060ea9 100644 --- a/internal/error_test.go +++ b/internal/error_test.go @@ -460,7 +460,7 @@ func Test_SignalExternalWorkflowExecutionFailedError(t *testing.T) { InitiatedEventId: common.Int64Ptr(initiatedEventID), Cause: shared.SignalExternalWorkflowExecutionFailedCauseUnknownExternalWorkflowExecution.Ptr(), }) - require.NoError(t, weh.handleSignalExternalWorkflowExecutionFailed(event)) + weh.handleSignalExternalWorkflowExecutionFailed(event) _, ok := actualErr.(*UnknownExternalWorkflowExecutionError) require.True(t, ok) } diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 7c8ef2346..3656a639d 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -806,131 +806,89 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( }) switch event.GetEventType() { + // Noops + case m.EventTypeWorkflowExecutionCompleted, + m.EventTypeWorkflowExecutionTimedOut, + m.EventTypeWorkflowExecutionFailed, + m.EventTypeDecisionTaskScheduled, + m.EventTypeDecisionTaskTimedOut, + m.EventTypeDecisionTaskFailed, + m.EventTypeActivityTaskStarted, + m.EventTypeDecisionTaskCompleted, + m.EventTypeWorkflowExecutionCanceled, + m.EventTypeWorkflowExecutionContinuedAsNew: + // No Operation case m.EventTypeWorkflowExecutionStarted: err = weh.handleWorkflowExecutionStarted(event.WorkflowExecutionStartedEventAttributes) - - case m.EventTypeWorkflowExecutionCompleted: - // No Operation - case m.EventTypeWorkflowExecutionFailed: - // No Operation - case m.EventTypeWorkflowExecutionTimedOut: - // No Operation - case m.EventTypeDecisionTaskScheduled: - // No Operation case m.EventTypeDecisionTaskStarted: // Set replay clock. weh.SetCurrentReplayTime(time.Unix(0, event.GetTimestamp())) weh.workflowDefinition.OnDecisionTaskStarted() // Set replay decisionStarted eventID weh.workflowInfo.DecisionStartedEventID = event.GetEventId() - - case m.EventTypeDecisionTaskTimedOut: - // No Operation - case m.EventTypeDecisionTaskFailed: - // No Operation - case m.EventTypeDecisionTaskCompleted: - // No Operation case m.EventTypeActivityTaskScheduled: weh.decisionsHelper.handleActivityTaskScheduled( event.GetEventId(), event.ActivityTaskScheduledEventAttributes.GetActivityId()) - - case m.EventTypeActivityTaskStarted: - // No Operation - case m.EventTypeActivityTaskCompleted: - err = weh.handleActivityTaskCompleted(event) - + weh.handleActivityTaskCompleted(event) case m.EventTypeActivityTaskFailed: - err = weh.handleActivityTaskFailed(event) - + weh.handleActivityTaskFailed(event) case m.EventTypeActivityTaskTimedOut: - err = weh.handleActivityTaskTimedOut(event) - + weh.handleActivityTaskTimedOut(event) case m.EventTypeActivityTaskCancelRequested: weh.decisionsHelper.handleActivityTaskCancelRequested( event.ActivityTaskCancelRequestedEventAttributes.GetActivityId()) - case m.EventTypeRequestCancelActivityTaskFailed: weh.decisionsHelper.handleRequestCancelActivityTaskFailed( event.RequestCancelActivityTaskFailedEventAttributes.GetActivityId()) - case m.EventTypeActivityTaskCanceled: - err = weh.handleActivityTaskCanceled(event) - + weh.handleActivityTaskCanceled(event) case m.EventTypeTimerStarted: weh.decisionsHelper.handleTimerStarted(event.TimerStartedEventAttributes.GetTimerId()) - case m.EventTypeTimerFired: weh.handleTimerFired(event) - case m.EventTypeTimerCanceled: weh.decisionsHelper.handleTimerCanceled(event.TimerCanceledEventAttributes.GetTimerId()) - case m.EventTypeCancelTimerFailed: weh.decisionsHelper.handleCancelTimerFailed(event.CancelTimerFailedEventAttributes.GetTimerId()) - case m.EventTypeWorkflowExecutionCancelRequested: weh.handleWorkflowExecutionCancelRequested() - - case m.EventTypeWorkflowExecutionCanceled: - // No Operation. - case m.EventTypeRequestCancelExternalWorkflowExecutionInitiated: weh.handleRequestCancelExternalWorkflowExecutionInitiated(event) - case m.EventTypeRequestCancelExternalWorkflowExecutionFailed: weh.handleRequestCancelExternalWorkflowExecutionFailed(event) - case m.EventTypeExternalWorkflowExecutionCancelRequested: weh.handleExternalWorkflowExecutionCancelRequested(event) - - case m.EventTypeWorkflowExecutionContinuedAsNew: - // No Operation. - case m.EventTypeWorkflowExecutionSignaled: weh.handleWorkflowExecutionSignaled(event.WorkflowExecutionSignaledEventAttributes) - case m.EventTypeSignalExternalWorkflowExecutionInitiated: signalID := string(event.SignalExternalWorkflowExecutionInitiatedEventAttributes.Control) weh.decisionsHelper.handleSignalExternalWorkflowExecutionInitiated(event.GetEventId(), signalID) - case m.EventTypeSignalExternalWorkflowExecutionFailed: weh.handleSignalExternalWorkflowExecutionFailed(event) - case m.EventTypeExternalWorkflowExecutionSignaled: weh.handleSignalExternalWorkflowExecutionCompleted(event) - case m.EventTypeMarkerRecorded: err = weh.handleMarkerRecorded(event.GetEventId(), event.MarkerRecordedEventAttributes) - case m.EventTypeStartChildWorkflowExecutionInitiated: weh.decisionsHelper.handleStartChildWorkflowExecutionInitiated( event.StartChildWorkflowExecutionInitiatedEventAttributes.GetWorkflowId()) - case m.EventTypeStartChildWorkflowExecutionFailed: - err = weh.handleStartChildWorkflowExecutionFailed(event) - + weh.handleStartChildWorkflowExecutionFailed(event) case m.EventTypeChildWorkflowExecutionStarted: - err = weh.handleChildWorkflowExecutionStarted(event) - + weh.handleChildWorkflowExecutionStarted(event) case m.EventTypeChildWorkflowExecutionCompleted: - err = weh.handleChildWorkflowExecutionCompleted(event) - + weh.handleChildWorkflowExecutionCompleted(event) case m.EventTypeChildWorkflowExecutionFailed: - err = weh.handleChildWorkflowExecutionFailed(event) - + weh.handleChildWorkflowExecutionFailed(event) case m.EventTypeChildWorkflowExecutionCanceled: - err = weh.handleChildWorkflowExecutionCanceled(event) - + weh.handleChildWorkflowExecutionCanceled(event) case m.EventTypeChildWorkflowExecutionTimedOut: - err = weh.handleChildWorkflowExecutionTimedOut(event) - + weh.handleChildWorkflowExecutionTimedOut(event) case m.EventTypeChildWorkflowExecutionTerminated: - err = weh.handleChildWorkflowExecutionTerminated(event) - + weh.handleChildWorkflowExecutionTerminated(event) case m.EventTypeUpsertWorkflowSearchAttributes: weh.handleUpsertWorkflowSearchAttributes(event) - default: weh.logger.Error("unknown event type", zap.Int64(tagEventID, event.GetEventId()), @@ -1010,38 +968,35 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted( return nil } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCompleted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCompleted(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskClosed(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } activity.handle(event.ActivityTaskCompletedEventAttributes.Result, nil) - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskFailed(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskClosed(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } attributes := event.ActivityTaskFailedEventAttributes err := constructError(*attributes.Reason, attributes.Details, weh.GetDataConverter()) activity.handle(nil, err) - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskClosed(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } var err error @@ -1056,15 +1011,14 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskTimedOut(event * err = NewTimeoutError(attributes.GetTimeoutType(), details) } activity.handle(nil, err) - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event *m.HistoryEvent) { activityID := weh.decisionsHelper.getActivityID(event) decision := weh.decisionsHelper.handleActivityTaskCanceled(activityID) activity := decision.getData().(*scheduledActivity) if activity.handled { - return nil + return } if decision.isDone() || !activity.waitForCancelRequest { @@ -1073,8 +1027,6 @@ func (weh *workflowExecutionEventHandlerImpl) handleActivityTaskCanceled(event * err := NewCanceledError(details) activity.handle(nil, err) } - - return nil } func (weh *workflowExecutionEventHandlerImpl) handleTimerFired(event *m.HistoryEvent) { @@ -1101,13 +1053,19 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( case sideEffectMarkerName: var sideEffectID int32 var result []byte - encodedValues.Get(&sideEffectID, &result) + err := encodedValues.Get(&sideEffectID, &result) + if err != nil { + return fmt.Errorf("extract side effect: %w", err) + } weh.sideEffectResult[sideEffectID] = result return nil case versionMarkerName: var changeID string var version Version - encodedValues.Get(&changeID, &version) + err := encodedValues.Get(&changeID, &version) + if err != nil { + return fmt.Errorf("extract change id: %w", err) + } weh.changeVersions[changeID] = version return nil case localActivityMarkerName: @@ -1115,7 +1073,10 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded( case mutableSideEffectMarkerName: var fixedID string var result string - encodedValues.Get(&fixedID, &result) + err := encodedValues.Get(&fixedID, &result) + if err != nil { + return fmt.Errorf("extract fixed id: %w", err) + } weh.mutableSideEffect[fixedID] = []byte(result) return nil default: @@ -1200,13 +1161,13 @@ func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionSignaled( weh.signalHandler(attributes.GetSignalName(), attributes.Input) } -func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionFailed(event *m.HistoryEvent) { attributes := event.StartChildWorkflowExecutionFailedEventAttributes childWorkflowID := attributes.GetWorkflowId() decision := weh.decisionsHelper.handleStartChildWorkflowExecutionFailed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := &m.WorkflowExecutionAlreadyStartedError{ @@ -1214,18 +1175,16 @@ func (weh *workflowExecutionEventHandlerImpl) handleStartChildWorkflowExecutionF } childWorkflow.startedCallback(WorkflowExecution{}, err) childWorkflow.handle(nil, err) - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarted(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionStartedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() childRunID := attributes.WorkflowExecution.GetRunId() decision := weh.decisionsHelper.handleChildWorkflowExecutionStarted(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } childWorkflowExecution := WorkflowExecution{ @@ -1233,95 +1192,83 @@ func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionStarte RunID: childRunID, } childWorkflow.startedCallback(childWorkflowExecution, nil) - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCompleted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCompleted(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionCompletedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } childWorkflow.handle(attributes.Result, nil) - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionFailed(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionFailedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := constructError(attributes.GetReason(), attributes.Details, weh.GetDataConverter()) childWorkflow.handle(nil, err) - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCanceled(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionCanceled(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionCanceledEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionCanceled(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } details := newEncodedValues(attributes.Details, weh.GetDataConverter()) err := NewCanceledError(details) childWorkflow.handle(nil, err) - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTimedOut(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTimedOut(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionTimedOutEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := NewTimeoutError(attributes.GetTimeoutType()) childWorkflow.handle(nil, err) - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTerminated(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleChildWorkflowExecutionTerminated(event *m.HistoryEvent) { attributes := event.ChildWorkflowExecutionTerminatedEventAttributes childWorkflowID := attributes.WorkflowExecution.GetWorkflowId() decision := weh.decisionsHelper.handleChildWorkflowExecutionClosed(childWorkflowID) childWorkflow := decision.getData().(*scheduledChildWorkflow) if childWorkflow.handled { - return nil + return } err := newTerminatedError() childWorkflow.handle(nil, err) - - return nil } func (weh *workflowExecutionEventHandlerImpl) handleUpsertWorkflowSearchAttributes(event *m.HistoryEvent) { weh.updateWorkflowInfoWithSearchAttributes(event.UpsertWorkflowSearchAttributesEventAttributes.SearchAttributes) } -func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionInitiated(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionInitiated(event *m.HistoryEvent) { // For cancellation of child workflow only, we do not use cancellation ID // for cancellation of external workflow, we have to use cancellation ID attribute := event.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes workflowID := attribute.WorkflowExecution.GetWorkflowId() cancellationID := string(attribute.Control) weh.decisionsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(event.GetEventId(), workflowID, cancellationID) - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleExternalWorkflowExecutionCancelRequested(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleExternalWorkflowExecutionCancelRequested(event *m.HistoryEvent) { // For cancellation of child workflow only, we do not use cancellation ID // for cancellation of external workflow, we have to use cancellation ID attributes := event.ExternalWorkflowExecutionCancelRequestedEventAttributes @@ -1331,15 +1278,13 @@ func (weh *workflowExecutionEventHandlerImpl) handleExternalWorkflowExecutionCan // for cancel external workflow, we need to set the future cancellation := decision.getData().(*scheduledCancellation) if cancellation.handled { - return nil + return } cancellation.handle(nil, nil) } - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflowExecutionFailed(event *m.HistoryEvent) { // For cancellation of child workflow only, we do not use cancellation ID // for cancellation of external workflow, we have to use cancellation ID attributes := event.RequestCancelExternalWorkflowExecutionFailedEventAttributes @@ -1349,33 +1294,29 @@ func (weh *workflowExecutionEventHandlerImpl) handleRequestCancelExternalWorkflo // for cancel external workflow, we need to set the future cancellation := decision.getData().(*scheduledCancellation) if cancellation.handled { - return nil + return } err := fmt.Errorf("cancel external workflow failed, %v", attributes.GetCause()) cancellation.handle(nil, err) } - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionCompleted(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionCompleted(event *m.HistoryEvent) { attributes := event.ExternalWorkflowExecutionSignaledEventAttributes decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionCompleted(attributes.GetInitiatedEventId()) signal := decision.getData().(*scheduledSignal) if signal.handled { - return nil + return } signal.handle(nil, nil) - - return nil } -func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionFailed(event *m.HistoryEvent) error { +func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecutionFailed(event *m.HistoryEvent) { attributes := event.SignalExternalWorkflowExecutionFailedEventAttributes decision := weh.decisionsHelper.handleSignalExternalWorkflowExecutionFailed(attributes.GetInitiatedEventId()) signal := decision.getData().(*scheduledSignal) if signal.handled { - return nil + return } var err error @@ -1387,6 +1328,4 @@ func (weh *workflowExecutionEventHandlerImpl) handleSignalExternalWorkflowExecut } signal.handle(nil, err) - - return nil } diff --git a/internal/internal_event_handlers_test.go b/internal/internal_event_handlers_test.go index e7118858e..6af1e2494 100644 --- a/internal/internal_event_handlers_test.go +++ b/internal/internal_event_handlers_test.go @@ -24,6 +24,10 @@ import ( "encoding/json" "testing" + "github.com/opentracing/opentracing-go" + "github.com/uber-go/tally" + "go.uber.org/zap/zaptest" + "go.uber.org/cadence/internal/common" "github.com/stretchr/testify/assert" @@ -402,3 +406,313 @@ func TestProcessQuery_KnownQueryTypes(t *testing.T) { assert.NoError(t, err) assert.Equal(t, "[\"__open_sessions\",\"__query_types\",\"__stack_trace\",\"a\"]\n", string(result)) } + +func TestWorkflowExecutionEventHandler_ProcessEvent_WorkflowExecutionStarted(t *testing.T) { + t.Run("success", func(t *testing.T) { + testRegistry := newRegistry() + testRegistry.RegisterWorkflowWithOptions(func(ctx Context) error { return nil }, RegisterWorkflowOptions{Name: "test"}) + + weh := testWorkflowExecutionEventHandler(t, testRegistry) + + testHeaderStruct := &s.Header{ + Fields: map[string][]byte{ + "test": []byte("test"), + }, + } + testInput := []byte("testInput") + event := &s.HistoryEvent{ + EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionStarted), + WorkflowExecutionStartedEventAttributes: &s.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &s.WorkflowType{ + Name: common.StringPtr("test"), + }, + Input: testInput, + Header: testHeaderStruct, + }, + } + + err := weh.ProcessEvent(event, false, false) + assert.NoError(t, err) + }) + t.Run("error", func(t *testing.T) { + testRegistry := newRegistry() + + weh := testWorkflowExecutionEventHandler(t, testRegistry) + + event := &s.HistoryEvent{ + EventType: common.EventTypePtr(s.EventTypeWorkflowExecutionStarted), + WorkflowExecutionStartedEventAttributes: &s.WorkflowExecutionStartedEventAttributes{ + WorkflowType: &s.WorkflowType{ + Name: common.StringPtr("test"), + }, + }, + } + + err := weh.ProcessEvent(event, false, false) + assert.ErrorContains(t, err, errMsgUnknownWorkflowType) + }) +} + +func TestWorkflowExecutionEventHandler_ProcessEvent_Noops(t *testing.T) { + for _, tc := range []s.EventType{ + s.EventTypeWorkflowExecutionCompleted, + s.EventTypeWorkflowExecutionTimedOut, + s.EventTypeWorkflowExecutionFailed, + s.EventTypeDecisionTaskScheduled, + s.EventTypeDecisionTaskTimedOut, + s.EventTypeDecisionTaskFailed, + s.EventTypeActivityTaskStarted, + s.EventTypeDecisionTaskCompleted, + s.EventTypeWorkflowExecutionCanceled, + s.EventTypeWorkflowExecutionContinuedAsNew, + } { + t.Run(tc.String(), func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + + event := &s.HistoryEvent{ + EventType: common.EventTypePtr(tc), + } + + err := weh.ProcessEvent(event, false, false) + assert.NoError(t, err) + }) + } +} + +func TestWorkflowExecutionEventHandler_ProcessEvent_nil(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + + err := weh.ProcessEvent(nil, false, false) + assert.ErrorContains(t, err, "nil event provided") +} + +func TestWorkflowExecutionEventHandler_ProcessEvent_no_error_events(t *testing.T) { + for _, tc := range []struct { + event *s.HistoryEvent + prepareHandler func(*testing.T, *workflowExecutionEventHandlerImpl) + }{ + { + event: &s.HistoryEvent{ + EventType: s.EventType(-1).Ptr(), + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeActivityTaskCompleted.Ptr(), + EventId: common.Int64Ptr(2), + ActivityTaskCompletedEventAttributes: &s.ActivityTaskCompletedEventAttributes{ + Result: []byte("test"), + ScheduledEventId: common.Int64Ptr(1), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.scheduleActivityTask(&s.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("test-activity"), + }) + decision.setData(&scheduledActivity{ + handled: true, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleActivityTaskScheduled(1, "test-activity") + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeActivityTaskFailed.Ptr(), + EventId: common.Int64Ptr(3), + ActivityTaskFailedEventAttributes: &s.ActivityTaskFailedEventAttributes{ + Reason: common.StringPtr("test-reason"), + ScheduledEventId: common.Int64Ptr(2), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.scheduleActivityTask(&s.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("test-activity"), + }) + decision.setData(&scheduledActivity{ + handled: false, + callback: func(result []byte, err error) { + var customErr *CustomError + assert.ErrorAs(t, err, &customErr) + assert.Equal(t, customErr.reason, "test-reason") + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleActivityTaskScheduled(2, "test-activity") + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeActivityTaskCanceled.Ptr(), + EventId: common.Int64Ptr(4), + ActivityTaskCanceledEventAttributes: &s.ActivityTaskCanceledEventAttributes{ + Details: []byte("test-details"), + ScheduledEventId: common.Int64Ptr(3), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.scheduleActivityTask(&s.ScheduleActivityTaskDecisionAttributes{ + ActivityId: common.StringPtr("test-activity"), + }) + decision.setData(&scheduledActivity{ + handled: false, + callback: func(result []byte, err error) { + var canceledErr *CanceledError + assert.ErrorAs(t, err, &canceledErr) + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleActivityTaskScheduled(3, "test-activity") + decision.cancel() + decision.handleDecisionSent() + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeTimerCanceled.Ptr(), + EventId: common.Int64Ptr(5), + TimerCanceledEventAttributes: &s.TimerCanceledEventAttributes{ + TimerId: common.StringPtr("test-timer"), + StartedEventId: common.Int64Ptr(4), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.startTimer(&s.StartTimerDecisionAttributes{ + TimerId: common.StringPtr("test-timer"), + }) + decision.setData(&scheduledTimer{ + callback: func(result []byte, err error) { + var canceledErr *CanceledError + assert.ErrorAs(t, err, &canceledErr) + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleTimerStarted("test-timer") + decision.cancel() + decision.handleDecisionSent() + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeCancelTimerFailed.Ptr(), + EventId: common.Int64Ptr(6), + CancelTimerFailedEventAttributes: &s.CancelTimerFailedEventAttributes{ + TimerId: common.StringPtr("test-timer"), + Cause: common.StringPtr("test-cause"), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.startTimer(&s.StartTimerDecisionAttributes{ + TimerId: common.StringPtr("test-timer"), + }) + decision.setData(&scheduledTimer{ + callback: func(result []byte, err error) { + var customErr *CustomError + assert.ErrorAs(t, err, &customErr) + assert.Equal(t, customErr.reason, "test-cause") + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleTimerStarted("test-timer") + decision.cancel() + decision.handleDecisionSent() + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeWorkflowExecutionCancelRequested.Ptr(), + EventId: common.Int64Ptr(7), + WorkflowExecutionCancelRequestedEventAttributes: &s.WorkflowExecutionCancelRequestedEventAttributes{ + Cause: common.StringPtr("test-cause"), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + cancelCalled := false + t.Cleanup(func() { + if !cancelCalled { + t.Error("cancelWorkflow not called") + t.FailNow() + } + }) + impl.cancelHandler = func() { + cancelCalled = true + } + }, + }, + { + event: &s.HistoryEvent{ + EventType: s.EventTypeRequestCancelExternalWorkflowExecutionInitiated.Ptr(), + EventId: common.Int64Ptr(8), + RequestCancelExternalWorkflowExecutionInitiatedEventAttributes: &s.RequestCancelExternalWorkflowExecutionInitiatedEventAttributes{ + DecisionTaskCompletedEventId: common.Int64Ptr(7), + Domain: common.StringPtr(testDomain), + WorkflowExecution: &s.WorkflowExecution{ + WorkflowId: common.StringPtr("wid"), + }, + ChildWorkflowOnly: common.BoolPtr(true), + }, + }, + prepareHandler: func(t *testing.T, impl *workflowExecutionEventHandlerImpl) { + decision := impl.decisionsHelper.startChildWorkflowExecution(&s.StartChildWorkflowExecutionDecisionAttributes{ + Domain: common.StringPtr(testDomain), + WorkflowId: common.StringPtr("wid"), + }) + decision.setData(&scheduledChildWorkflow{ + handled: true, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleStartChildWorkflowExecutionInitiated("wid") + impl.decisionsHelper.handleChildWorkflowExecutionStarted("wid") + decision = impl.decisionsHelper.requestCancelExternalWorkflowExecution(testDomain, "wid", "", "", true) + decision.setData(&scheduledCancellation{ + callback: func(result []byte, err error) { + var canceledErr *CanceledError + assert.ErrorAs(t, err, &canceledErr) + }, + }) + impl.decisionsHelper.getDecisions(true) + impl.decisionsHelper.handleRequestCancelExternalWorkflowExecutionInitiated(7, "wid", "") + decision.handleDecisionSent() + }, + }, + } { + t.Run(tc.event.EventType.String(), func(t *testing.T) { + weh := testWorkflowExecutionEventHandler(t, newRegistry()) + if tc.prepareHandler != nil { + tc.prepareHandler(t, weh) + } + + // EventHandle handles all internal failures as panics. + // make sure we don't fail event processing. + // This can happen if there is a panic inside the handler. + weh.completeHandler = func(result []byte, err error) { + assert.NoError(t, err) + } + + err := weh.ProcessEvent(tc.event, false, false) + assert.NoError(t, err) + }) + } +} + +func testWorkflowExecutionEventHandler(t *testing.T, registry *registry) *workflowExecutionEventHandlerImpl { + return newWorkflowExecutionEventHandler( + testWorkflowInfo, + func(result []byte, err error) {}, + zaptest.NewLogger(t), + true, + tally.NewTestScope("test", nil), + registry, + nil, + nil, + opentracing.NoopTracer{}, + nil, + ).(*workflowExecutionEventHandlerImpl) +} + +var testWorkflowInfo = &WorkflowInfo{ + WorkflowType: WorkflowType{ + Name: "test", + Path: "", + }, +} diff --git a/internal/internal_worker_base.go b/internal/internal_worker_base.go index b98982b7c..1dbe1429a 100644 --- a/internal/internal_worker_base.go +++ b/internal/internal_worker_base.go @@ -106,7 +106,7 @@ type ( // WorkflowDefinition wraps the code that can execute a workflow. workflowDefinition interface { Execute(env workflowEnvironment, header *shared.Header, input []byte) - // Called for each non timed out startDecision event. + // OnDecisionTaskStarted is called for each non timed out startDecision event. // Executed after all history events since the previous decision are applied to workflowDefinition OnDecisionTaskStarted() StackTrace() string // Stack trace of all coroutines owned by the Dispatcher instance