Skip to content

Commit

Permalink
Add activity retry policy properties to replication (#7055)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Add the following activity retry policy properties to the activity
replication logic:
* InitialInterval
* MaximumInterval
* MaximumAttempts
* BackoffCoefficient

## Why?
<!-- Tell your future self why have you made these changes -->
Previously, it was assumed that an activity's retry policy could not change,
so it was not replicated.
An activity's retry policy can now be changed via the UpdateActivity API, so
those changes must be replicated.


## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Added functional tests that check a few of the updated properties.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
Increased size of replication messages.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No
  • Loading branch information
ychebotarev authored Jan 10, 2025
1 parent 975ff7b commit c49995f
Show file tree
Hide file tree
Showing 10 changed files with 3,019 additions and 2,813 deletions.
4,484 changes: 2,294 additions & 2,190 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1,296 changes: 676 additions & 620 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,12 @@ message SyncActivityRequest {
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;

// Retry policy for the activity.
google.protobuf.Duration retry_initial_interval = 22;
google.protobuf.Duration retry_maximum_interval = 23;
int32 retry_maximum_attempts = 24;
double retry_backoff_coefficient = 25;
}

message SyncActivitiesRequest {
Expand Down Expand Up @@ -666,6 +672,11 @@ message ActivitySyncInfo {
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;
// Retry policy for the activity. It needs to be replicated now, since the activity properties can be updated.
google.protobuf.Duration retry_initial_interval = 22;
google.protobuf.Duration retry_maximum_interval = 23;
int32 retry_maximum_attempts = 24;
double retry_backoff_coefficient = 25;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package temporal.server.api.replication.v1;
option go_package = "go.temporal.io/server/api/replication/v1;repication";

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

import "temporal/server/api/enums/v1/replication.proto";
import "temporal/server/api/enums/v1/task.proto";
Expand Down Expand Up @@ -174,6 +175,11 @@ message SyncActivityTaskAttributes {
int32 stamp = 20;
// Flag indicating whether the activity is currently paused.
bool paused = 21;
// Retry policy for the activity. It needs to be replicated now, since the activity properties can be updated.
google.protobuf.Duration retry_initial_interval = 22;
google.protobuf.Duration retry_maximum_interval = 23;
int32 retry_maximum_attempts = 24;
double retry_backoff_coefficient = 25;
}

message HistoryTaskAttributes {
Expand Down
11 changes: 10 additions & 1 deletion service/history/ndc/activity_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (r *ActivityStateReplicatorImpl) SyncActivityState(
LastAttemptCompleteTime: request.LastAttemptCompleteTime,
Stamp: request.Stamp,
Paused: request.Paused,
RetryInitialInterval: request.RetryInitialInterval,
RetryMaximumInterval: request.RetryMaximumInterval,
RetryMaximumAttempts: request.RetryMaximumAttempts,
RetryBackoffCoefficient: request.RetryBackoffCoefficient,
},
)
if err != nil {
Expand Down Expand Up @@ -354,11 +358,16 @@ func (r *ActivityStateReplicatorImpl) compareActivity(
return true
}

if activityInfo.Stamp != stamp {
if activityInfo.Stamp < stamp {
// incoming stamp is newer than the local one, should update activity
return true
}

if activityInfo.Stamp > stamp {
// stamp is older than we have, should not update activity
return false
}

// activityInfo.Version == version
if activityInfo.Attempt > attempt {
// this should not retry, can be caused by failover or reset
Expand Down
2 changes: 1 addition & 1 deletion service/history/ndc/activity_state_replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (s *activityReplicatorStateSuite) TestActivity_DifferentStamp() {
localActivityInfo := &persistencespb.ActivityInfo{
Version: version,
Attempt: attempt,
Stamp: stamp + 1,
Stamp: stamp - 1,
}

apply := s.nDCActivityStateReplicator.compareActivity(
Expand Down
8 changes: 8 additions & 0 deletions service/history/replication/executable_activity_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func NewExecutableActivityStateTask(
LastAttemptCompleteTime: task.LastAttemptCompleteTime,
Stamp: task.Stamp,
Paused: task.Paused,
RetryInitialInterval: task.RetryInitialInterval,
RetryMaximumInterval: task.RetryMaximumInterval,
RetryMaximumAttempts: task.RetryMaximumAttempts,
RetryBackoffCoefficient: task.RetryBackoffCoefficient,
},

batchable: true,
Expand All @@ -128,6 +132,10 @@ func NewExecutableActivityStateTask(
LastAttemptCompleteTime: task.LastAttemptCompleteTime,
Stamp: task.Stamp,
Paused: task.Paused,
RetryInitialInterval: task.RetryInitialInterval,
RetryMaximumInterval: task.RetryMaximumInterval,
RetryMaximumAttempts: task.RetryMaximumAttempts,
RetryBackoffCoefficient: task.RetryBackoffCoefficient,
}),
}
}
Expand Down
4 changes: 4 additions & 0 deletions service/history/replication/raw_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ func convertActivityStateReplicationTask(
LastAttemptCompleteTime: activityInfo.LastAttemptCompleteTime,
Stamp: activityInfo.Stamp,
Paused: activityInfo.Paused,
RetryInitialInterval: activityInfo.RetryInitialInterval,
RetryMaximumInterval: activityInfo.RetryMaximumInterval,
RetryMaximumAttempts: activityInfo.RetryMaximumAttempts,
RetryBackoffCoefficient: activityInfo.RetryBackoffCoefficient,
},
},
VisibilityTime: timestamppb.New(taskInfo.VisibilityTimestamp),
Expand Down
5 changes: 5 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,6 +1719,11 @@ func (ms *MutableStateImpl) UpdateActivityInfo(

ai.Paused = incomingActivityInfo.GetPaused()

ai.RetryInitialInterval = incomingActivityInfo.GetRetryInitialInterval()
ai.RetryMaximumInterval = incomingActivityInfo.GetRetryMaximumInterval()
ai.RetryMaximumAttempts = incomingActivityInfo.GetRetryMaximumAttempts()
ai.RetryBackoffCoefficient = incomingActivityInfo.GetRetryBackoffCoefficient()

ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
ms.approximateSize += ai.Size()
Expand Down
5 changes: 4 additions & 1 deletion tests/xdc/activity_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
ActivityOptions: &activitypb.ActivityOptions{
RetryPolicy: &commonpb.RetryPolicy{
InitialInterval: durationpb.New(2 * time.Second),
MaximumAttempts: 10,
},
},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"retry_policy.initial_interval"}},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"retry_policy.initial_interval", "retry_policy.maximum_attempts"}},
}
respUpdate, err := s.cluster1.Host().FrontendClient().UpdateActivityOptionsById(ctx, updateRequest)
s.NoError(err)
Expand Down Expand Up @@ -196,6 +197,8 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
s.Equal(1, len(description.PendingActivities))
s.True(description.PendingActivities[0].Paused)
s.Equal(int32(1), description.PendingActivities[0].Attempt)
s.Equal(int64(2), description.PendingActivities[0].CurrentRetryInterval.GetSeconds())
s.Equal(int32(10), description.PendingActivities[0].MaximumAttempts)

// start worker2
worker2 := sdkworker.New(standbyClient, taskQueue, sdkworker.Options{})
Expand Down

0 comments on commit c49995f

Please sign in to comment.