
Commit

fix: update getFinishedPeers function and remove concurrency feature.
Signed-off-by: Asklv <[email protected]>
IRONICBo committed Aug 10, 2024
1 parent eae443a commit 915d880
Showing 3 changed files with 87 additions and 96 deletions.
35 changes: 35 additions & 0 deletions pkg/rpc/dfdaemon/client/client_v2.go
@@ -83,6 +83,41 @@ func GetV2(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grp
	}, nil
}

+// GetV2ByAddr returns v2 version of the dfdaemon client by address.
+func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
+	conn, err := grpc.DialContext(
+		ctx,
+		target,
+		append([]grpc.DialOption{
+			grpc.WithIdleTimeout(0),
+			grpc.WithDefaultCallOptions(
+				grpc.MaxCallRecvMsgSize(math.MaxInt32),
+				grpc.MaxCallSendMsgSize(math.MaxInt32),
+			),
+			grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
+				grpc_prometheus.UnaryClientInterceptor,
+				grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
+				grpc_retry.UnaryClientInterceptor(
+					grpc_retry.WithMax(maxRetries),
+					grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)),
+				),
+			)),
+			grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
+				grpc_prometheus.StreamClientInterceptor,
+				grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
+			)),
+		}, opts...)...,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return &v2{
+		DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn),
+		ClientConn:           conn,
+	}, nil
+}
+
// V2 is the interface for v2 version of the grpc client.
type V2 interface {
	// SyncPieces syncs pieces from the other peers.
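For context, a minimal usage sketch of the new helper (not part of this commit). It assumes the same "context", dfdaemonclient, and dfdaemonv2 imports used by scheduler/job/job.go below; the wrapper name deleteCacheTaskByAddr is hypothetical.

// deleteCacheTaskByAddr is a hypothetical helper showing the intended call
// pattern of GetV2ByAddr: dial a dfdaemon by "ip:port" and delete a cached
// task through the returned V2 client. Closing the underlying connection is
// omitted, matching the call site in scheduler/job/job.go.
func deleteCacheTaskByAddr(ctx context.Context, addr, taskID string) error {
	dfdaemonUploadClient, err := dfdaemonclient.GetV2ByAddr(ctx, addr)
	if err != nil {
		return err
	}

	return dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
		TaskId: taskID,
	})
}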
129 changes: 33 additions & 96 deletions scheduler/job/job.go
@@ -23,18 +23,11 @@ import (
	"errors"
	"fmt"
	"io"
-	"math"
	"strings"
-	"sync"
	"time"

	"github.com/RichardKnop/machinery/v1"
	"github.com/go-playground/validator/v10"
-	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
-	grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
-	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
-	grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
-	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

@@ -46,6 +39,7 @@ import (
	logger "d7y.io/dragonfly/v2/internal/dflog"
	internaljob "d7y.io/dragonfly/v2/internal/job"
	"d7y.io/dragonfly/v2/pkg/idgen"
+	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
	"d7y.io/dragonfly/v2/scheduler/config"
	"d7y.io/dragonfly/v2/scheduler/resource"
)
@@ -55,12 +49,6 @@ const (
	preheatTimeout = 20 * time.Minute
	// deleteTaskTimeout is timeout of deleting task.
	deleteTaskTimeout = 20 * time.Minute
-	// deleteTaskConcurrency is the number of concurrent delete tasks.
-	deleteTaskConcurrency = 10
-	// deleteTaskMaxRetries is the maximum number of retries for delete tasks.
-	deleteTaskMaxRetries = 3
-	// deleteTaskBackoffWaitBetween is waiting for a fixed period of time between calls in backoff linear.
-	deleteTaskBackoffWaitBetween = 500 * time.Millisecond
)

// Job is an interface for job.
@@ -329,7 +317,7 @@ func (j *job) listTasks(ctx context.Context, data string) (string, error) {
	}

	// Get all peers by task id
-	peers, err := j.getValidPeers(req.TaskID)
+	peers, err := j.getFinishedPeers(req.TaskID)
	if err != nil {
		logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
		return "", err
@@ -359,7 +347,7 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
	}

	// Get all peers by task id
-	peers, err := j.getValidPeers(req.TaskID)
+	peers, err := j.getFinishedPeers(req.TaskID)
	if err != nil {
		logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
		return "", err
@@ -369,71 +357,40 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
	successTasks := make([]*internaljob.Task, 0)
	failureTasks := make([]*internaljob.Task, 0)

-	// Create a wait group to limit delete rpc concurrency
-	// and avoid too many rpc requests to the host.
-	wg := sync.WaitGroup{}
-	deleteTaskLimit := make(chan struct{}, deleteTaskConcurrency)
-	for _, peer := range peers {
-		wg.Add(1)
-		deleteTaskLimit <- struct{}{}
-		go func(peer *resource.Peer) {
-			defer func() {
-				wg.Done()
-				<-deleteTaskLimit
-			}()
-
-			// Get dfdaemon client from host
-			target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
-			conn, err := grpc.DialContext(
-				ctx,
-				target,
-				grpc.WithIdleTimeout(0),
-				grpc.WithDefaultCallOptions(
-					grpc.MaxCallRecvMsgSize(math.MaxInt32),
-					grpc.MaxCallSendMsgSize(math.MaxInt32),
-				),
-				grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
-					grpc_prometheus.UnaryClientInterceptor,
-					grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
-					grpc_retry.UnaryClientInterceptor(
-						grpc_retry.WithMax(deleteTaskMaxRetries),
-						grpc_retry.WithBackoff(grpc_retry.BackoffLinear(deleteTaskBackoffWaitBetween)),
-					),
-				)),
-			)
-			if err != nil {
-				logger.Errorf("create grpc client to %s failed: %s", target, err.Error())
-				failureTasks = append(failureTasks, &internaljob.Task{
-					Task:        peer.Task,
-					Peer:        peer,
-					Description: err.Error(),
-				})
-				return
-			}
-
-			dfdaemonUploadClient := dfdaemonv2.NewDfdaemonUploadClient(conn)
-			_, err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
-				TaskId: req.TaskID,
-			})
-			if err != nil {
-				logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error())
-				failureTasks = append(failureTasks, &internaljob.Task{
-					Task:        peer.Task,
-					Peer:        peer,
-					Description: err.Error(),
-				})
-				return
-			}
-
-			successTasks = append(successTasks, &internaljob.Task{
-				Task:        peer.Task,
-				Peer:        peer,
-				Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target),
-			})
-		}(peer)
-	}
-
-	wg.Wait()
+	// TODO: Create a limiter to limit delete rpc concurrency
+	// and avoid too many rpc requests to the host.
+	for _, peer := range peers {
+		// Get dfdaemon client from host
+		target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
+		dfdaemonUploadClient, err := dfdaemonclient.GetV2ByAddr(ctx, target)
+		if err != nil {
+			logger.Errorf("get dfdaemon client from %s failed: %s", target, err.Error())
+			failureTasks = append(failureTasks, &internaljob.Task{
+				Task:        peer.Task,
+				Peer:        peer,
+				Description: err.Error(),
+			})
+			continue
+		}
+		err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
+			TaskId: req.TaskID,
+		})
+		if err != nil {
+			logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error())
+			failureTasks = append(failureTasks, &internaljob.Task{
+				Task:        peer.Task,
+				Peer:        peer,
+				Description: err.Error(),
+			})
+			continue
+		}
+
+		successTasks = append(successTasks, &internaljob.Task{
+			Task:        peer.Task,
+			Peer:        peer,
+			Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target),
+		})
+	}

	deleteTaskResponse := &internaljob.DeleteTaskResponse{
		SuccessTasks: successTasks,
@@ -443,34 +400,14 @@ func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
	return internaljob.MarshalResponse(deleteTaskResponse)
}

-// getValidPeers try to get valid peers by task id
-func (j *job) getValidPeers(taskID string) ([]*resource.Peer, error) {
+// getFinishedPeers try to get valid peers by task id
+func (j *job) getFinishedPeers(taskID string) ([]*resource.Peer, error) {
	// get task info by task id
	task, ok := j.resource.TaskManager().Load(taskID)
	if !ok {
		logger.Errorf("task %s not found", taskID)
		return nil, fmt.Errorf("task %s not found", taskID)
	}

-	// get peer info by task info
-	peers := make([]*resource.Peer, 0)
-	for _, vertex := range task.DAG.GetVertices() {
-		peer := vertex.Value
-		if peer == nil {
-			continue
-		}
-
-		peers = append(peers, peer)
-	}
-
-	// Choose finished peers as list tasks result
-	finishedPeers := make([]*resource.Peer, len(peers))
-	for _, peer := range peers {
-		currentState := peer.FSM.Current()
-		if currentState == resource.PeerStateSucceeded || currentState == resource.PeerStateFailed {
-			finishedPeers = append(finishedPeers, peer)
-		}
-	}
-
-	return finishedPeers, nil
+	return task.LoadFinishedPeers(), nil
}
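The TODO above leaves room to reintroduce a concurrency limit later. A minimal sketch of such a limiter (illustrative only, not part of this commit; the helper name and signature are assumptions) uses a buffered channel the same way the removed deleteTaskLimit did. It would also need the "sync" import that this commit removes, and any shared result slices written from fn would need a mutex, unlike the sequential loop introduced here.

// forEachPeerLimited runs fn for every peer with at most limit calls in
// flight, using a buffered channel as a semaphore.
func forEachPeerLimited(peers []*resource.Peer, limit int, fn func(*resource.Peer)) {
	var wg sync.WaitGroup
	sem := make(chan struct{}, limit)
	for _, peer := range peers {
		wg.Add(1)
		sem <- struct{}{}
		go func(p *resource.Peer) {
			defer func() {
				<-sem
				wg.Done()
			}()

			fn(p)
		}(peer)
	}

	wg.Wait()
}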
19 changes: 19 additions & 0 deletions scheduler/resource/task.go
@@ -249,6 +249,25 @@ func (t *Task) LoadRandomPeers(n uint) []*Peer {
	return peers
}

+// LoadFinishedPeers return finished peers.
+func (t *Task) LoadFinishedPeers() []*Peer {
+	// Choose finished peers
+	var finishedPeers []*Peer
+	for _, vertex := range t.DAG.GetVertices() {
+		peer := vertex.Value
+		if peer == nil {
+			continue
+		}
+
+		currentState := peer.FSM.Current()
+		if currentState == PeerStateSucceeded || currentState == PeerStateFailed {
+			finishedPeers = append(finishedPeers, peer)
+		}
+	}
+
+	return finishedPeers
+}
+
// StorePeer set peer.
func (t *Task) StorePeer(peer *Peer) {
	t.DAG.AddVertex(peer.ID, peer) // nolint: errcheck
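One behavioral note: the removed getValidPeers pre-sized its result with make([]*resource.Peer, len(peers)) and then appended, so it returned leading nil entries, whereas LoadFinishedPeers appends to a nil slice and returns only peers in a terminal state. A small usage sketch follows (the helper name logFinishedPeers is hypothetical, not from this commit), mirroring how getFinishedPeers in scheduler/job/job.go consumes the new accessor.

// logFinishedPeers loads a task by ID and logs every peer that already
// reached a terminal state (succeeded or failed).
func (j *job) logFinishedPeers(taskID string) {
	task, ok := j.resource.TaskManager().Load(taskID)
	if !ok {
		logger.Errorf("task %s not found", taskID)
		return
	}

	for _, peer := range task.LoadFinishedPeers() {
		logger.Infof("task %s: peer %s finished in state %s", taskID, peer.ID, peer.FSM.Current())
	}
}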

0 comments on commit 915d880
