Skip to content

Commit

Permalink
feat: add manager job service and service mock for delete task and li…
Browse files Browse the repository at this point in the history
…st tasks.

Signed-off-by: Asklv <[email protected]>
  • Loading branch information
IRONICBo committed Jul 19, 2024
1 parent a576067 commit 37307c6
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 25 deletions.
11 changes: 11 additions & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ type JobConfig struct {

// Sync peers configuration.
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`

// Manager tasks configuration.
ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"`
}

type PreheatConfig struct {
Expand All @@ -315,6 +318,11 @@ type SyncPeersConfig struct {
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type ManagerTasksConfig struct {
// Timeout is the timeout for manager tasks information for the single scheduler.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
Expand Down Expand Up @@ -455,6 +463,9 @@ func New() *Config {
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
},
ManagerTasks: ManagerTasksConfig{
Timeout: DefaultJobManagerTasksTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: false,
Expand Down
12 changes: 9 additions & 3 deletions manager/config/constant_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ package config
import "go.opentelemetry.io/otel/attribute"

const (
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id")
AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id")
AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page")
AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page")
)

const (
SpanPreheat = "preheat"
SpanSyncPeers = "sync-peers"
SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry"
SpanDeleteTask = "delete-task"
SpanListTasks = "list-tasks"
)
3 changes: 3 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const (

// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
DefaultJobSyncPeersTimeout = 10 * time.Minute

// DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks.
DefaultJobManagerTasksTimeout = 10 * time.Minute
)

const (
Expand Down
10 changes: 7 additions & 3 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Job struct {
*internaljob.Job
Preheat
SyncPeers
ManagerTasks
}

// New returns a new Job.
Expand Down Expand Up @@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
return nil, err
}

managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout)

return &Job{
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
ManagerTasks: managerTasks,
}, nil
}

Expand Down
127 changes: 127 additions & 0 deletions manager/job/manager_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*

Check failure on line 1 in manager/job/manager_tasks.go

View workflow job for this annotation

GitHub Actions / Lint

package comment should not have leading space (golint)
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination mocks/delete_task_mock.go -source delete_task.go -package mocks
package job

import (
"context"
"fmt"
"time"

logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
)

// ManagerTask is an interface for delete and list tasks.
type ManagerTasks interface {
// CreateDeleteTask create a delete task job
CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTasksArgs) (*internaljob.GroupJobState, error)
// CreateListTasks create a list tasks job
CreateListTasks(context.Context, []models.Scheduler, types.ListTasksArgs) (*internaljob.GroupJobState, error)
}

// managerTasks is an implementation of ManagerTasks.
type managerTasks struct {
job *internaljob.Job
registryTimeout time.Duration
}

// newManagerTasks create a new ManagerTasks.
func newManagerTasks(job *internaljob.Job, registryTimeout time.Duration) ManagerTasks {
return &managerTasks{
job: job,
registryTimeout: registryTimeout,
}
}

// Create a delete task job.
func (m *managerTasks) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTasksArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
if err != nil {
logger.Errorf("delete task marshal request: %v, error: %v", args, err)
return nil, err
}

// Initialize queues.
queues := getSchedulerQueues(schedulers)
return m.createGroupJob(ctx, internaljob.DeleteTaskJob, args, queues)
}

// Create a list tasks job.
func (m *managerTasks) CreateListTasks(ctx context.Context, schedulers []models.Scheduler, json types.ListTasksArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanListTasks, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeListTasksID.String(json.TaskID))
span.SetAttributes(config.AttributeListTasksPage.Int(json.Page))
span.SetAttributes(config.AttributeListTasksPerPage.Int(json.PerPage))
defer span.End()

args, err := internaljob.MarshalRequest(json)
if err != nil {
logger.Errorf("list tasks marshal request: %v, error: %v", args, err)
return nil, err
}

// Initialize queues.
queues := getSchedulerQueues(schedulers)
return m.createGroupJob(ctx, internaljob.ListTasksJob, args, queues)
}

// createGroupJob creates a group job.
func (m *managerTasks) createGroupJob(ctx context.Context, name string, args []machineryv1tasks.Arg, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: name,
RoutingKey: queue.String(),
Args: args,
})
}

group, err := machineryv1tasks.NewGroup(signatures...)
if err != nil {
return nil, err
}

var tasks []machineryv1tasks.Signature
for _, signature := range signatures {
tasks = append(tasks, *signature)
}

logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err)
return nil, err
}

return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: time.Now(),
}, nil
}
81 changes: 81 additions & 0 deletions manager/service/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,87 @@ func (s *service) CreatePreheatJob(ctx context.Context, json types.CreatePreheat
return &job, nil
}

func (s *service) CreateDeleteTaskJob(ctx context.Context, json types.CreateDeleteTaskJobRequest) (*models.Job, error) {
candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)
if err != nil {
return nil, err
}

groupJobState, err := s.job.CreateDeleteTask(ctx, candidateSchedulers, json.Args)
if err != nil {
return nil, err
}

var candidateSchedulerClusters []models.SchedulerCluster
for _, candidateScheduler := range candidateSchedulers {
candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)
}

args, err := structure.StructToMap(json.Args)
if err != nil {
return nil, err
}

job := models.Job{
TaskID: groupJobState.GroupUUID,
BIO: json.BIO,
Type: json.Type,
State: groupJobState.State,
Args: args,
UserID: json.UserID,
SchedulerClusters: candidateSchedulerClusters,
}

if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
return nil, err
}

go s.pollingJob(context.Background(), job.ID, job.TaskID)

return &job, nil
}

func (s *service) CreateListTasksJob(ctx context.Context, json types.CreateListTasksJobRequest) (*models.Job, error) {
candidateSchedulers, err := s.findCandidateSchedulers(ctx, json.SchedulerClusterIDs)
if err != nil {
return nil, err
}

groupJobState, err := s.job.CreateListTasks(ctx, candidateSchedulers, json.Args)
if err != nil {
return nil, err
}

var candidateSchedulerClusters []models.SchedulerCluster
for _, candidateScheduler := range candidateSchedulers {
candidateSchedulerClusters = append(candidateSchedulerClusters, candidateScheduler.SchedulerCluster)
}

args, err := structure.StructToMap(json.Args)
if err != nil {
return nil, err
}

job := models.Job{
TaskID: groupJobState.GroupUUID,
BIO: json.BIO,
Type: json.Type,
State: groupJobState.State,
Args: args,
UserID: json.UserID,
SchedulerClusters: candidateSchedulerClusters,
}

if err := s.db.WithContext(ctx).Create(&job).Error; err != nil {
return nil, err
}

go s.pollingJob(context.Background(), job.ID, job.TaskID)

return &job, nil

}

func (s *service) findCandidateSchedulers(ctx context.Context, schedulerClusterIDs []uint) ([]models.Scheduler, error) {
var candidateSchedulers []models.Scheduler
if len(schedulerClusterIDs) != 0 {
Expand Down
30 changes: 30 additions & 0 deletions manager/service/mocks/service_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 37307c6

Please sign in to comment.