Skip to content

Commit

Permalink
Fix workflow and activity registration race (#980)
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 authored and yux0 committed Jun 2, 2020
1 parent 4e06a49 commit accf9b3
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 48 deletions.
4 changes: 2 additions & 2 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,8 +395,8 @@ func deSerializeFunctionResult(f interface{}, result []byte, to interface{}, dat
case reflect.String:
// If we know about this function through registration then we will try to return corresponding result type.
fnName := reflect.ValueOf(f).String()
if fnRegistered, ok := registry.getActivityFn(fnName); ok {
return deSerializeFnResultFromFnType(reflect.TypeOf(fnRegistered), result, to, dataConverter)
if activity, ok := registry.GetActivity(fnName); ok {
return deSerializeFnResultFromFnType(reflect.TypeOf(activity.GetFunction()), result, to, dataConverter)
}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1833,7 +1833,7 @@ func (ath *activityTaskHandlerImpl) getActivity(name string) activity {
return ath.activityProvider(name)
}

if a, ok := ath.registry.getActivity(name); ok {
if a, ok := ath.registry.GetActivity(name); ok {
return a
}

Expand Down
7 changes: 5 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1270,7 +1270,7 @@ func (t *TaskHandlersTestSuite) TestActivityExecutionDeadline() {
}
a := &testActivityDeadline{logger: t.logger}
registry := getGlobalRegistry()
registry.addActivity(a.ActivityType().Name, a)
registry.addActivityWithLock(a.ActivityType().Name, a)

mockCtrl := gomock.NewController(t.T())
mockService := workflowservicetest.NewMockClient(mockCtrl)
Expand Down Expand Up @@ -1324,7 +1324,10 @@ func activityWithWorkerStop(ctx context.Context) error {
func (t *TaskHandlersTestSuite) TestActivityExecutionWorkerStop() {
a := &testActivityDeadline{logger: t.logger}
registry := getGlobalRegistry()
registry.addActivityFn(a.ActivityType().Name, activityWithWorkerStop)
registry.RegisterActivityWithOptions(
activityWithWorkerStop,
RegisterActivityOptions{Name: a.ActivityType().Name, DisableAlreadyRegisteredCheck: true},
)

mockCtrl := gomock.NewController(t.T())
mockService := workflowservicetest.NewMockClient(mockCtrl)
Expand Down
69 changes: 30 additions & 39 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,28 +539,32 @@ func (r *registry) RegisterWorkflow(af interface{}) {
}

func (r *registry) RegisterWorkflowWithOptions(
af interface{},
wf interface{},
options RegisterWorkflowOptions,
) {
// Validate that it is a function
fnType := reflect.TypeOf(af)
fnType := reflect.TypeOf(wf)
if err := validateFnFormat(fnType, true); err != nil {
panic(err)
}
fnName := getFunctionName(af)
fnName := getFunctionName(wf)
alias := options.Name
registerName := fnName
if len(alias) > 0 {
registerName = alias
}

r.Lock()
defer r.Unlock()

if !options.DisableAlreadyRegisteredCheck {
if _, ok := r.workflowFuncMap[registerName]; ok {
panic(fmt.Sprintf("workflow name \"%v\" is already registered", registerName))
}
}
r.addWorkflowFn(registerName, af)
r.workflowFuncMap[registerName] = wf
if len(alias) > 0 {
r.addWorkflowAlias(fnName, alias)
r.workflowAliasMap[fnName] = alias
}
}

Expand Down Expand Up @@ -594,20 +598,27 @@ func (r *registry) registerActivityFunction(af interface{}, options RegisterActi
if len(alias) > 0 {
registerName = alias
}

r.Lock()
defer r.Unlock()

if !options.DisableAlreadyRegisteredCheck {
if _, ok := r.activityFuncMap[registerName]; ok {
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.addActivityFn(registerName, af)
r.activityFuncMap[registerName] = &activityExecutor{registerName, af}
if len(alias) > 0 {
r.addActivityAlias(fnName, alias)
r.activityAliasMap[fnName] = alias
}

return nil
}

func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterActivityOptions) error {
r.Lock()
defer r.Unlock()

structValue := reflect.ValueOf(aStruct)
structType := structValue.Type()
count := 0
Expand All @@ -629,11 +640,11 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
}
registerName = prefix + name
if !options.DisableAlreadyRegisteredCheck {
if _, ok := r.getActivityFn(registerName); ok {
if _, ok := r.getActivityNoLock(registerName); ok {
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.addActivityFn(registerName, methodValue.Interface())
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface()}
count++
}

Expand All @@ -644,12 +655,6 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
return nil
}

func (r *registry) addWorkflowAlias(fnName string, alias string) {
r.Lock()
defer r.Unlock()
r.workflowAliasMap[fnName] = alias
}

func (r *registry) getWorkflowAlias(fnName string) (string, bool) {
r.Lock() // do not defer for Unlock to call next.getWorkflowAlias without lock
alias, ok := r.workflowAliasMap[fnName]
Expand All @@ -661,12 +666,6 @@ func (r *registry) getWorkflowAlias(fnName string) (string, bool) {
return alias, ok
}

func (r *registry) addWorkflowFn(fnName string, wf interface{}) {
r.Lock()
defer r.Unlock()
r.workflowFuncMap[fnName] = wf
}

func (r *registry) getWorkflowFn(fnName string) (interface{}, bool) {
r.Lock() // do not defer for Unlock to call next.getWorkflowFn without lock
fn, ok := r.workflowFuncMap[fnName]
Expand All @@ -692,12 +691,6 @@ func (r *registry) getRegisteredWorkflowTypes() []string {
return result
}

func (r *registry) addActivityAlias(fnName string, alias string) {
r.Lock()
defer r.Unlock()
r.activityAliasMap[fnName] = alias
}

func (r *registry) getActivityAlias(fnName string) (string, bool) {
r.Lock() // do not defer for Unlock to call next.getActivityAlias without lock
alias, ok := r.activityAliasMap[fnName]
Expand All @@ -709,32 +702,30 @@ func (r *registry) getActivityAlias(fnName string) (string, bool) {
return alias, ok
}

func (r *registry) addActivity(fnName string, a activity) {
// Use in unit test only, otherwise deadlock will occur.
func (r *registry) addActivityWithLock(fnName string, a activity) {
r.Lock()
defer r.Unlock()
r.activityFuncMap[fnName] = a
}

func (r *registry) addActivityFn(fnName string, af interface{}) {
r.addActivity(fnName, &activityExecutor{fnName, af})
}

func (r *registry) getActivity(fnName string) (activity, bool) {
r.Lock() // do not defer for Unlock to call next.getActivity without lock
func (r *registry) GetActivity(fnName string) (activity, bool) {
r.Lock() // do not defer for Unlock to call next.GetActivity without lock
a, ok := r.activityFuncMap[fnName]
if !ok && r.next != nil {
r.Unlock()
return r.next.getActivity(fnName)
return r.next.GetActivity(fnName)
}
r.Unlock()
return a, ok
}

func (r *registry) getActivityFn(fnName string) (interface{}, bool) {
if a, ok := r.getActivity(fnName); ok {
return a.GetFunction(), ok
func (r *registry) getActivityNoLock(fnName string) (activity, bool) {
a, ok := r.activityFuncMap[fnName]
if !ok && r.next != nil {
return r.next.getActivityNoLock(fnName)
}
return nil, false
return a, ok
}

func (r *registry) getRegisteredActivities() []activity {
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *WorkersTestSuite) TestActivityWorker() {
overrides := &workerOverrides{activityTaskHandler: newSampleActivityTaskHandler()}
a := &greeterActivity{}
registry := getGlobalRegistry()
registry.addActivity(a.ActivityType().Name, a)
registry.addActivityWithLock(a.ActivityType().Name, a)
activityWorker := newActivityWorker(
s.service, domain, executionParameters, overrides, registry, nil,
)
Expand Down Expand Up @@ -176,7 +176,7 @@ func (s *WorkersTestSuite) TestActivityWorkerStop() {
overrides := &workerOverrides{activityTaskHandler: activityTaskHandler}
a := &greeterActivity{}
registry := getGlobalRegistry()
registry.addActivity(a.ActivityType().Name, a)
registry.addActivityWithLock(a.ActivityType().Name, a)
worker := newActivityWorker(
s.service, domain, executionParameters, overrides, registry, nil,
)
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ func getGetWorkflowExecutionHistoryRequest(filterType shared.HistoryEventFilterT
},
WaitForNewEvent: common.BoolPtr(isLongPoll),
HistoryEventFilterType: &filterType,
SkipArchival: common.BoolPtr(true),
SkipArchival: common.BoolPtr(true),
}

return request
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -1517,7 +1517,7 @@ func (env *testWorkflowEnvironmentImpl) newTestActivityTaskHandler(taskList stri
}
}

activity, ok := registry.getActivity(name)
activity, ok := registry.GetActivity(name)
if !ok {
return nil
}
Expand Down

0 comments on commit accf9b3

Please sign in to comment.