Skip to content

Commit

Permalink
Add activity retry policy properties to replication
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Jan 7, 2025
1 parent 4c22972 commit d6f63ea
Show file tree
Hide file tree
Showing 9 changed files with 3,008 additions and 2,811 deletions.
4,484 changes: 2,294 additions & 2,190 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1,296 changes: 676 additions & 620 deletions api/replication/v1/message.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,12 @@ message SyncActivityRequest {
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;

// Retry policy for the activity.
google.protobuf.Duration retry_initial_interval = 22;
google.protobuf.Duration retry_maximum_interval = 23;
int32 retry_maximum_attempts = 24;
double retry_backoff_coefficient = 25;
}

message SyncActivitiesRequest {
Expand Down Expand Up @@ -666,6 +672,11 @@ message ActivitySyncInfo {
int32 stamp = 20;
// Indicates if the activity is paused.
bool paused = 21;
// Retry policy for the activity. It needs to be replicated now, since the activity properties can be updated.
google.protobuf.Duration retry_initial_interval = 22;
google.protobuf.Duration retry_maximum_interval = 23;
int32 retry_maximum_attempts = 24;
double retry_backoff_coefficient = 25;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package temporal.server.api.replication.v1;
option go_package = "go.temporal.io/server/api/replication/v1;repication";

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";

import "temporal/server/api/enums/v1/replication.proto";
import "temporal/server/api/enums/v1/task.proto";
Expand Down Expand Up @@ -174,6 +175,11 @@ message SyncActivityTaskAttributes {
int32 stamp = 20;
// Flag indicating whether the activity is currently paused.
bool paused = 21;
// Retry policy for the activity. It needs to be replicated now, since the activity properties can be updated.
google.protobuf.Duration retry_initial_interval = 22;
google.protobuf.Duration retry_maximum_interval = 23;
int32 retry_maximum_attempts = 24;
double retry_backoff_coefficient = 25;
}

message HistoryTaskAttributes {
Expand Down
4 changes: 4 additions & 0 deletions service/history/ndc/activity_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ func (r *ActivityStateReplicatorImpl) SyncActivityState(
LastAttemptCompleteTime: request.LastAttemptCompleteTime,
Stamp: request.Stamp,
Paused: request.Paused,
RetryInitialInterval: request.RetryInitialInterval,
RetryMaximumInterval: request.RetryMaximumInterval,
RetryMaximumAttempts: request.RetryMaximumAttempts,
RetryBackoffCoefficient: request.RetryBackoffCoefficient,
},
)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions service/history/replication/executable_activity_state_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func NewExecutableActivityStateTask(
LastAttemptCompleteTime: task.LastAttemptCompleteTime,
Stamp: task.Stamp,
Paused: task.Paused,
RetryInitialInterval: task.RetryInitialInterval,
RetryMaximumInterval: task.RetryMaximumInterval,
RetryMaximumAttempts: task.RetryMaximumAttempts,
RetryBackoffCoefficient: task.RetryBackoffCoefficient,
},

batchable: true,
Expand Down
4 changes: 4 additions & 0 deletions service/history/replication/raw_task_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ func convertActivityStateReplicationTask(
LastAttemptCompleteTime: activityInfo.LastAttemptCompleteTime,
Stamp: activityInfo.Stamp,
Paused: activityInfo.Paused,
RetryInitialInterval: activityInfo.RetryInitialInterval,
RetryMaximumInterval: activityInfo.RetryMaximumInterval,
RetryMaximumAttempts: activityInfo.RetryMaximumAttempts,
RetryBackoffCoefficient: activityInfo.RetryBackoffCoefficient,
},
},
VisibilityTime: timestamppb.New(taskInfo.VisibilityTimestamp),
Expand Down
5 changes: 5 additions & 0 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1719,6 +1719,11 @@ func (ms *MutableStateImpl) UpdateActivityInfo(

ai.Paused = incomingActivityInfo.GetPaused()

ai.RetryInitialInterval = incomingActivityInfo.GetRetryInitialInterval()
ai.RetryMaximumInterval = incomingActivityInfo.GetRetryMaximumInterval()
ai.RetryMaximumAttempts = incomingActivityInfo.GetRetryMaximumAttempts()
ai.RetryBackoffCoefficient = incomingActivityInfo.GetRetryBackoffCoefficient()

ms.updateActivityInfos[ai.ScheduledEventId] = ai
ms.activityInfosUserDataUpdated[ai.ScheduledEventId] = struct{}{}
ms.approximateSize += ai.Size()
Expand Down
5 changes: 4 additions & 1 deletion tests/xdc/activity_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
ActivityOptions: &activitypb.ActivityOptions{
RetryPolicy: &commonpb.RetryPolicy{
InitialInterval: durationpb.New(2 * time.Second),
MaximumAttempts: 10,
},
},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"retry_policy.initial_interval"}},
UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"retry_policy.initial_interval", "retry_policy.maximum_attempts"}},
}
respUpdate, err := s.cluster1.Host().FrontendClient().UpdateActivityOptionsById(ctx, updateRequest)
s.NoError(err)
Expand Down Expand Up @@ -196,6 +197,8 @@ func (s *ActivityApiStateReplicationSuite) TestPauseActivityFailover() {
s.Equal(1, len(description.PendingActivities))
s.True(description.PendingActivities[0].Paused)
s.Equal(int32(1), description.PendingActivities[0].Attempt)
s.Equal(int64(2), description.PendingActivities[0].CurrentRetryInterval.GetSeconds())
s.Equal(int32(10), description.PendingActivities[0].MaximumAttempts)

// start worker2
worker2 := sdkworker.New(standbyClient, taskQueue, sdkworker.Options{})
Expand Down

0 comments on commit d6f63ea

Please sign in to comment.