Skip to content

Commit

Permalink
modify for code review
Browse files Browse the repository at this point in the history
Signed-off-by: cormick <[email protected]>
  • Loading branch information
CormickKneey committed Nov 25, 2024
1 parent 74b3663 commit c2a6c20
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 79 deletions.
4 changes: 2 additions & 2 deletions manager/database/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,14 @@ func newMysql(cfg *config.Config) (*gorm.DB, error) {
return nil, err
}

// AsyncSyncPeers migration.
// Run migration.
if mysqlCfg.Migrate {
if err := migrate(db); err != nil {
return nil, err
}
}

// AsyncSyncPeers seed.
// Run seed.
if err := seed(db); err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions manager/database/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ func newPostgres(cfg *config.Config) (*gorm.DB, error) {
return nil, err
}

// AsyncSyncPeers migration.
// Run migration.
if postgresCfg.Migrate {
if err := migrate(db); err != nil {
return nil, err
}
}

// AsyncSyncPeers seed.
// Run seed.
if err := seed(db); err != nil {
return nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions manager/job/mocks/sync_peers_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 41 additions & 53 deletions manager/job/sync_peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync"
"time"

"gorm.io/gorm/clause"

machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
Expand All @@ -41,8 +43,8 @@ import (

// SyncPeers is an interface for sync peers.
type SyncPeers interface {
// AsyncSyncPeers execute action to sync peers, which is async.
AsyncSyncPeers(context.Context, SyncPeersArgs) error
// Run execute action to sync peers, which is async.
Run(context.Context, SyncPeersArgs) error

// Serve started sync peers server.
Serve()
Expand Down Expand Up @@ -70,15 +72,17 @@ type SyncPeersArgs struct {
// newSyncPeers returns a new SyncPeers.
func newSyncPeers(cfg *config.Config, job *internaljob.Job, gdb *gorm.DB) (SyncPeers, error) {
return &syncPeers{
config: cfg,
db: gdb,
job: job,
done: make(chan struct{}),
config: cfg,
db: gdb,
job: job,
done: make(chan struct{}),
workChan: make(chan SyncPeersArgs, 10),
syncLocker: sync.Mutex{},
}, nil
}

// AsyncSyncPeers start to sync peers.
func (s *syncPeers) AsyncSyncPeers(ctx context.Context, args SyncPeersArgs) error {
// Run start to sync peers.
func (s *syncPeers) Run(ctx context.Context, args SyncPeersArgs) error {
if len(args.CandidateSchedulerClusters) == 0 {
if err := s.db.WithContext(ctx).Find(&args.CandidateSchedulerClusters).Error; err != nil {
return fmt.Errorf("failed to get candidate scheduler clusters: %v", err)
Expand All @@ -91,14 +95,17 @@ func (s *syncPeers) AsyncSyncPeers(ctx context.Context, args SyncPeersArgs) erro

// Serve started sync peers server.
func (s *syncPeers) Serve() {
tick := time.NewTicker(s.config.Job.SyncPeers.Interval)
ticker := time.NewTicker(s.config.Job.SyncPeers.Interval)
defer ticker.Stop()
for {
select {
case <-tick.C:
case <-ticker.C:
logger.Debugf("start to sync peerrs periodically")
if err := s.syncPeers(context.Background(), nil); err != nil {
logger.Errorf("sync peers failed periodically: %v", err)
}
case args := <-s.workChan:
logger.Debugf("start to sync peerrs for request")
err := s.syncPeers(context.Background(), args.CandidateSchedulerClusters)
if err != nil {
logger.Errorf("sync peers failed for request: %v", err)
Expand All @@ -110,7 +117,7 @@ func (s *syncPeers) Serve() {
if err == nil {
state = machineryv1tasks.StateSuccess
}
if updateErr := s.db.WithContext(context.Background()).First(&job, args.TaskID).Updates(models.Job{
if updateErr := s.db.WithContext(context.Background()).First(&job, "task_id = ?", args.TaskID).Updates(models.Job{
State: state,
}).Error; updateErr != nil {
logger.Errorf("update sync peers job result failed for request: %v", updateErr)
Expand Down Expand Up @@ -228,76 +235,57 @@ func (s *syncPeers) mergePeers(ctx context.Context, scheduler models.Scheduler,
}

// Calculate differences using diffPeers function
toAdd, toUpdate, toDelete := diffPeers(existingPeers, results)

// Perform database updates based on the differences
if err := s.db.WithContext(ctx).CreateInBatches(toAdd, s.config.Job.SyncPeers.BatchSize).Error; err != nil {
log.Error(err)
}

for _, peer := range toUpdate {
if err := s.db.WithContext(ctx).Model(&models.Peer{}).Where("id = ?", peer.ID).Updates(peer).Error; err != nil {
toUpsert, toDelete := diffPeers(existingPeers, results)

// Perform batch upsert
if len(toUpsert) > 0 {
// Construct the upsert query
if err := s.db.WithContext(ctx).
Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
UpdateAll: true,
}).
CreateInBatches(toUpsert, s.config.Job.SyncPeers.BatchSize).
Error; err != nil {
log.Error(err)
}
}

for _, peer := range toDelete {
if err := s.db.WithContext(ctx).Unscoped().Delete(&models.Peer{}, peer.ID).Error; err != nil {
// Perform batch delete
if len(toDelete) > 0 {
if err := s.db.WithContext(ctx).
Delete(&toDelete).
Error; err != nil {
log.Error(err)
}
}
}

func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toAdd, toUpdate, toDelete []models.Peer) {
func diffPeers(existingPeers []models.Peer, currentPeers []*resource.Host) (toUpsert, toDelete []models.Peer) {
// Convert current peers to a map for quick lookup
currentPeersMap := lo.KeyBy[string, *resource.Host](currentPeers, func(item *resource.Host) string {
return item.ID
})

// // Convert existing peers to a map for quick lookup
// Convert existing peers to a map for quick lookup
existingPeersMap := lo.KeyBy[string, models.Peer](existingPeers, func(item models.Peer) string {
return idgen.HostIDV2(item.IP, item.Hostname, types.ParseHostType(item.Type) != types.HostTypeNormal)
})

// Calculate differences
for id, currentPeer := range currentPeersMap {
if existingPeer, ok := existingPeersMap[id]; ok {
// Peer exists in both, check if it needs to be updated
if !isPeerEqual(existingPeer, *currentPeer) {
toUpdate = append(toUpdate, convertToModelPeer(*currentPeer))
}
if _, ok := existingPeersMap[id]; ok {
// Remove from existingPeersMap to mark it as processed
delete(existingPeersMap, id)
} else {
// Peer exists in currentPeers but not in existingPeers, add it
toAdd = append(toAdd, convertToModelPeer(*currentPeer))
}
// Add all current peers to upsert list
toUpsert = append(toUpsert, convertToModelPeer(*currentPeer))
}

// Peers left in existingPeersMap are to be deleted
toDelete = lo.Values(existingPeersMap)

return toAdd, toUpdate, lo.Values(existingPeersMap)
}

// Helper function to check if two peers are equal
func isPeerEqual(peer models.Peer, currentPeer resource.Host) bool {
// Implement the equality check based on your requirements
// For example, compare all fields that should be considered for equality
return peer.Type == currentPeer.Type.Name() &&
peer.IDC == currentPeer.Network.IDC &&
peer.Location == currentPeer.Network.Location &&
peer.Port == currentPeer.Port &&
peer.DownloadPort == currentPeer.DownloadPort &&
peer.ObjectStoragePort == currentPeer.ObjectStoragePort &&
peer.OS == currentPeer.OS &&
peer.Platform == currentPeer.Platform &&
peer.PlatformFamily == currentPeer.PlatformFamily &&
peer.PlatformVersion == currentPeer.PlatformVersion &&
peer.KernelVersion == currentPeer.KernelVersion &&
peer.GitVersion == currentPeer.Build.GitVersion &&
peer.GitCommit == currentPeer.Build.GitCommit &&
peer.BuildPlatform == currentPeer.Build.Platform
return toUpsert, toDelete
}

// Helper function to convert resource.Host to models.Peer
Expand Down
22 changes: 8 additions & 14 deletions manager/job/sync_peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
package job

import (
"sort"
"testing"

"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/types"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"github.com/stretchr/testify/assert"
"sort"
"testing"
)

func Test_diffPeers(t *testing.T) {
Expand All @@ -34,8 +35,7 @@ func Test_diffPeers(t *testing.T) {
tests := []struct {
name string
args args
wantToAdd []models.Peer
wantToUpdate []models.Peer
wantToUpsert []models.Peer
wantToDelete []models.Peer
}{
{
Expand Down Expand Up @@ -77,12 +77,10 @@ func Test_diffPeers(t *testing.T) {
types.HostTypeSuperSeed), // append only
},
},
wantToAdd: []models.Peer{
generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed),
},
wantToUpdate: []models.Peer{
wantToUpsert: []models.Peer{
generateModePeer("127.0.0.1", "foo1", 80, 80, types.HostTypeSuperSeed),
generateModePeer("127.0.0.2", "foo2", 80, 80, types.HostTypeSuperSeed),
generateModePeer("127.0.0.3", "foo3", 80, 80, types.HostTypeSuperSeed),
},
wantToDelete: []models.Peer{
generateModePeer("127.0.0.4", "foo4", 80, 80, types.HostTypeNormal),
Expand All @@ -93,19 +91,15 @@ func Test_diffPeers(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotToAdd, gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers)
gotToUpdate, gotToDelete := diffPeers(tt.args.existingPeers, tt.args.currentPeers)
// sort the result to compare
sort.Slice(gotToAdd, func(i, j int) bool {
return gotToAdd[i].IP < gotToAdd[j].IP
})
sort.Slice(gotToUpdate, func(i, j int) bool {
return gotToUpdate[i].IP < gotToUpdate[j].IP
})
sort.Slice(gotToDelete, func(i, j int) bool {
return gotToDelete[i].IP < gotToDelete[j].IP
})
assert.Equalf(t, tt.wantToAdd, gotToAdd, "diffPeers toAdd(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
assert.Equalf(t, tt.wantToUpdate, gotToUpdate, "diffPeers toUpdate(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
assert.Equalf(t, tt.wantToUpsert, gotToUpdate, "diffPeers toUpsert(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
assert.Equalf(t, tt.wantToDelete, gotToDelete, "diffPeers toDelete(%v, %v)", tt.args.existingPeers, tt.args.currentPeers)
})
}
Expand Down
11 changes: 9 additions & 2 deletions manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package service

import (
"context"
"d7y.io/dragonfly/v2/manager/job"
"errors"
"fmt"

"d7y.io/dragonfly/v2/manager/job"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"

Expand All @@ -36,6 +37,11 @@ import (
)

func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncPeersJobRequest) (*models.Job, error) {
args, err := structure.StructToMap(json)
if err != nil {
return nil, err
}

candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs, nil)
if err != nil {
return nil, err
Expand All @@ -48,7 +54,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP

taskID := fmt.Sprintf("manager_%v", uuid.New().String())

if err = s.job.SyncPeers.AsyncSyncPeers(ctx, job.SyncPeersArgs{
if err = s.job.SyncPeers.Run(ctx, job.SyncPeersArgs{
CandidateSchedulerClusters: candidateClusters,
TaskID: taskID,
}); err != nil {
Expand All @@ -59,6 +65,7 @@ func (s *service) CreateSyncPeersJob(ctx context.Context, json types.CreateSyncP
job := models.Job{
TaskID: taskID,
BIO: json.BIO,
Args: args,
Type: json.Type,
State: machineryv1tasks.StateStarted,
UserID: json.UserID,
Expand Down

0 comments on commit c2a6c20

Please sign in to comment.