Skip to content

Commit

Permalink
Move Engine interface into common history package
Browse files Browse the repository at this point in the history
  • Loading branch information
ychebotarev committed Jan 7, 2025
1 parent 4c22972 commit d232510
Show file tree
Hide file tree
Showing 48 changed files with 243 additions and 265 deletions.
3 changes: 2 additions & 1 deletion service/history/api/respondworkflowtaskcompleted/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ import (
"go.temporal.io/server/common/testing/updateutils"
"go.temporal.io/server/internal/effect"
"go.temporal.io/server/service/history/api"
history "go.temporal.io/server/service/history/common"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/shard"
Expand Down Expand Up @@ -121,7 +122,7 @@ func (s *WorkflowTaskCompletedHandlerSuite) SetupSubTest() {
s.NoError(err)
s.mockShard.SetStateMachineRegistry(reg)

mockEngine := shard.NewMockEngine(s.controller)
mockEngine := history.NewMockEngine(s.controller)
mockEngine.EXPECT().NotifyNewHistoryEvent(gomock.Any()).AnyTimes()
mockEngine.EXPECT().NotifyNewTasks(gomock.Any()).AnyTimes()
s.mockShard.SetEngineForTesting(mockEngine)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination engine_mock.go

package shard
package common

import (
"context"
Expand All @@ -37,7 +37,6 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
workflowspb "go.temporal.io/server/api/workflow/v1"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/common/definition"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/namespace"
Expand Down Expand Up @@ -130,23 +129,6 @@ type (
Start()
Stop()
}

ReplicationStream interface {
SubscribeReplicationNotification() (<-chan struct{}, string)
UnsubscribeReplicationNotification(string)
ConvertReplicationTask(
ctx context.Context,
task tasks.Task,
clusterID int32,
) (*replicationspb.ReplicationTask, error)
GetReplicationTasksIter(
ctx context.Context,
pollingCluster string,
minInclusiveTaskID int64,
maxExclusiveTaskID int64,
) (collection.Iterator[tasks.Task], error)
GetMaxReplicationTaskInfo() (int64, time.Time)
}
)

type (
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 53 additions & 0 deletions service/history/common/replication_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// The MIT License
//
// Copyright (c) 2024 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package common

import (
"context"
"time"

replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common/collection"
"go.temporal.io/server/service/history/tasks"
)

type (
ReplicationStream interface {
SubscribeReplicationNotification() (<-chan struct{}, string)
UnsubscribeReplicationNotification(string)
ConvertReplicationTask(
ctx context.Context,
task tasks.Task,
clusterID int32,
) (*replicationspb.ReplicationTask, error)

GetReplicationTasksIter(
ctx context.Context,
pollingCluster string,
minInclusiveTaskID int64,
maxExclusiveTaskID int64,
) (collection.Iterator[tasks.Task], error)

GetMaxReplicationTaskInfo() (int64, time.Time)
}
)
7 changes: 4 additions & 3 deletions service/history/history_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ import (
"go.temporal.io/server/service/history/api/verifychildworkflowcompletionrecorded"
"go.temporal.io/server/service/history/api/verifyfirstworkflowtaskscheduled"
"go.temporal.io/server/service/history/circuitbreakerpool"
history "go.temporal.io/server/service/history/common"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/consts"
"go.temporal.io/server/service/history/deletemanager"
Expand Down Expand Up @@ -183,7 +184,7 @@ func NewEngineWithShardContext(
dlqWriter replication.DLQWriter,
commandHandlerRegistry *workflow.CommandHandlerRegistry,
outboundQueueCBPool *circuitbreakerpool.OutboundQueueCircuitBreakerPool,
) shard.Engine {
) history.Engine {
currentClusterName := shard.GetClusterMetadata().GetCurrentClusterName()

logger := shard.GetLogger()
Expand Down Expand Up @@ -759,14 +760,14 @@ func (e *historyEngineImpl) SyncActivities(

func (e *historyEngineImpl) SyncHSM(
ctx context.Context,
request *shard.SyncHSMRequest,
request *history.SyncHSMRequest,
) error {
return e.nDCHSMStateReplicator.SyncHSMState(ctx, request)
}

func (e *historyEngineImpl) BackfillHistoryEvents(
ctx context.Context,
request *shard.BackfillHistoryEventsRequest,
request *history.BackfillHistoryEventsRequest,
) error {
return e.nDCHistoryReplicator.BackfillHistoryEvents(ctx, request)
}
Expand Down
3 changes: 2 additions & 1 deletion service/history/history_engine_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.temporal.io/server/common/sdk"
"go.temporal.io/server/service/history/api"
"go.temporal.io/server/service/history/circuitbreakerpool"
history "go.temporal.io/server/service/history/common"
"go.temporal.io/server/service/history/configs"
"go.temporal.io/server/service/history/events"
"go.temporal.io/server/service/history/replication"
Expand Down Expand Up @@ -77,7 +78,7 @@ type (

func (f *historyEngineFactory) CreateEngine(
shard shard.Context,
) shard.Engine {
) history.Engine {
var wfCache wcache.Cache
if shard.GetConfig().EnableHostLevelHistoryCache() {
wfCache = f.WorkflowCache
Expand Down
5 changes: 3 additions & 2 deletions service/history/ndc/history_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import (
"go.temporal.io/server/common/persistence/versionhistory"
"go.temporal.io/server/common/primitives/timestamp"
serviceerrors "go.temporal.io/server/common/serviceerror"
history "go.temporal.io/server/service/history/common"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
wcache "go.temporal.io/server/service/history/workflow/cache"
Expand Down Expand Up @@ -113,7 +114,7 @@ type (
newEvents []*historypb.HistoryEvent,
newRunID string,
) error
BackfillHistoryEvents(ctx context.Context, request *shard.BackfillHistoryEventsRequest) error
BackfillHistoryEvents(ctx context.Context, request *history.BackfillHistoryEventsRequest) error
}

HistoryReplicatorImpl struct {
Expand Down Expand Up @@ -229,7 +230,7 @@ func (r *HistoryReplicatorImpl) ApplyEvents(

func (r *HistoryReplicatorImpl) BackfillHistoryEvents(
ctx context.Context,
request *shard.BackfillHistoryEventsRequest,
request *history.BackfillHistoryEventsRequest,
) error {
task, err := newReplicationTaskFromBatch(
r.clusterMetadata,
Expand Down
7 changes: 4 additions & 3 deletions service/history/ndc/hsm_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/versionhistory"
serviceerrors "go.temporal.io/server/common/serviceerror"
history "go.temporal.io/server/service/history/common"
"go.temporal.io/server/service/history/hsm"
"go.temporal.io/server/service/history/shard"
"go.temporal.io/server/service/history/workflow"
Expand All @@ -50,7 +51,7 @@ type (
HSMStateReplicator interface {
SyncHSMState(
ctx context.Context,
request *shard.SyncHSMRequest,
request *history.SyncHSMRequest,
) error
}

Expand All @@ -76,7 +77,7 @@ func NewHSMStateReplicator(

func (r *HSMStateReplicatorImpl) SyncHSMState(
ctx context.Context,
request *shard.SyncHSMRequest,
request *history.SyncHSMRequest,
) (retError error) {
namespaceID := namespace.ID(request.WorkflowKey.GetNamespaceID())
execution := &commonpb.WorkflowExecution{
Expand Down Expand Up @@ -150,7 +151,7 @@ func (r *HSMStateReplicatorImpl) SyncHSMState(

func (r *HSMStateReplicatorImpl) syncHSMNode(
mutableState workflow.MutableState,
request *shard.SyncHSMRequest,
request *history.SyncHSMRequest,
) (bool, error) {

shouldSync, err := r.compareVersionHistory(mutableState, request.EventVersionHistory)
Expand Down
4 changes: 2 additions & 2 deletions service/history/ndc/hsm_state_replicator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit d232510

Please sign in to comment.