diff --git a/manager/config/config.go b/manager/config/config.go index 5c5e300d48b..29914a6b94c 100644 --- a/manager/config/config.go +++ b/manager/config/config.go @@ -296,6 +296,9 @@ type JobConfig struct { // Sync peers configuration. SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"` + + // Manager tasks configuration. + ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"` } type PreheatConfig struct { @@ -315,6 +318,11 @@ type SyncPeersConfig struct { Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"` } +type ManagerTasksConfig struct { + // Timeout is the timeout for manager tasks information for the single scheduler. + Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"` +} + type PreheatTLSClientConfig struct { // CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string. CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"` @@ -455,6 +463,9 @@ func New() *Config { Interval: DefaultJobSyncPeersInterval, Timeout: DefaultJobSyncPeersTimeout, }, + ManagerTasks: ManagerTasksConfig{ + Timeout: DefaultJobManagerTasksTimeout, + }, }, ObjectStorage: ObjectStorageConfig{ Enable: false, diff --git a/manager/config/constant_otel.go b/manager/config/constant_otel.go index 071c4f9bf92..07c34bbd4b2 100644 --- a/manager/config/constant_otel.go +++ b/manager/config/constant_otel.go @@ -19,9 +19,13 @@ package config import "go.opentelemetry.io/otel/attribute" const ( - AttributeID = attribute.Key("d7y.manager.id") - AttributePreheatType = attribute.Key("d7y.manager.preheat.type") - AttributePreheatURL = attribute.Key("d7y.manager.preheat.url") + AttributeID = attribute.Key("d7y.manager.id") + AttributePreheatType = attribute.Key("d7y.manager.preheat.type") + AttributePreheatURL = attribute.Key("d7y.manager.preheat.url") + AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id") + AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id") + AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page") + AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page") ) const ( @@ -29,4 +33,6 @@ const ( SpanSyncPeers = "sync-peers" SpanGetLayers = "get-layers" SpanAuthWithRegistry = "auth-with-registry" + SpanDeleteTask = "delete-task" + SpanListTasks = "list-tasks" ) diff --git a/manager/config/constants.go b/manager/config/constants.go index 75e6ad8e0a7..b73a6f337da 100644 --- a/manager/config/constants.go +++ b/manager/config/constants.go @@ -98,6 +98,9 @@ const ( // DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler. DefaultJobSyncPeersTimeout = 10 * time.Minute + + // DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks. + DefaultJobManagerTasksTimeout = 10 * time.Minute ) const ( diff --git a/manager/job/job.go b/manager/job/job.go index bac8c7a6e86..e939e50f0a9 100644 --- a/manager/job/job.go +++ b/manager/job/job.go @@ -40,6 +40,7 @@ type Job struct { *internaljob.Job Preheat SyncPeers + ManagerTasks } // New returns a new Job. @@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) { return nil, err } + managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout) + return &Job{ - Job: j, - Preheat: preheat, - SyncPeers: syncPeers, + Job: j, + Preheat: preheat, + SyncPeers: syncPeers, + ManagerTasks: managerTasks, }, nil } diff --git a/manager/job/manager_tasks.go b/manager/job/manager_tasks.go new file mode 100644 index 00000000000..44e45f5bc02 --- /dev/null +++ b/manager/job/manager_tasks.go @@ -0,0 +1,127 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +//go:generate mockgen -destination mocks/delete_task_mock.go -source delete_task.go -package mocks +package job + +import ( + "context" + "fmt" + "time" + + logger "d7y.io/dragonfly/v2/internal/dflog" + internaljob "d7y.io/dragonfly/v2/internal/job" + "d7y.io/dragonfly/v2/manager/config" + "d7y.io/dragonfly/v2/manager/models" + "d7y.io/dragonfly/v2/manager/types" + machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks" + "github.com/google/uuid" + "go.opentelemetry.io/otel/trace" +) + +// ManagerTask is an interface for delete and list tasks. +type ManagerTasks interface { + // CreateDeleteTask create a delete task job + CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTasksArgs) (*internaljob.GroupJobState, error) + // CreateListTasks create a list tasks job + CreateListTasks(context.Context, []models.Scheduler, types.ListTasksArgs) (*internaljob.GroupJobState, error) +} + +// managerTasks is an implementation of ManagerTasks. +type managerTasks struct { + job *internaljob.Job + registryTimeout time.Duration +} + +// newManagerTasks create a new ManagerTasks. +func newManagerTasks(job *internaljob.Job, registryTimeout time.Duration) ManagerTasks { + return &managerTasks{ + job: job, + registryTimeout: registryTimeout, + } +} + +// Create a delete task job. +func (m *managerTasks) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTasksArgs) (*internaljob.GroupJobState, error) { + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer)) + span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID)) + defer span.End() + + args, err := internaljob.MarshalRequest(json) + if err != nil { + logger.Errorf("delete task marshal request: %v, error: %v", args, err) + return nil, err + } + + // Initialize queues. + queues := getSchedulerQueues(schedulers) + return m.createGroupJob(ctx, internaljob.DeleteTaskJob, args, queues) +} + +// Create a list tasks job. +func (m *managerTasks) CreateListTasks(ctx context.Context, schedulers []models.Scheduler, json types.ListTasksArgs) (*internaljob.GroupJobState, error) { + var span trace.Span + ctx, span = tracer.Start(ctx, config.SpanListTasks, trace.WithSpanKind(trace.SpanKindProducer)) + span.SetAttributes(config.AttributeListTasksID.String(json.TaskID)) + span.SetAttributes(config.AttributeListTasksPage.Int(json.Page)) + span.SetAttributes(config.AttributeListTasksPerPage.Int(json.PerPage)) + defer span.End() + + args, err := internaljob.MarshalRequest(json) + if err != nil { + logger.Errorf("list tasks marshal request: %v, error: %v", args, err) + return nil, err + } + + // Initialize queues. + queues := getSchedulerQueues(schedulers) + return m.createGroupJob(ctx, internaljob.ListTasksJob, args, queues) +} + +// createGroupJob creates a group job. +func (m *managerTasks) createGroupJob(ctx context.Context, name string, args []machineryv1tasks.Arg, queues []internaljob.Queue) (*internaljob.GroupJobState, error) { + var signatures []*machineryv1tasks.Signature + for _, queue := range queues { + signatures = append(signatures, &machineryv1tasks.Signature{ + UUID: fmt.Sprintf("task_%s", uuid.New().String()), + Name: name, + RoutingKey: queue.String(), + Args: args, + }) + } + + group, err := machineryv1tasks.NewGroup(signatures...) + if err != nil { + return nil, err + } + + var tasks []machineryv1tasks.Signature + for _, signature := range signatures { + tasks = append(tasks, *signature) + } + + logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) + if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { + logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err) + return nil, err + } + + return &internaljob.GroupJobState{ + GroupUUID: group.GroupUUID, + State: machineryv1tasks.StatePending, + CreatedAt: time.Now(), + }, nil +} diff --git a/manager/service/job.go b/manager/service/job.go index f2836a05fc9..3b4319babec 100644 --- a/manager/service/job.go +++ b/manager/service/job.go @@ -75,6 +75,87 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat return &job, nil } +func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) { + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + if err != nil { + return nil, err + } + + groupJobState, err := s.job.CreateDeleteTask(ctx, candidateSchedulers, json.Args) + if err != nil { + return nil, err + } + + var candidateSchedulerClusters []models.SchedulerCluster + for _, candidateScheduler := range candidateSchedulers { + candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + } + + args, err := structure.StructToMap(json.Args) + if err != nil { + return nil, err + } + + job := models.Job{ + TaskID: groupJobState.GroupUUID, + BIO: json.BIO, + Type: json.Type, + State: groupJobState.State, + Args: args, + UserID: json.UserID, + SchedulerClusters: candidateSchedulerClusters, + } + + if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + return nil, err + } + + go s.pollingJob(context.Background(), job.ID, job.TaskID) + + return &job, nil +} + +func (s *service) CreateListTasksJob(ctx context.Context, json types.CreateListTasksJobRequest) (*models.Job, error) { + candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs) + if err != nil { + return nil, err + } + + groupJobState, err := s.job.CreateListTasks(ctx, candidateSchedulers, json.Args) + if err != nil { + return nil, err + } + + var candidateSchedulerClusters []models.SchedulerCluster + for _, candidateScheduler := range candidateSchedulers { + candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster) + } + + args, err := structure.StructToMap(json.Args) + if err != nil { + return nil, err + } + + job := models.Job{ + TaskID: groupJobState.GroupUUID, + BIO: json.BIO, + Type: json.Type, + State: groupJobState.State, + Args: args, + UserID: json.UserID, + SchedulerClusters: candidateSchedulerClusters, + } + + if err := s.db.WithContext(ctx).Create(&job).Error; err != nil { + return nil, err + } + + go s.pollingJob(context.Background(), job.ID, job.TaskID) + + return &job, nil + +} + func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) { var candidateSchedulers []models.Scheduler if len(schedulerClusterIDs) != 0 { diff --git a/manager/service/mocks/service_mock.go b/manager/service/mocks/service_mock.go index c4a39ac4246..2db1f892885 100644 --- a/manager/service/mocks/service_mock.go +++ b/manager/service/mocks/service_mock.go @@ -168,6 +168,36 @@ func (m *MockService) CreateConfig(arg0 context.Context, arg1 types.CreateConfig return ret0, ret1 } +// CreateDeleteTaskJob mocks base method. +func (m *MockService) CreateDeleteTaskJob(arg0 context.Context, arg1 types.CreateDeleteTaskJobRequest) (*models.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateDeleteTaskJob", arg0, arg1) + ret0, _ := ret[0].(*models.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateDeleteTaskJob indicates an expected call of CreateDeleteTaskJob. +func (mr *MockServiceMockRecorder) CreateDeleteTaskJob(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDeleteTaskJob", reflect.TypeOf((*MockService)(nil).CreateDeleteTaskJob), arg0, arg1) +} + +// CreateListTasksJob mocks base method. +func (m *MockService) CreateListTasksJob(arg0 context.Context, arg1 types.CreateListTasksJobRequest) (*models.Job, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateListTasksJob", arg0, arg1) + ret0, _ := ret[0].(*models.Job) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateListTasksJob indicates an expected call of CreateListTasksJob. +func (mr *MockServiceMockRecorder) CreateListTasksJob(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateListTasksJob", reflect.TypeOf((*MockService)(nil).CreateListTasksJob), arg0, arg1) +} + // CreateConfig indicates an expected call of CreateConfig. func (mr *MockServiceMockRecorder) CreateConfig(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() diff --git a/manager/types/job.go b/manager/types/job.go index 450ac2a1ccf..81cdb90eba4 100644 --- a/manager/types/job.go +++ b/manager/types/job.go @@ -58,31 +58,31 @@ type CreatePreheatJobRequest struct { } type CreateDeleteTaskJobRequest struct { - BIO string `json:"bio" binding:"omitempty"` - Type string `json:"type" binding:"required"` - Args DeleteTasksJobArgs `json:"args" binding:"omitempty"` - Result map[string]any `json:"result" binding:"omitempty"` - UserID uint `json:"user_id" binding:"omitempty"` - SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + Args DeleteTasksArgs `json:"args" binding:"omitempty"` + Result map[string]any `json:"result" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` } -type DeleteTasksJobArgs struct { - TaskID string `json:"taskID" binding:"required"` +type DeleteTasksArgs struct { + TaskID string `json:"task_id" binding:"required"` } type CreateListTasksJobRequest struct { - BIO string `json:"bio" binding:"omitempty"` - Type string `json:"type" binding:"required"` - Args ListTasksJobArgs `json:"args" binding:"omitempty"` - Result map[string]any `json:"result" binding:"omitempty"` - UserID uint `json:"user_id" binding:"omitempty"` - SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` + BIO string `json:"bio" binding:"omitempty"` + Type string `json:"type" binding:"required"` + Args ListTasksArgs `json:"args" binding:"omitempty"` + Result map[string]any `json:"result" binding:"omitempty"` + UserID uint `json:"user_id" binding:"omitempty"` + SchedulerClusterIDs []uint `json:"scheduler_cluster_ids" binding:"omitempty"` } -type ListTasksJobArgs struct { - TaskID string `json:"taskID" binding:"required"` +type ListTasksArgs struct { + TaskID string `json:"task_id" binding:"required"` Page int `json:"page" binding:"omitempty,gte=1"` - PerPage int `json:"count" binding:"omitempty,gte=1,lte=10000000"` + PerPage int `json:"per_page" binding:"omitempty,gte=1,lte=10000000"` } type PreheatArgs struct { @@ -96,10 +96,10 @@ type PreheatArgs struct { Tag string `json:"tag" binding:"omitempty"` // FilteredQueryParams is the filtered query params for preheating. - FilteredQueryParams string `json:"filteredQueryParams" binding:"omitempty"` + FilteredQueryParams string `json:"filtered_query_params" binding:"omitempty"` // PieceLength is the piece length for preheating. - PieceLength uint32 `json:"pieceLength" binding:"omitempty"` + PieceLength uint32 `json:"piece_length" binding:"omitempty"` // Headers is the http headers for authentication. Headers map[string]string `json:"headers" binding:"omitempty"`