From c2a6c200b15108222834be416676f49786af4b4f Mon Sep 17 00:00:00 2001 From: cormick Date: Sun, 17 Nov 2024 18:14:44 +0800 Subject: [PATCH] modify for code review Signed-off-by: cormick --- manager/database/mysql.go | 4 +- manager/database/postgres.go | 4 +- manager/job/mocks/sync_peers_mock.go | 12 ++-- manager/job/sync_peers.go | 94 ++++++++++++---------------- manager/job/sync_peers_test.go | 22 +++---- manager/service/job.go | 11 +++- 6 files changed, 68 insertions(+), 79 deletions(-) diff --git a/manager/database/mysql.go b/manager/database/mysql.go index 81d3d60cca7..aa264fd52b2 100644 --- a/manager/database/mysql.go +++ b/manager/database/mysql.go @@ -71,14 +71,14 @@ func newMysql(cfg *config.Config) (*gorm.DB, error) { return nil, err } - // AsyncSyncPeers migration. + // Run migration. if mysqlCfg.Migrate { if err := migrate(db); err != nil { return nil, err } } - // AsyncSyncPeers seed. + // Run seed. if err := seed(db); err != nil { return nil, err } diff --git a/manager/database/postgres.go b/manager/database/postgres.go index 4b3f11de434..1ce101691ac 100644 --- a/manager/database/postgres.go +++ b/manager/database/postgres.go @@ -49,14 +49,14 @@ func newPostgres(cfg *config.Config) (*gorm.DB, error) { return nil, err } - // AsyncSyncPeers migration. + // Run migration. if postgresCfg.Migrate { if err := migrate(db); err != nil { return nil, err } } - // AsyncSyncPeers seed. + // Run seed. if err := seed(db); err != nil { return nil, err } diff --git a/manager/job/mocks/sync_peers_mock.go b/manager/job/mocks/sync_peers_mock.go index 68bf646e420..49f2ace24c1 100644 --- a/manager/job/mocks/sync_peers_mock.go +++ b/manager/job/mocks/sync_peers_mock.go @@ -41,18 +41,18 @@ func (m *MockSyncPeers) EXPECT() *MockSyncPeersMockRecorder { return m.recorder } -// AsyncSyncPeers mocks base method. -func (m *MockSyncPeers) AsyncSyncPeers(arg0 context.Context, arg1 job.SyncPeersArgs) error { +// Run mocks base method. +func (m *MockSyncPeers) Run(arg0 context.Context, arg1 job.SyncPeersArgs) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AsyncSyncPeers", arg0, arg1) + ret := m.ctrl.Call(m, "Run", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// AsyncSyncPeers indicates an expected call of AsyncSyncPeers. -func (mr *MockSyncPeersMockRecorder) AsyncSyncPeers(arg0, arg1 any) *gomock.Call { +// Run indicates an expected call of Run. +func (mr *MockSyncPeersMockRecorder) Run(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AsyncSyncPeers", reflect.TypeOf((*MockSyncPeers)(nil).AsyncSyncPeers), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockSyncPeers)(nil).Run), arg0, arg1) } // Serve mocks base method. diff --git a/manager/job/sync_peers.go b/manager/job/sync_peers.go index b56c34e8df3..6f9f55106f3 100644 --- a/manager/job/sync_peers.go +++ b/manager/job/sync_peers.go @@ -24,6 +24,8 @@ import ( "sync" "time" + "gorm.io/gorm/clause" + machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" "github.com/google/uuid" "go.opentelemetry.io/otel/trace" @@ -41,8 +43,8 @@ import ( // SyncPeers is an interface for sync peers. type SyncPeers interface { - // AsyncSyncPeers execute action to sync peers, which is async. - AsyncSyncPeers(context.Context, SyncPeersArgs) error + // Run execute action to sync peers, which is async. + Run(context.Context, SyncPeersArgs) error // Serve started sync peers server. Serve() @@ -70,15 +72,17 @@ type SyncPeersArgs struct { // newSyncPeers returns a new SyncPeers. func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) { return &syncPeers{ - config: cfg, - db: gdb, - job: job, - done: make(chan struct{}), + config: cfg, + db: gdb, + job: job, + done: make(chan struct{}), + workChan: make(chan SyncPeersArgs, 10), + syncLocker: sync.Mutex{}, }, nil } -// AsyncSyncPeers start to sync peers. -func (s *syncPeers) AsyncSyncPeers(ctx context.Context, args SyncPeersArgs) error { +// Run start to sync peers. +func (s *syncPeers) Run(ctx context.Context, args SyncPeersArgs) error { if len(args.CandidateSchedulerClusters) == 0 { if err := s.db.WithContext(ctx).Find(&args.CandidateSchedulerClusters).Error; err != nil { return fmt.Errorf("failed to get candidate scheduler clusters: %v", err) @@ -91,14 +95,17 @@ func (s *syncPeers) AsyncSyncPeers(ctx context.Context, args SyncPeersArgs) erro // Serve started sync peers server. func (s *syncPeers) Serve() { - tick := time.NewTicker(s.config.Job.SyncPeers.Interval) + ticker := time.NewTicker(s.config.Job.SyncPeers.Interval) + defer ticker.Stop() for { select { - case <-tick.C: + case <-ticker.C: + logger.Debugf("start to sync peerrs periodically") if err := s.syncPeers(context.Background(), nil); err != nil { logger.Errorf("sync peers failed periodically: %v", err) } case args := <-s.workChan: + logger.Debugf("start to sync peerrs for request") err := s.syncPeers(context.Background(), args.CandidateSchedulerClusters) if err != nil { logger.Errorf("sync peers failed for request: %v", err) @@ -110,7 +117,7 @@ func (s *syncPeers) Serve() { if err == nil { state = machineryv1tasks.StateSuccess } - if updateErr := s.db.WithContext(context.Background()).First(&job, args.TaskID).Updates(models.Job{ + if updateErr := s.db.WithContext(context.Background()).First(&job, "task_id = ?", args.TaskID).Updates(models.Job{ State: state, }).Error; updateErr != nil { logger.Errorf("update sync peers job result failed for request: %v", updateErr) @@ -228,76 +235,57 @@ func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler, } // Calculate differences using diffPeers function - toAdd, toUpdate, toDelete := diffPeers(existingPeers, results) - - // Perform database updates based on the differences - if err := s.db.WithContext(ctx).CreateInBatches(toAdd, s.config.Job.SyncPeers.BatchSize).Error; err != nil { - log.Error(err) - } - - for _, peer := range toUpdate { - if err := s.db.WithContext(ctx).Model(&models.Peer{}).Where("id = ?", peer.ID).Updates(peer).Error; err != nil { + toUpsert, toDelete := diffPeers(existingPeers, results) + + // Perform batch upsert + if len(toUpsert) > 0 { + // Construct the upsert query + if err := s.db.WithContext(ctx). + Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + UpdateAll: true, + }). + CreateInBatches(toUpsert, s.config.Job.SyncPeers.BatchSize). + Error; err != nil { log.Error(err) } } - for _, peer := range toDelete { - if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Peer{}, peer.ID).Error; err != nil { + // Perform batch delete + if len(toDelete) > 0 { + if err := s.db.WithContext(ctx). + Delete(&toDelete). + Error; err != nil { log.Error(err) } } } -func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toAdd, toUpdate, toDelete []models.Peer) { +func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) { // Convert current peers to a map for quick lookup currentPeersMap := lo.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string { return item.ID }) - // // Convert existing peers to a map for quick lookup + // Convert existing peers to a map for quick lookup existingPeersMap := lo.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string { return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal) }) // Calculate differences for id, currentPeer := range currentPeersMap { - if existingPeer, ok := existingPeersMap[id]; ok { - // Peer exists in both, check if it needs to be updated - if !isPeerEqual(existingPeer, *currentPeer) { - toUpdate = append(toUpdate, convertToModelPeer(*currentPeer)) - } + if _, ok := existingPeersMap[id]; ok { // Remove from existingPeersMap to mark it as processed delete(existingPeersMap, id) - } else { - // Peer exists in currentPeers but not in existingPeers, add it - toAdd = append(toAdd, convertToModelPeer(*currentPeer)) } + // Add all current peers to upsert list + toUpsert = append(toUpsert, convertToModelPeer(*currentPeer)) } // Peers left in existingPeersMap are to be deleted toDelete = lo.Values(existingPeersMap) - return toAdd, toUpdate, lo.Values(existingPeersMap) -} - -// Helper function to check if two peers are equal -func isPeerEqual(peer models.Peer, currentPeer resource.Host) bool { - // Implement the equality check based on your requirements - // For example, compare all fields that should be considered for equality - return peer.Type == currentPeer.Type.Name() && - peer.IDC == currentPeer.Network.IDC && - peer.Location == currentPeer.Network.Location && - peer.Port == currentPeer.Port && - peer.DownloadPort == currentPeer.DownloadPort && - peer.ObjectStoragePort == currentPeer.ObjectStoragePort && - peer.OS == currentPeer.OS && - peer.Platform == currentPeer.Platform && - peer.PlatformFamily == currentPeer.PlatformFamily && - peer.PlatformVersion == currentPeer.PlatformVersion && - peer.KernelVersion == currentPeer.KernelVersion && - peer.GitVersion == currentPeer.Build.GitVersion && - peer.GitCommit == currentPeer.Build.GitCommit && - peer.BuildPlatform == currentPeer.Build.Platform + return toUpsert, toDelete } // Helper function to convert resource.Host to models.Peer diff --git a/manager/job/sync_peers_test.go b/manager/job/sync_peers_test.go index 232d0099b60..c25b3707219 100644 --- a/manager/job/sync_peers_test.go +++ b/manager/job/sync_peers_test.go @@ -17,13 +17,14 @@ package job import ( + "sort" + "testing" + "d7y.io/dragonfly/v2/manager/models" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/types" resource "d7y.io/dragonfly/v2/scheduler/resource/standard" "github.com/stretchr/testify/assert" - "sort" - "testing" ) func Test_diffPeers(t *testing.T) { @@ -34,8 +35,7 @@ func Test_diffPeers(t *testing.T) { tests := []struct { name string args args - wantToAdd []models.Peer - wantToUpdate []models.Peer + wantToUpsert []models.Peer wantToDelete []models.Peer }{ { @@ -77,12 +77,10 @@ func Test_diffPeers(t *testing.T) { types.HostTypeSuperSeed), // append only }, }, - wantToAdd: []models.Peer{ - generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed), - }, - wantToUpdate: []models.Peer{ + wantToUpsert: []models.Peer{ generateModePeer("127.0.0.1", "foo1", 80, 80, types.HostTypeSuperSeed), generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeSuperSeed), + generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed), }, wantToDelete: []models.Peer{ generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal), @@ -93,19 +91,15 @@ func Test_diffPeers(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - gotToAdd, gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers) + gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers) // sort the result to compare - sort.Slice(gotToAdd, func(i, j int) bool { - return gotToAdd[i].IP < gotToAdd[j].IP - }) sort.Slice(gotToUpdate, func(i, j int) bool { return gotToUpdate[i].IP < gotToUpdate[j].IP }) sort.Slice(gotToDelete, func(i, j int) bool { return gotToDelete[i].IP < gotToDelete[j].IP }) - assert.Equalf(t, tt.wantToAdd, gotToAdd, "diffPeers toAdd(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) - assert.Equalf(t, tt.wantToUpdate, gotToUpdate, "diffPeers toUpdate(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) + assert.Equalf(t, tt.wantToUpsert, gotToUpdate, "diffPeers toUpsert(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) assert.Equalf(t, tt.wantToDelete, gotToDelete, "diffPeers toDelete(%v, %v)", tt.args.existingPeers, tt.args.currentPeers) }) } diff --git a/manager/service/job.go b/manager/service/job.go index eecf2c9ef02..73d91a71452 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -18,9 +18,10 @@ package service import ( "context" - "d7y.io/dragonfly/v2/manager/job" "errors" "fmt" + + "d7y.io/dragonfly/v2/manager/job" machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" "github.com/google/uuid" @@ -36,6 +37,11 @@ import ( ) func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) { + args, err := structure.StructToMap(json) + if err != nil { + return nil, err + } + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, nil) if err != nil { return nil, err @@ -48,7 +54,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP taskID := fmt.Sprintf("manager_%v", uuid.New().String()) - if err = s.job.SyncPeers.AsyncSyncPeers(ctx, job.SyncPeersArgs{ + if err = s.job.SyncPeers.Run(ctx, job.SyncPeersArgs{ CandidateSchedulerClusters: candidateClusters, TaskID: taskID, }); err != nil { @@ -59,6 +65,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP job := models.Job{ TaskID: taskID, BIO: json.BIO, + Args: args, Type: json.Type, State: machineryv1tasks.StateStarted, UserID: json.UserID,