Skip to content

Commit

Permalink
feat: add delete task job and list tasks job in scheduler job impleme…
Browse files Browse the repository at this point in the history
…ntation.

Signed-off-by: Asklv <[email protected]>
  • Loading branch information
IRONICBo committed Aug 5, 2024
1 parent df2ae9c commit 77ac45c
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 4 deletions.
38 changes: 37 additions & 1 deletion internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,52 @@

package job

import "d7y.io/dragonfly/v2/scheduler/resource"

type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
FilteredQueryParams string `json:"filteredQueryParams" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
PieceLength uint32 `json:"pieceLength" validate:"omitempty"`
}

type PreheatResponse struct {
TaskID string `json:"taskID"`
}

// ListTasksRequest defines the request parameters for listing tasks.
type ListTasksRequest struct {
TaskID string `json:"task_id" validate:"required"`
Page int `json:"page" validate:"required"`
PerPage int `json:"per_page" validate:"required"`
}

// ListTasksResponse defines the response parameters for listing tasks.
type ListTasksResponse struct {
Peers []*resource.Peer `json:"peers"`
Page int `json:"page"`
Total int `json:"total"`
}

// DeleteTaskRequest defines the request parameters for deleting task.
type DeleteTaskRequest struct {
TaskID string `json:"task_id" validate:"required"`
}

// TaskInfo includes information about a task along with peer details and a description.
type TaskInfo struct {
Task *resource.Task `json:"task"`
Peer *resource.Peer `json:"peer"`
Desc string `json:"desc"`
}

// DeleteTaskResponse represents the response after attempting to delete tasks,
// categorizing them into successfully and unsuccessfully deleted.
type DeleteTaskResponse struct {
SuccessTasks []*TaskInfo `json:"success_tasks"`
FailureTasks []*TaskInfo `json:"failure_tasks"`
}
2 changes: 1 addition & 1 deletion manager/job/manager_tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (m *managerTasks) createGroupJob(ctx context.Context, name string, args []m

logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err)
logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err)
return nil, err
}

Expand Down
125 changes: 123 additions & 2 deletions scheduler/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ import (
const (
// preheatTimeout is timeout of preheating.
preheatTimeout = 20 * time.Minute
// listTasksTimeout is timeout of listing tasks.
listTasksTimeout = 10 * time.Minute
// deleteTaskTimeout is timeout of deleting task.
deleteTaskTimeout = 20 * time.Minute
)

// Job is an interface for job.
Expand Down Expand Up @@ -109,8 +113,10 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
}

namedJobFuncs := map[string]any{
internaljob.PreheatJob: t.preheat,
internaljob.SyncPeersJob: t.syncPeers,
internaljob.PreheatJob: t.preheat,
internaljob.SyncPeersJob: t.syncPeers,
internaljob.ListTasksJob: t.listTasks,
internaljob.DeleteTaskJob: t.deleteTask,
}

if err := localJob.RegisterJob(namedJobFuncs); err != nil {
Expand Down Expand Up @@ -297,3 +303,118 @@ func (j *job) syncPeers() (string, error) {

return internaljob.MarshalResponse(hosts)
}

// listTasks is a job to list tasks.
func (j *job) listTasks(ctx context.Context, data string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, listTasksTimeout)
defer cancel()

req := &internaljob.ListTasksRequest{}
if err := internaljob.UnmarshalRequest(data, req); err != nil {
logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data)
return "", err
}

if err := validator.New().Struct(req); err != nil {
logger.Errorf("listTasks %s validate failed: %s", req.TaskID, err.Error())
return "", err
}

// Get all peers by task id
peers, err := j.getPeers(req.TaskID)
if err != nil {
logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
return "", err
}

// Return peers by page
listTaskResponse := &internaljob.ListTasksResponse{
Total: len(peers),
Page: req.Page,
Peers: peers[req.Page*req.PerPage : (req.Page+1)*req.PerPage],
}

return internaljob.MarshalResponse(listTaskResponse)
}

// deleteTask is a job to delete task.
func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
ctx, cancel := context.WithTimeout(ctx, deleteTaskTimeout)
defer cancel()

req := &internaljob.DeleteTaskRequest{}
if err := internaljob.UnmarshalRequest(data, req); err != nil {
logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data)
return "", err
}

if err := validator.New().Struct(req); err != nil {
logger.Errorf("deleteTask %s validate failed: %s", req.TaskID, err.Error())
return "", err
}

// Get all peers by task id
peers, err := j.getPeers(req.TaskID)
if err != nil {
logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
return "", err
}

// Delete task by task id and host id
successTasks := make([]*internaljob.TaskInfo, 0)
failureTasks := make([]*internaljob.TaskInfo, 0)

for _, peer := range peers {
// hostID := peer.Host.ID
// get task info by task id
task, ok := j.resource.TaskManager().Load(req.TaskID)
if !ok {
logger.Errorf("task %s not found", req.TaskID)
failureTasks = append(failureTasks, &internaljob.TaskInfo{
Task: nil,
Peer: peer,
Desc: "task not found",
})
continue
}

// TODO: change to scheduler delete task grpc function
// and add batch delete

successTasks = append(successTasks, &internaljob.TaskInfo{
Task: task,
Peer: peer,
Desc: "success",
})
}

deleteTaskResponse := &internaljob.DeleteTaskResponse{
SuccessTasks: successTasks,
FailureTasks: failureTasks,
}

return internaljob.MarshalResponse(deleteTaskResponse)
}

// getPeers try to get peers by task id
func (j *job) getPeers(taskID string) ([]*resource.Peer, error) {
// get task info by task id
task, ok := j.resource.TaskManager().Load(taskID)
if !ok {
logger.Errorf("task %s not found", taskID)
return nil, fmt.Errorf("task %s not found", taskID)
}

// get peer info by task info
peers := make([]*resource.Peer, 0)
for _, vertex := range task.DAG.GetVertices() {
peer := vertex.Value
if peer == nil {
continue
}

peers = append(peers, peer)
}

return peers, nil
}

0 comments on commit 77ac45c

Please sign in to comment.