Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add delete task and list tasks manager api with request type and service type. #3378

Merged
merged 9 commits into from
Aug 13, 2024
6 changes: 6 additions & 0 deletions internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ const (

// SyncPeersJob is the name of syncing peers job.
SyncPeersJob = "sync_peers"

// ListTasksJob is the name of listing tasks job.
ListTasksJob = "list_tasks"

// DeleteTasksJob is the name of deleting tasks job.
DeleteTaskJob = "delete_task"
)

// Machinery server configuration.
Expand Down
34 changes: 33 additions & 1 deletion internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,48 @@

package job

import "d7y.io/dragonfly/v2/scheduler/resource"

type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
FilteredQueryParams string `json:"filteredQueryParams" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
PieceLength uint32 `json:"pieceLength" validate:"omitempty"`
}

type PreheatResponse struct {
TaskID string `json:"taskID"`
}

// ListTasksRequest defines the request parameters for listing tasks.
type ListTasksRequest struct {
TaskID string `json:"task_id" validate:"required"`
}

// ListTasksResponse defines the response parameters for listing tasks.
type ListTasksResponse struct {
Peers []*resource.Peer `json:"peers"`
}

// DeleteTaskRequest defines the request parameters for deleting task.
type DeleteTaskRequest struct {
TaskID string `json:"task_id" validate:"required"`
}

// Task includes information about a task along with peer details and a description.
type Task struct {
Task *resource.Task `json:"task"`
Peer *resource.Peer `json:"peer"`
Description string `json:"description"`
}

// DeleteTaskResponse represents the response after attempting to delete tasks,
// categorizing them into successfully and unsuccessfully deleted.
type DeleteTaskResponse struct {
SuccessTasks []*Task `json:"success_tasks"`
FailureTasks []*Task `json:"failure_tasks"`
}
11 changes: 11 additions & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ type JobConfig struct {

// Sync peers configuration.
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`

// Manager tasks configuration.
ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"`
}

type PreheatConfig struct {
Expand All @@ -315,6 +318,11 @@ type SyncPeersConfig struct {
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type ManagerTasksConfig struct {
// Timeout is the timeout for manager tasks information for the single scheduler.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
Expand Down Expand Up @@ -455,6 +463,9 @@ func New() *Config {
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
},
ManagerTasks: ManagerTasksConfig{
Timeout: DefaultJobManagerTasksTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: false,
Expand Down
12 changes: 9 additions & 3 deletions manager/config/constant_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ package config
import "go.opentelemetry.io/otel/attribute"

const (
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id")
AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id")
AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page")
AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page")
)

const (
SpanPreheat = "preheat"
SpanSyncPeers = "sync-peers"
SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry"
SpanDeleteTask = "delete-task"
SpanListTasks = "list-tasks"
)
3 changes: 3 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const (

// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
DefaultJobSyncPeersTimeout = 10 * time.Minute

// DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks.
DefaultJobManagerTasksTimeout = 10 * time.Minute
)

const (
Expand Down
28 changes: 28 additions & 0 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,34 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, job)
case job.DeleteTaskJob:
var json types.CreateDeleteTaskJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
case job.ListTasksJob:
var json types.CreateListTasksJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateListTasksJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
default:
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
Expand Down
66 changes: 66 additions & 0 deletions manager/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ var (
"user_id": 4,
"bio": "bio"
}`
mockListTasksJobReqBody = `
{
"type": "list_tasks",
"user_id": 4,
"bio": "bio"
}`
mockDeleteTaskJobReqBody = `
{
"type": "delete_task",
"user_id": 4,
"bio": "bio"
}`
mockOtherJobReqBody = `
{
"type": "others",
Expand All @@ -50,6 +62,16 @@ var (
Type: "preheat",
BIO: "bio",
}
mockListTasksCreateJobRequest = types.CreateListTasksJobRequest{
UserID: 4,
Type: "list_tasks",
BIO: "bio",
}
mockDeleteTaskCreateJobRequest = types.CreateDeleteTaskJobRequest{
UserID: 4,
Type: "delete_task",
BIO: "bio",
}
mockUpdateJobRequest = types.UpdateJobRequest{
UserID: 4,
BIO: "bio",
Expand All @@ -61,6 +83,20 @@ var (
BIO: "bio",
TaskID: "2",
}
mockListTasksJobModel = &models.Job{
BaseModel: mockBaseModel,
UserID: 4,
Type: "list_tasks",
BIO: "bio",
TaskID: "2",
}
mockDeleteTaskJobModel = &models.Job{
BaseModel: mockBaseModel,
UserID: 4,
Type: "delete_task",
BIO: "bio",
TaskID: "2",
}
)

func mockJobRouter(h *Handlers) *gin.Engine {
Expand Down Expand Up @@ -115,6 +151,36 @@ func TestHandlers_CreateJob(t *testing.T) {
assert.Equal(mockPreheatJobModel, &job)
},
},
{
name: "success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockListTasksJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateListTasksJob(gomock.Any(), gomock.Eq(mockListTasksCreateJobRequest)).Return(mockListTasksJobModel, nil).Times(1)
},
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
assert := assert.New(t)
assert.Equal(http.StatusOK, w.Code)
job := models.Job{}
err := json.Unmarshal(w.Body.Bytes(), &job)
assert.NoError(err)
assert.Equal(mockListTasksJobModel, &job)
},
},
{
name: "success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockDeleteTaskCreateJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1)
},
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
assert := assert.New(t)
assert.Equal(http.StatusOK, w.Code)
job := models.Job{}
err := json.Unmarshal(w.Body.Bytes(), &job)
assert.NoError(err)
assert.Equal(mockDeleteTaskJobModel, &job)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
Expand Down
10 changes: 7 additions & 3 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Job struct {
*internaljob.Job
Preheat
SyncPeers
ManagerTasks
}

// New returns a new Job.
Expand Down Expand Up @@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
return nil, err
}

managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout)

return &Job{
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
ManagerTasks: managerTasks,
}, nil
}

Expand Down
Loading
Loading