Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add delete task and list tasks manager api with request type and service type. #3378

Merged
merged 9 commits into from
Aug 13, 2024
6 changes: 6 additions & 0 deletions internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ const (

// SyncPeersJob is the name of syncing peers job.
SyncPeersJob = "sync_peers"

// ListTasksJob is the name of listing tasks job.
ListTasksJob = "list_tasks"

// DeleteTasksJob is the name of deleting tasks job.
DeleteTaskJob = "delete_task"
)

// Machinery server configuration.
Expand Down
11 changes: 11 additions & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ type JobConfig struct {

// Sync peers configuration.
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`

// Manager tasks configuration.
ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"`
}

type PreheatConfig struct {
Expand All @@ -315,6 +318,11 @@ type SyncPeersConfig struct {
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type ManagerTasksConfig struct {
// Timeout is the timeout for manager tasks information for the single scheduler.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
Expand Down Expand Up @@ -455,6 +463,9 @@ func New() *Config {
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
},
ManagerTasks: ManagerTasksConfig{
Timeout: DefaultJobManagerTasksTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: false,
Expand Down
12 changes: 9 additions & 3 deletions manager/config/constant_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ package config
import "go.opentelemetry.io/otel/attribute"

const (
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id")
AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id")
AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page")
AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page")
)

const (
SpanPreheat = "preheat"
SpanSyncPeers = "sync-peers"
SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry"
SpanDeleteTask = "delete-task"
SpanListTasks = "list-tasks"
)
3 changes: 3 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const (

// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
DefaultJobSyncPeersTimeout = 10 * time.Minute

// DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks.
DefaultJobManagerTasksTimeout = 10 * time.Minute
)

const (
Expand Down
28 changes: 28 additions & 0 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,34 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, job)
case job.DeleteTaskJob:
var json types.CreateDeleteTaskJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
case job.ListTasksJob:
var json types.CreateListTasksJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateListTasksJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
default:
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
Expand Down
10 changes: 7 additions & 3 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Job struct {
*internaljob.Job
Preheat
SyncPeers
ManagerTasks
}

// New returns a new Job.
Expand Down Expand Up @@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
return nil, err
}

managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout)

return &Job{
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
ManagerTasks: managerTasks,
}, nil
}

Expand Down
130 changes: 130 additions & 0 deletions manager/job/manager_tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Copyright 2024 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

//go:generate mockgen -destination mocks/manager_tasks_mock.go -source manager_tasks.go -package mocks

package job

import (
"context"
"fmt"
"time"

machineryv1tasks "github.com/RichardKnop/machinery/v1/tasks"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"

logger "d7y.io/dragonfly/v2/internal/dflog"
internaljob "d7y.io/dragonfly/v2/internal/job"
"d7y.io/dragonfly/v2/manager/config"
"d7y.io/dragonfly/v2/manager/models"
"d7y.io/dragonfly/v2/manager/types"
)

// ManagerTask is an interface for delete and list tasks.
type ManagerTasks interface {
// CreateDeleteTask create a delete task job
CreateDeleteTask(context.Context, []models.Scheduler, types.DeleteTasksArgs) (*internaljob.GroupJobState, error)
// CreateListTasks create a list tasks job
CreateListTasks(context.Context, []models.Scheduler, types.ListTasksArgs) (*internaljob.GroupJobState, error)
}

// managerTasks is an implementation of ManagerTasks.
type managerTasks struct {
job *internaljob.Job
registryTimeout time.Duration
}

// newManagerTasks create a new ManagerTasks.
func newManagerTasks(job *internaljob.Job, registryTimeout time.Duration) ManagerTasks {
return &managerTasks{
job: job,
registryTimeout: registryTimeout,
}
}

// Create a delete task job.
func (m *managerTasks) CreateDeleteTask(ctx context.Context, schedulers []models.Scheduler, json types.DeleteTasksArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanDeleteTask, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeDeleteTaskID.String(json.TaskID))
defer span.End()

args, err := internaljob.MarshalRequest(json)
if err != nil {
logger.Errorf("delete task marshal request: %v, error: %v", args, err)
return nil, err
}

// Initialize queues.
queues := getSchedulerQueues(schedulers)
return m.createGroupJob(ctx, internaljob.DeleteTaskJob, args, queues)
}

// Create a list tasks job.
func (m *managerTasks) CreateListTasks(ctx context.Context, schedulers []models.Scheduler, json types.ListTasksArgs) (*internaljob.GroupJobState, error) {
var span trace.Span
ctx, span = tracer.Start(ctx, config.SpanListTasks, trace.WithSpanKind(trace.SpanKindProducer))
span.SetAttributes(config.AttributeListTasksID.String(json.TaskID))
span.SetAttributes(config.AttributeListTasksPage.Int(json.Page))
span.SetAttributes(config.AttributeListTasksPerPage.Int(json.PerPage))
defer span.End()

args, err := internaljob.MarshalRequest(json)
if err != nil {
logger.Errorf("list tasks marshal request: %v, error: %v", args, err)
return nil, err
}

// Initialize queues.
queues := getSchedulerQueues(schedulers)
return m.createGroupJob(ctx, internaljob.ListTasksJob, args, queues)
}

// createGroupJob creates a group job.
func (m *managerTasks) createGroupJob(ctx context.Context, name string, args []machineryv1tasks.Arg, queues []internaljob.Queue) (*internaljob.GroupJobState, error) {
var signatures []*machineryv1tasks.Signature
for _, queue := range queues {
signatures = append(signatures, &machineryv1tasks.Signature{
UUID: fmt.Sprintf("task_%s", uuid.New().String()),
Name: name,
RoutingKey: queue.String(),
Args: args,
})
}

group, err := machineryv1tasks.NewGroup(signatures...)
if err != nil {
return nil, err
}

var tasks []machineryv1tasks.Signature
for _, signature := range signatures {
tasks = append(tasks, *signature)
}

logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks)
if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil {
logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra spaces in log.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, done.

return nil, err
}

return &internaljob.GroupJobState{
GroupUUID: group.GroupUUID,
State: machineryv1tasks.StatePending,
CreatedAt: time.Now(),
}, nil
}
72 changes: 72 additions & 0 deletions manager/job/mocks/manager_tasks_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading