diff --git a/internal/job/constants.go b/internal/job/constants.go
index d977f443638..c6303b375a6 100644
--- a/internal/job/constants.go
+++ b/internal/job/constants.go
@@ -29,6 +29,12 @@ const (
 
 	// SyncPeersJob is the name of syncing peers job.
 	SyncPeersJob = "sync_peers"
+
+	// ListTasksJob is the name of listing tasks job.
+	ListTasksJob = "list_tasks"
+
+	// DeleteTaskJob is the name of deleting task job.
+	DeleteTaskJob = "delete_task"
 )
 
 // Machinery server configuration.
diff --git a/internal/job/types.go b/internal/job/types.go
index cdbdb55a760..be1595d56a2 100644
--- a/internal/job/types.go
+++ b/internal/job/types.go
@@ -16,11 +16,13 @@
 
 package job
 
+import "d7y.io/dragonfly/v2/scheduler/resource"
+
 type PreheatRequest struct {
 	URL                 string            `json:"url" validate:"required,url"`
 	Tag                 string            `json:"tag" validate:"omitempty"`
 	Digest              string            `json:"digest" validate:"omitempty"`
-	FilteredQueryParams string            `json:"filteredQueryParams" validate:"omitempty"`
+	FilteredQueryParams string            `json:"filtered_query_params" validate:"omitempty"`
 	Headers             map[string]string `json:"headers" validate:"omitempty"`
 	Application         string            `json:"application" validate:"omitempty"`
 	Priority            int32             `json:"priority" validate:"omitempty"`
@@ -28,4 +30,34 @@ type PreheatRequest struct {
 }
 
 type PreheatResponse struct {
+	TaskID string `json:"taskID"`
+}
+
+// ListTasksRequest defines the request parameters for listing tasks.
+type ListTasksRequest struct {
+	TaskID string `json:"task_id" validate:"required"`
+}
+
+// ListTasksResponse defines the response parameters for listing tasks.
+type ListTasksResponse struct {
+	Peers []*resource.Peer `json:"peers"`
+}
+
+// DeleteTaskRequest defines the request parameters for deleting a task.
+type DeleteTaskRequest struct {
+	TaskID string `json:"task_id" validate:"required"`
+}
+
+// Task includes information about a task along with peer details and a description.
+type Task struct {
+	Task        *resource.Task `json:"task"`
+	Peer        *resource.Peer `json:"peer"`
+	Description string         `json:"description"`
+}
+
+// DeleteTaskResponse represents the response after attempting to delete tasks,
+// categorizing them into successfully and unsuccessfully deleted.
+type DeleteTaskResponse struct {
+	SuccessTasks []*Task `json:"success_tasks"`
+	FailureTasks []*Task `json:"failure_tasks"`
 }
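For orientation: these request/response structs travel between the manager and the schedulers as JSON payloads inside machinery job arguments. A minimal, self-contained sketch of that round-trip (the lower-cased mirror struct and the task ID value are illustrative only, not part of this change):

package main

import (
	"encoding/json"
	"fmt"
)

// deleteTaskRequest mirrors internal/job.DeleteTaskRequest above.
type deleteTaskRequest struct {
	TaskID string `json:"task_id"`
}

func main() {
	// The manager marshals the request into the job arguments ...
	b, err := json.Marshal(deleteTaskRequest{TaskID: "task-1234"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"task_id":"task-1234"}

	// ... and the scheduler unmarshals it back in its job handler.
	var req deleteTaskRequest
	if err := json.Unmarshal(b, &req); err != nil {
		panic(err)
	}
	fmt.Println(req.TaskID) // task-1234
}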
diff --git a/manager/config/config.go b/manager/config/config.go
index 5c5e300d48b..29914a6b94c 100644
--- a/manager/config/config.go
+++ b/manager/config/config.go
@@ -296,6 +296,9 @@ type JobConfig struct {
 
 	// Sync peers configuration.
 	SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`
+
+	// Manager tasks configuration.
+	ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"`
 }
 
 type PreheatConfig struct {
@@ -315,6 +318,11 @@ type SyncPeersConfig struct {
 	Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
 }
 
+type ManagerTasksConfig struct {
+	// Timeout is the timeout for manager tasks (deleting and listing tasks) on a single scheduler.
+	Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
+}
+
 type PreheatTLSClientConfig struct {
 	// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
 	CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
@@ -455,6 +463,9 @@ func New() *Config {
 				Interval: DefaultJobSyncPeersInterval,
 				Timeout:  DefaultJobSyncPeersTimeout,
 			},
+			ManagerTasks: ManagerTasksConfig{
+				Timeout: DefaultJobManagerTasksTimeout,
+			},
 		},
 		ObjectStorage: ObjectStorageConfig{
 			Enable: false,
diff --git a/manager/config/constant_otel.go b/manager/config/constant_otel.go
index 071c4f9bf92..07c34bbd4b2 100644
--- a/manager/config/constant_otel.go
+++ b/manager/config/constant_otel.go
@@ -19,9 +19,13 @@ package config
 import "go.opentelemetry.io/otel/attribute"
 
 const (
-	AttributeID          = attribute.Key("d7y.manager.id")
-	AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
-	AttributePreheatURL  = attribute.Key("d7y.manager.preheat.url")
+	AttributeID               = attribute.Key("d7y.manager.id")
+	AttributePreheatType      = attribute.Key("d7y.manager.preheat.type")
+	AttributePreheatURL       = attribute.Key("d7y.manager.preheat.url")
+	AttributeDeleteTaskID     = attribute.Key("d7y.manager.delete_task.id")
+	AttributeListTasksID      = attribute.Key("d7y.manager.list_tasks.id")
+	AttributeListTasksPage    = attribute.Key("d7y.manager.list_tasks.page")
+	AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page")
 )
 
 const (
@@ -29,4 +33,6 @@ const (
 	SpanSyncPeers        = "sync-peers"
 	SpanGetLayers        = "get-layers"
 	SpanAuthWithRegistry = "auth-with-registry"
+	SpanDeleteTask       = "delete-task"
+	SpanListTasks        = "list-tasks"
 )
diff --git a/manager/config/constants.go b/manager/config/constants.go
index 75e6ad8e0a7..b73a6f337da 100644
--- a/manager/config/constants.go
+++ b/manager/config/constants.go
@@ -98,6 +98,9 @@ const (
 
 	// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
 	DefaultJobSyncPeersTimeout = 10 * time.Minute
+
+	// DefaultJobManagerTasksTimeout is the default timeout for manager tasks (deleting and listing tasks).
+	DefaultJobManagerTasksTimeout = 10 * time.Minute
 )
 
 const (
diff --git a/manager/handlers/job.go b/manager/handlers/job.go
index f96ae39aaa7..e7937136173 100644
--- a/manager/handlers/job.go
+++ b/manager/handlers/job.go
@@ -59,6 +59,34 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
 			return
 		}
 
+		ctx.JSON(http.StatusOK, job)
+	case job.DeleteTaskJob:
+		var json types.CreateDeleteTaskJobRequest
+		if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
+			ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
+			return
+		}
+
+		job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
+		if err != nil {
+			ctx.Error(err) // nolint: errcheck
+			return
+		}
+
+		ctx.JSON(http.StatusOK, job)
+	case job.ListTasksJob:
+		var json types.CreateListTasksJobRequest
+		if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
+			ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
+			return
+		}
+
+		job, err := h.service.CreateListTasksJob(ctx.Request.Context(), json)
+		if err != nil {
+			ctx.Error(err) // nolint: errcheck
+			return
+		}
+
 		ctx.JSON(http.StatusOK, job)
 	default:
 		ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
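The handler binds the shared job envelope and dispatches on its type field. A sketch of exercising the new delete_task branch over HTTP (the manager address, cluster ID, and task ID are placeholders; the /oapi/v1/jobs path and field names follow the tests and types in this change):

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	// Matches types.CreateDeleteTaskJobRequest: "type" selects the handler
	// branch, args.task_id names the task to delete on each scheduler.
	body := []byte(`{
		"type": "delete_task",
		"args": {"task_id": "task-1234"},
		"scheduler_cluster_ids": [1]
	}`)

	resp, err := http.Post("http://dragonfly-manager:8080/oapi/v1/jobs", "application/json", bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Expect 200 OK with the created job model in the body.
	fmt.Println(resp.Status)
}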
diff --git a/manager/handlers/job_test.go b/manager/handlers/job_test.go
index b3d02668c59..2a7f2f53771 100644
--- a/manager/handlers/job_test.go
+++ b/manager/handlers/job_test.go
@@ -39,6 +39,18 @@ var (
 		"user_id": 4,
 		"bio": "bio"
 	}`
+	mockListTasksJobReqBody = `
+	{
+		"type": "list_tasks",
+		"user_id": 4,
+		"bio": "bio"
+	}`
+	mockDeleteTaskJobReqBody = `
+	{
+		"type": "delete_task",
+		"user_id": 4,
+		"bio": "bio"
+	}`
 	mockOtherJobReqBody = `
 	{
 		"type": "others",
@@ -50,6 +62,16 @@ var (
 		Type: "preheat",
 		BIO:  "bio",
 	}
+	mockListTasksCreateJobRequest = types.CreateListTasksJobRequest{
+		UserID: 4,
+		Type:   "list_tasks",
+		BIO:    "bio",
+	}
+	mockDeleteTaskCreateJobRequest = types.CreateDeleteTaskJobRequest{
+		UserID: 4,
+		Type:   "delete_task",
+		BIO:    "bio",
+	}
 	mockUpdateJobRequest = types.UpdateJobRequest{
 		UserID: 4,
 		BIO:    "bio",
@@ -61,6 +83,20 @@ var (
 		BIO:    "bio",
 		TaskID: "2",
 	}
+	mockListTasksJobModel = &models.Job{
+		BaseModel: mockBaseModel,
+		UserID:    4,
+		Type:      "list_tasks",
+		BIO:       "bio",
+		TaskID:    "2",
+	}
+	mockDeleteTaskJobModel = &models.Job{
+		BaseModel: mockBaseModel,
+		UserID:    4,
+		Type:      "delete_task",
+		BIO:       "bio",
+		TaskID:    "2",
+	}
 )
 
 func mockJobRouter(h *Handlers) *gin.Engine {
@@ -115,6 +151,36 @@ func TestHandlers_CreateJob(t *testing.T) {
 				assert.Equal(mockPreheatJobModel, &job)
 			},
 		},
+		{
+			name: "success with list_tasks job",
+			req:  httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockListTasksJobReqBody)),
+			mock: func(ms *mocks.MockServiceMockRecorder) {
+				ms.CreateListTasksJob(gomock.Any(), gomock.Eq(mockListTasksCreateJobRequest)).Return(mockListTasksJobModel, nil).Times(1)
+			},
+			expect: func(t *testing.T, w *httptest.ResponseRecorder) {
+				assert := assert.New(t)
+				assert.Equal(http.StatusOK, w.Code)
+				job := models.Job{}
+				err := json.Unmarshal(w.Body.Bytes(), &job)
+				assert.NoError(err)
+				assert.Equal(mockListTasksJobModel, &job)
+			},
+		},
+		{
+			name: "success with delete_task job",
+			req:  httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)),
+			mock: func(ms *mocks.MockServiceMockRecorder) {
+				ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockDeleteTaskCreateJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1)
+			},
+			expect: func(t *testing.T, w *httptest.ResponseRecorder) {
+				assert := assert.New(t)
+				assert.Equal(http.StatusOK, w.Code)
+				job := models.Job{}
+				err := json.Unmarshal(w.Body.Bytes(), &job)
+				assert.NoError(err)
+				assert.Equal(mockDeleteTaskJobModel, &job)
+			},
+		},
 	}
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
diff --git a/manager/job/job.go b/manager/job/job.go
index bac8c7a6e86..e939e50f0a9 100644
--- a/manager/job/job.go
+++ b/manager/job/job.go
@@ -40,6 +40,7 @@ type Job struct {
 	*internaljob.Job
 	Preheat
 	SyncPeers
+	ManagerTasks
 }
 
 // New returns a new Job.
@@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
 		return nil, err
 	}
 
+	managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout)
+
 	return &Job{
-		Job:       j,
-		Preheat:   preheat,
-		SyncPeers: syncPeers,
+		Job:          j,
+		Preheat:      preheat,
+		SyncPeers:    syncPeers,
+		ManagerTasks: managerTasks,
 	}, nil
 }
diff --git a/manager/job/manager_tasks.go b/manager/job/manager_tasks.go
new file mode 100644
index 00000000000..b42a5ce1734
--- /dev/null
+++ b/manager/job/manager_tasks.go
@@ -0,0 +1,128 @@
+/*
+ *     Copyright 2024 The Dragonfly Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//go:generate mockgen -destination mocks/manager_tasks_mock.go -source manager_tasks.go -package mocks
+
+package job
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
+	"github.com/google/uuid"
+	"go.opentelemetry.io/otel/trace"
+
+	logger "d7y.io/dragonfly/v2/internal/dflog"
+	internaljob "d7y.io/dragonfly/v2/internal/job"
+	"d7y.io/dragonfly/v2/manager/config"
+	"d7y.io/dragonfly/v2/manager/models"
+	"d7y.io/dragonfly/v2/manager/types"
+)
+
+// ManagerTasks is an interface for deleting and listing tasks.
+type ManagerTasks interface {
+	// CreateDeleteTask creates a delete task job.
+	CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTasksArgs) (*internaljob.GroupJobState, error)
+
+	// CreateListTasks creates a list tasks job.
+	CreateListTasks(context.Context, []models.Scheduler, types.ListTasksArgs) (*internaljob.GroupJobState, error)
+}
+
+// managerTasks is an implementation of ManagerTasks.
+type managerTasks struct {
+	job     *internaljob.Job
+	timeout time.Duration
+}
+
+// newManagerTasks creates a new ManagerTasks.
+func newManagerTasks(job *internaljob.Job, timeout time.Duration) ManagerTasks {
+	return &managerTasks{
+		job:     job,
+		timeout: timeout,
+	}
+}
+
+// CreateDeleteTask creates a delete task job.
+func (m *managerTasks) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTasksArgs) (*internaljob.GroupJobState, error) {
+	var span trace.Span
+	ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer))
+	span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
+	defer span.End()
+
+	args, err := internaljob.MarshalRequest(json)
+	if err != nil {
+		logger.Errorf("delete task marshal request: %v, error: %v", args, err)
+		return nil, err
+	}
+
+	// Initialize queues.
+	queues := getSchedulerQueues(schedulers)
+	return m.createGroupJob(ctx, internaljob.DeleteTaskJob, args, queues)
+}
+
+// CreateListTasks creates a list tasks job.
+func (m *managerTasks) CreateListTasks(ctx context.Context, schedulers []models.Scheduler, json types.ListTasksArgs) (*internaljob.GroupJobState, error) {
+	var span trace.Span
+	ctx, span = tracer.Start(ctx, config.SpanListTasks, trace.WithSpanKind(trace.SpanKindProducer))
+	span.SetAttributes(config.AttributeListTasksID.String(json.TaskID))
+	defer span.End()
+
+	args, err := internaljob.MarshalRequest(json)
+	if err != nil {
+		logger.Errorf("list tasks marshal request: %v, error: %v", args, err)
+		return nil, err
+	}
+
+	// Initialize queues.
+	queues := getSchedulerQueues(schedulers)
+	return m.createGroupJob(ctx, internaljob.ListTasksJob, args, queues)
+}
+
+// createGroupJob creates a group job.
+func (m *managerTasks) createGroupJob(ctx context.Context, name string, args []machineryv1tasks.Arg, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
+	var signatures []*machineryv1tasks.Signature
+	for _, queue := range queues {
+		signatures = append(signatures, &machineryv1tasks.Signature{
+			UUID:       fmt.Sprintf("task_%s", uuid.New().String()),
+			Name:       name,
+			RoutingKey: queue.String(),
+			Args:       args,
+		})
+	}
+
+	group, err := machineryv1tasks.NewGroup(signatures...)
+	if err != nil {
+		return nil, err
+	}
+
+	var tasks []machineryv1tasks.Signature
+	for _, signature := range signatures {
+		tasks = append(tasks, *signature)
+	}
+
+	logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
+	if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
+		logger.Errorf("create manager tasks group %s failed: %v", group.GroupUUID, err)
+		return nil, err
+	}
+
+	return &internaljob.GroupJobState{
+		GroupUUID: group.GroupUUID,
+		State:     machineryv1tasks.StatePending,
+		CreatedAt: time.Now(),
+	}, nil
+}
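createGroupJob fans one job out to every candidate scheduler: one machinery signature per scheduler queue, all sharing the same marshaled arguments, grouped so their states can be polled as a unit. A standalone sketch of that shape (queue names and the argument payload are placeholders; sending the group needs a configured machinery server and broker, which are omitted here):

package main

import (
	"fmt"

	machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
	"github.com/google/uuid"
)

func main() {
	// The same marshaled DeleteTaskRequest is attached to every signature.
	args := []machineryv1tasks.Arg{{Type: "string", Value: `{"task_id":"task-1234"}`}}

	// Placeholder queue names; in the manager these come from getSchedulerQueues.
	queues := []string{"scheduler-1", "scheduler-2"}

	var signatures []*machineryv1tasks.Signature
	for _, queue := range queues {
		signatures = append(signatures, &machineryv1tasks.Signature{
			UUID:       fmt.Sprintf("task_%s", uuid.New().String()),
			Name:       "delete_task",
			RoutingKey: queue,
			Args:       args,
		})
	}

	group, err := machineryv1tasks.NewGroup(signatures...)
	if err != nil {
		panic(err)
	}

	// One group UUID tracks all per-scheduler tasks.
	fmt.Println(group.GroupUUID, len(group.Tasks))
}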
diff --git a/manager/job/mocks/manager_tasks_mock.go b/manager/job/mocks/manager_tasks_mock.go
new file mode 100644
index 00000000000..1d9338422de
--- /dev/null
+++ b/manager/job/mocks/manager_tasks_mock.go
@@ -0,0 +1,72 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: manager_tasks.go
+//
+// Generated by this command:
+//
+//	mockgen -destination mocks/manager_tasks_mock.go -source manager_tasks.go -package mocks
+//
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+	context "context"
+	reflect "reflect"
+
+	job "d7y.io/dragonfly/v2/internal/job"
+	models "d7y.io/dragonfly/v2/manager/models"
+	types "d7y.io/dragonfly/v2/manager/types"
+	gomock "go.uber.org/mock/gomock"
+)
+
+// MockManagerTasks is a mock of ManagerTasks interface.
+type MockManagerTasks struct {
+	ctrl     *gomock.Controller
+	recorder *MockManagerTasksMockRecorder
+}
+
+// MockManagerTasksMockRecorder is the mock recorder for MockManagerTasks.
+type MockManagerTasksMockRecorder struct {
+	mock *MockManagerTasks
+}
+
+// NewMockManagerTasks creates a new mock instance.
+func NewMockManagerTasks(ctrl *gomock.Controller) *MockManagerTasks {
+	mock := &MockManagerTasks{ctrl: ctrl}
+	mock.recorder = &MockManagerTasksMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockManagerTasks) EXPECT() *MockManagerTasksMockRecorder {
+	return m.recorder
+}
+
+// CreateDeleteTask mocks base method.
+func (m *MockManagerTasks) CreateDeleteTask(arg0 context.Context, arg1 []models.Scheduler, arg2 types.DeleteTasksArgs) (*job.GroupJobState, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CreateDeleteTask", arg0, arg1, arg2)
+	ret0, _ := ret[0].(*job.GroupJobState)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// CreateDeleteTask indicates an expected call of CreateDeleteTask.
+func (mr *MockManagerTasksMockRecorder) CreateDeleteTask(arg0, arg1, arg2 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTask", reflect.TypeOf((*MockManagerTasks)(nil).CreateDeleteTask), arg0, arg1, arg2)
+}
+
+// CreateListTasks mocks base method.
+func (m *MockManagerTasks) CreateListTasks(arg0 context.Context, arg1 []models.Scheduler, arg2 types.ListTasksArgs) (*job.GroupJobState, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CreateListTasks", arg0, arg1, arg2)
+	ret0, _ := ret[0].(*job.GroupJobState)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// CreateListTasks indicates an expected call of CreateListTasks.
+func (mr *MockManagerTasksMockRecorder) CreateListTasks(arg0, arg1, arg2 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateListTasks", reflect.TypeOf((*MockManagerTasks)(nil).CreateListTasks), arg0, arg1, arg2)
+}
diff --git a/manager/service/job.go b/manager/service/job.go
index f2836a05fc9..3b4319babec 100644
--- a/manager/service/job.go
+++ b/manager/service/job.go
@@ -75,6 +75,86 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
 	return &job, nil
 }
 
+func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) {
+	candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)
+	if err != nil {
+		return nil, err
+	}
+
+	groupJobState, err := s.job.CreateDeleteTask(ctx, candidateSchedulers, json.Args)
+	if err != nil {
+		return nil, err
+	}
+
+	var candidateSchedulerClusters []models.SchedulerCluster
+	for _, candidateScheduler := range candidateSchedulers {
+		candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)
+	}
+
+	args, err := structure.StructToMap(json.Args)
+	if err != nil {
+		return nil, err
+	}
+
+	job := models.Job{
+		TaskID:            groupJobState.GroupUUID,
+		BIO:               json.BIO,
+		Type:              json.Type,
+		State:             groupJobState.State,
+		Args:              args,
+		UserID:            json.UserID,
+		SchedulerClusters: candidateSchedulerClusters,
+	}
+
+	if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
+		return nil, err
+	}
+
+	go s.pollingJob(context.Background(), job.ID, job.TaskID)
+
+	return &job, nil
+}
+
+func (s *service) CreateListTasksJob(ctx context.Context, json types.CreateListTasksJobRequest) (*models.Job, error) {
+	candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)
+	if err != nil {
+		return nil, err
+	}
+
+	groupJobState, err := s.job.CreateListTasks(ctx, candidateSchedulers, json.Args)
+	if err != nil {
+		return nil, err
+	}
+
+	var candidateSchedulerClusters []models.SchedulerCluster
+	for _, candidateScheduler := range candidateSchedulers {
+		candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)
+	}
+
+	args, err := structure.StructToMap(json.Args)
+	if err != nil {
+		return nil, err
+	}
+
+	job := models.Job{
+		TaskID:            groupJobState.GroupUUID,
+		BIO:               json.BIO,
+		Type:              json.Type,
+		State:             groupJobState.State,
+		Args:              args,
+		UserID:            json.UserID,
+		SchedulerClusters: candidateSchedulerClusters,
+	}
+
+	if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
+		return nil, err
+	}
+
+	go s.pollingJob(context.Background(), job.ID, job.TaskID)
+
+	return &job, nil
+}
+
 func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {
 	var candidateSchedulers []models.Scheduler
 	if len(schedulerClusterIDs) != 0 {
diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go
index 77eba9d9a72..81294cf9eab 100644
--- a/manager/service/mocks/service_mock.go
+++ b/manager/service/mocks/service_mock.go
@@ -169,6 +169,36 @@ func (m *MockService) CreateConfig(arg0 context.Context, arg1 types.CreateConfig
 	return ret0, ret1
 }
 
+// CreateDeleteTaskJob mocks base method.
+func (m *MockService) CreateDeleteTaskJob(arg0 context.Context, arg1 types.CreateDeleteTaskJobRequest) (*models.Job, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CreateDeleteTaskJob", arg0, arg1)
+	ret0, _ := ret[0].(*models.Job)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// CreateDeleteTaskJob indicates an expected call of CreateDeleteTaskJob.
+func (mr *MockServiceMockRecorder) CreateDeleteTaskJob(arg0, arg1 any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTaskJob", reflect.TypeOf((*MockService)(nil).CreateDeleteTaskJob), arg0, arg1)
+}
+
+// CreateListTasksJob mocks base method.
+func (m *MockService) CreateListTasksJob(arg0 context.Context, arg1 types.CreateListTasksJobRequest) (*models.Job, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "CreateListTasksJob", arg0, arg1)
+	ret0, _ := ret[0].(*models.Job)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// CreateListTasksJob indicates an expected call of CreateListTasksJob.
+func (mr *MockServiceMockRecorder) CreateListTasksJob(arg0, arg1 any) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateListTasksJob", reflect.TypeOf((*MockService)(nil).CreateListTasksJob), arg0, arg1)
+}
+
 // CreateConfig indicates an expected call of CreateConfig.
 func (mr *MockServiceMockRecorder) CreateConfig(arg0, arg1 any) *gomock.Call {
 	mr.mock.ctrl.T.Helper()
diff --git a/manager/service/service.go b/manager/service/service.go
index bde2c0c860c..dc413d0020d 100644
--- a/manager/service/service.go
+++ b/manager/service/service.go
@@ -114,6 +114,8 @@ type Service interface {
 	GetConfigs(context.Context, types.GetConfigsQuery) ([]models.Config, int64, error)
 
 	CreatePreheatJob(context.Context, types.CreatePreheatJobRequest) (*models.Job, error)
+	CreateDeleteTaskJob(context.Context, types.CreateDeleteTaskJobRequest) (*models.Job, error)
+	CreateListTasksJob(context.Context, types.CreateListTasksJobRequest) (*models.Job, error)
 	DestroyJob(context.Context, uint) error
 	UpdateJob(context.Context, uint, types.UpdateJobRequest) (*models.Job, error)
 	GetJob(context.Context, uint) (*models.Job, error)
diff --git a/manager/types/job.go b/manager/types/job.go
index 4d3a286ce33..15e5265fd6a 100644
--- a/manager/types/job.go
+++ b/manager/types/job.go
@@ -57,6 +57,32 @@ type CreatePreheatJobRequest struct {
 	SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"`
 }
 
+type CreateDeleteTaskJobRequest struct {
+	BIO                 string          `json:"bio" binding:"omitempty"`
+	Type                string          `json:"type" binding:"required"`
+	Args                DeleteTasksArgs `json:"args" binding:"omitempty"`
+	Result              map[string]any  `json:"result" binding:"omitempty"`
+	UserID              uint            `json:"user_id" binding:"omitempty"`
+	SchedulerClusterIDs []uint          `json:"scheduler_cluster_ids" binding:"omitempty"`
+}
+
+type DeleteTasksArgs struct {
+	TaskID string `json:"task_id" binding:"required"`
+}
+
+type CreateListTasksJobRequest struct {
+	BIO                 string         `json:"bio" binding:"omitempty"`
+	Type                string         `json:"type" binding:"required"`
+	Args                ListTasksArgs  `json:"args" binding:"omitempty"`
+	Result              map[string]any `json:"result" binding:"omitempty"`
+	UserID              uint           `json:"user_id" binding:"omitempty"`
+	SchedulerClusterIDs []uint         `json:"scheduler_cluster_ids" binding:"omitempty"`
+}
+
+type ListTasksArgs struct {
+	TaskID string `json:"task_id" binding:"required"`
+}
+
 type PreheatArgs struct {
 	// Type is the preheating type, support image and file.
 	Type string `json:"type" binding:"required,oneof=image file"`
@@ -68,10 +94,10 @@ type PreheatArgs struct {
 	Tag string `json:"tag" binding:"omitempty"`
 
 	// FilteredQueryParams is the filtered query params for preheating.
-	FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"`
+	FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"`
 
 	// PieceLength is the piece length for preheating.
-	PieceLength uint32 `json:"pieceLength" binding:"omitempty"`
+	PieceLength uint32 `json:"piece_length" binding:"omitempty"`
 
 	// Headers is the http headers for authentication.
 	Headers map[string]string `json:"headers" binding:"omitempty"`
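For completeness, the wire shape the new request types produce. A trimmed, self-contained mirror of CreateListTasksJobRequest (the field subset and values are illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

type listTasksArgs struct {
	TaskID string `json:"task_id"`
}

// createListTasksJobRequest mirrors the fields of types.CreateListTasksJobRequest
// that a caller typically sets.
type createListTasksJobRequest struct {
	Type                string        `json:"type"`
	Args                listTasksArgs `json:"args"`
	SchedulerClusterIDs []uint        `json:"scheduler_cluster_ids,omitempty"`
}

func main() {
	req := createListTasksJobRequest{
		Type:                "list_tasks", // must match internaljob.ListTasksJob
		Args:                listTasksArgs{TaskID: "task-1234"},
		SchedulerClusterIDs: []uint{1},
	}

	b, err := json.MarshalIndent(req, "", "  ")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}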
diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go
index f4c0c136f16..a92f1ea4eec 100644
--- a/pkg/rpc/dfdaemon/client/client_v2.go
+++ b/pkg/rpc/dfdaemon/client/client_v2.go
@@ -83,6 +83,41 @@ func GetV2(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grp
 	}, nil
 }
 
+// GetV2ByAddr returns v2 version of the dfdaemon client by address.
+func GetV2ByAddr(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) {
+	conn, err := grpc.DialContext(
+		ctx,
+		target,
+		append([]grpc.DialOption{
+			grpc.WithIdleTimeout(0),
+			grpc.WithDefaultCallOptions(
+				grpc.MaxCallRecvMsgSize(math.MaxInt32),
+				grpc.MaxCallSendMsgSize(math.MaxInt32),
+			),
+			grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient(
+				grpc_prometheus.UnaryClientInterceptor,
+				grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()),
+				grpc_retry.UnaryClientInterceptor(
+					grpc_retry.WithMax(maxRetries),
+					grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)),
+				),
+			)),
+			grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient(
+				grpc_prometheus.StreamClientInterceptor,
+				grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()),
+			)),
+		}, opts...)...,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	return &v2{
+		DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn),
+		ClientConn:           conn,
+	}, nil
+}
+
 // V2 is the interface for v2 version of the grpc client.
 type V2 interface {
 	// SyncPieces syncs pieces from the other peers.
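A usage sketch for the new address-based constructor. The target address is a placeholder, and the insecure transport credentials are an assumption for a plaintext example, since GetV2ByAddr itself sets no credentials and the dial fails without them:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Dial a specific peer's dfdaemon directly by "ip:port", as the
	// scheduler's deleteTask job does below.
	client, err := dfdaemonclient.GetV2ByAddr(ctx, "10.0.0.1:65000",
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}

	fmt.Printf("connected: %T\n", client)
}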
diff --git a/scheduler/job/job.go b/scheduler/job/job.go
index e497cfdbf0b..82c56a5a6d7 100644
--- a/scheduler/job/job.go
+++ b/scheduler/job/job.go
@@ -39,6 +39,9 @@ import (
+	dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2"
+
 	logger "d7y.io/dragonfly/v2/internal/dflog"
 	internaljob "d7y.io/dragonfly/v2/internal/job"
 	"d7y.io/dragonfly/v2/pkg/idgen"
+	dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
 	"d7y.io/dragonfly/v2/scheduler/config"
 	"d7y.io/dragonfly/v2/scheduler/resource"
 )
@@ -46,6 +49,8 @@ import (
 
 const (
 	// preheatTimeout is timeout of preheating.
 	preheatTimeout = 20 * time.Minute
+	// deleteTaskTimeout is timeout of deleting task.
+	deleteTaskTimeout = 20 * time.Minute
 )
 
 // Job is an interface for job.
@@ -109,8 +114,10 @@ func New(cfg *config.Config, resource resource.Resource) (Job, error) {
 	}
 
 	namedJobFuncs := map[string]any{
-		internaljob.PreheatJob:   t.preheat,
-		internaljob.SyncPeersJob: t.syncPeers,
+		internaljob.PreheatJob:    t.preheat,
+		internaljob.SyncPeersJob:  t.syncPeers,
+		internaljob.ListTasksJob:  t.listTasks,
+		internaljob.DeleteTaskJob: t.deleteTask,
 	}
 
 	if err := localJob.RegisterJob(namedJobFuncs); err != nil {
@@ -297,3 +304,113 @@ func (j *job) syncPeers() (string, error) {
 
 	return internaljob.MarshalResponse(hosts)
 }
+
+// listTasks is a job to list tasks.
+func (j *job) listTasks(ctx context.Context, data string) (string, error) {
+	req := &internaljob.ListTasksRequest{}
+	if err := internaljob.UnmarshalRequest(data, req); err != nil {
+		logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data)
+		return "", err
+	}
+
+	if err := validator.New().Struct(req); err != nil {
+		logger.Errorf("listTasks %s validate failed: %s", req.TaskID, err.Error())
+		return "", err
+	}
+
+	// Get the finished peers by task id.
+	peers, err := j.getFinishedPeers(req.TaskID)
+	if err != nil {
+		logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
+		return "", err
+	}
+
+	listTaskResponse := &internaljob.ListTasksResponse{
+		Peers: peers,
+	}
+
+	return internaljob.MarshalResponse(listTaskResponse)
+}
+
+// deleteTask is a job to delete a task.
+func (j *job) deleteTask(ctx context.Context, data string) (string, error) {
+	ctx, cancel := context.WithTimeout(ctx, deleteTaskTimeout)
+	defer cancel()
+
+	req := &internaljob.DeleteTaskRequest{}
+	if err := internaljob.UnmarshalRequest(data, req); err != nil {
+		logger.Errorf("unmarshal request err: %s, request body: %s", err.Error(), data)
+		return "", err
+	}
+
+	if err := validator.New().Struct(req); err != nil {
+		logger.Errorf("deleteTask %s validate failed: %s", req.TaskID, err.Error())
+		return "", err
+	}
+
+	// Get the finished peers by task id.
+	peers, err := j.getFinishedPeers(req.TaskID)
+	if err != nil {
+		logger.Errorf("get peers by task id %s failed: %s", req.TaskID, err.Error())
+		return "", err
+	}
+
+	// Delete the task from the host of each peer, collecting the results.
+	successTasks := make([]*internaljob.Task, 0)
+	failureTasks := make([]*internaljob.Task, 0)
+
+	// TODO: Create a limiter to limit delete rpc concurrency
+	// and avoid too many rpc requests to the host.
+	for _, peer := range peers {
+		// Get dfdaemon client from the peer's host.
+		target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port)
+		dfdaemonUploadClient, err := dfdaemonclient.GetV2ByAddr(ctx, target)
+		if err != nil {
+			logger.Errorf("get dfdaemon client from %s failed: %s", target, err.Error())
+			failureTasks = append(failureTasks, &internaljob.Task{
+				Task:        peer.Task,
+				Peer:        peer,
+				Description: err.Error(),
+			})
+			continue
+		}
+
+		err = dfdaemonUploadClient.DeleteCacheTask(ctx, &dfdaemonv2.DeleteCacheTaskRequest{
+			TaskId: req.TaskID,
+		})
+		if err != nil {
+			logger.Errorf("delete task %s from %s failed: %s", req.TaskID, target, err.Error())
+			failureTasks = append(failureTasks, &internaljob.Task{
+				Task:        peer.Task,
+				Peer:        peer,
+				Description: err.Error(),
+			})
+			continue
+		}
+
+		successTasks = append(successTasks, &internaljob.Task{
+			Task:        peer.Task,
+			Peer:        peer,
+			Description: fmt.Sprintf("delete task %s from %s success", req.TaskID, target),
+		})
+	}
+
+	deleteTaskResponse := &internaljob.DeleteTaskResponse{
+		SuccessTasks: successTasks,
+		FailureTasks: failureTasks,
+	}
+
+	return internaljob.MarshalResponse(deleteTaskResponse)
+}
+
+// getFinishedPeers returns the finished peers of the task by task id.
+func (j *job) getFinishedPeers(taskID string) ([]*resource.Peer, error) {
+	// Get the task by task id.
+	task, ok := j.resource.TaskManager().Load(taskID)
+	if !ok {
+		logger.Errorf("task %s not found", taskID)
+		return nil, fmt.Errorf("task %s not found", taskID)
+	}
+
+	return task.LoadFinishedPeers(), nil
+}
diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go
index b5a3ac864dd..a9d0caa4c8c 100644
--- a/scheduler/resource/task.go
+++ b/scheduler/resource/task.go
@@ -249,6 +249,25 @@ func (t *Task) LoadRandomPeers(n uint) []*Peer {
 	return peers
 }
 
+// LoadFinishedPeers returns the peers that have finished, i.e. whose state is
+// succeeded or failed.
+func (t *Task) LoadFinishedPeers() []*Peer {
+	var finishedPeers []*Peer
+	for _, vertex := range t.DAG.GetVertices() {
+		peer := vertex.Value
+		if peer == nil {
+			continue
+		}
+
+		currentState := peer.FSM.Current()
+		if currentState == PeerStateSucceeded || currentState == PeerStateFailed {
+			finishedPeers = append(finishedPeers, peer)
+		}
+	}
+
+	return finishedPeers
+}
+
 // StorePeer set peer.
 func (t *Task) StorePeer(peer *Peer) {
 	t.DAG.AddVertex(peer.ID, peer) // nolint: errcheck
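End to end, each scheduler answers a delete_task group job with a DeleteTaskResponse. A sketch of decoding that payload (the structs are trimmed mirrors of internal/job.Task and DeleteTaskResponse, keeping only Description so the sketch stays self-contained; the sample data is illustrative):

package main

import (
	"encoding/json"
	"fmt"
)

type task struct {
	Description string `json:"description"`
}

type deleteTaskResponse struct {
	SuccessTasks []*task `json:"success_tasks"`
	FailureTasks []*task `json:"failure_tasks"`
}

func main() {
	// Example payload a scheduler might return for a delete_task job.
	data := `{"success_tasks":[{"description":"delete task task-1234 from 10.0.0.1:65000 success"}],"failure_tasks":[]}`

	var resp deleteTaskResponse
	if err := json.Unmarshal([]byte(data), &resp); err != nil {
		panic(err)
	}

	fmt.Printf("succeeded=%d failed=%d\n", len(resp.SuccessTasks), len(resp.FailureTasks))
}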