Skip to content

Commit

Permalink
feat: add delete task and list tasks manager api with request type an…
Browse files Browse the repository at this point in the history
…d service type. (#3378)

Signed-off-by: Asklv <[email protected]>
  • Loading branch information
IRONICBo authored Aug 13, 2024
1 parent 53ba603 commit 5547307
Show file tree
Hide file tree
Showing 17 changed files with 674 additions and 11 deletions.
6 changes: 6 additions & 0 deletions internal/job/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ const (

// SyncPeersJob is the name of syncing peers job.
SyncPeersJob = "sync_peers"

// ListTasksJob is the name of listing tasks job.
ListTasksJob = "list_tasks"

// DeleteTasksJob is the name of deleting tasks job.
DeleteTaskJob = "delete_task"
)

// Machinery server configuration.
Expand Down
34 changes: 33 additions & 1 deletion internal/job/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,48 @@

package job

import "d7y.io/dragonfly/v2/scheduler/resource"

type PreheatRequest struct {
URL string `json:"url" validate:"required,url"`
Tag string `json:"tag" validate:"omitempty"`
Digest string `json:"digest" validate:"omitempty"`
FilteredQueryParams string `json:"filteredQueryParams" validate:"omitempty"`
FilteredQueryParams string `json:"filtered_query_params" validate:"omitempty"`
Headers map[string]string `json:"headers" validate:"omitempty"`
Application string `json:"application" validate:"omitempty"`
Priority int32 `json:"priority" validate:"omitempty"`
PieceLength uint32 `json:"pieceLength" validate:"omitempty"`
}

type PreheatResponse struct {
TaskID string `json:"taskID"`
}

// ListTasksRequest defines the request parameters for listing tasks.
type ListTasksRequest struct {
TaskID string `json:"task_id" validate:"required"`
}

// ListTasksResponse defines the response parameters for listing tasks.
type ListTasksResponse struct {
Peers []*resource.Peer `json:"peers"`
}

// DeleteTaskRequest defines the request parameters for deleting task.
type DeleteTaskRequest struct {
TaskID string `json:"task_id" validate:"required"`
}

// Task includes information about a task along with peer details and a description.
type Task struct {
Task *resource.Task `json:"task"`
Peer *resource.Peer `json:"peer"`
Description string `json:"description"`
}

// DeleteTaskResponse represents the response after attempting to delete tasks,
// categorizing them into successfully and unsuccessfully deleted.
type DeleteTaskResponse struct {
SuccessTasks []*Task `json:"success_tasks"`
FailureTasks []*Task `json:"failure_tasks"`
}
11 changes: 11 additions & 0 deletions manager/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ type JobConfig struct {

// Sync peers configuration.
SyncPeers SyncPeersConfig `yaml:"syncPeers" mapstructure:"syncPeers"`

// Manager tasks configuration.
ManagerTasks ManagerTasksConfig `yaml:"managerTasks" mapstructure:"managerTasks"`
}

type PreheatConfig struct {
Expand All @@ -315,6 +318,11 @@ type SyncPeersConfig struct {
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type ManagerTasksConfig struct {
// Timeout is the timeout for manager tasks information for the single scheduler.
Timeout time.Duration `yaml:"timeout" mapstructure:"timeout"`
}

type PreheatTLSClientConfig struct {
// CACert is the CA certificate for preheat tls handshake, it can be path or PEM format string.
CACert types.PEMContent `yaml:"caCert" mapstructure:"caCert"`
Expand Down Expand Up @@ -455,6 +463,9 @@ func New() *Config {
Interval: DefaultJobSyncPeersInterval,
Timeout: DefaultJobSyncPeersTimeout,
},
ManagerTasks: ManagerTasksConfig{
Timeout: DefaultJobManagerTasksTimeout,
},
},
ObjectStorage: ObjectStorageConfig{
Enable: false,
Expand Down
12 changes: 9 additions & 3 deletions manager/config/constant_otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,20 @@ package config
import "go.opentelemetry.io/otel/attribute"

const (
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeID = attribute.Key("d7y.manager.id")
AttributePreheatType = attribute.Key("d7y.manager.preheat.type")
AttributePreheatURL = attribute.Key("d7y.manager.preheat.url")
AttributeDeleteTaskID = attribute.Key("d7y.manager.delete_task.id")
AttributeListTasksID = attribute.Key("d7y.manager.list_tasks.id")
AttributeListTasksPage = attribute.Key("d7y.manager.list_tasks.page")
AttributeListTasksPerPage = attribute.Key("d7y.manager.list_tasks.per_page")
)

const (
SpanPreheat = "preheat"
SpanSyncPeers = "sync-peers"
SpanGetLayers = "get-layers"
SpanAuthWithRegistry = "auth-with-registry"
SpanDeleteTask = "delete-task"
SpanListTasks = "list-tasks"
)
3 changes: 3 additions & 0 deletions manager/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ const (

// DefaultJobSyncPeersTimeout is the default timeout for syncing all peers information from the scheduler.
DefaultJobSyncPeersTimeout = 10 * time.Minute

// DefaultJobManagerTasksTimeout is the default timeout for manager tasks, for delete task and list tasks.
DefaultJobManagerTasksTimeout = 10 * time.Minute
)

const (
Expand Down
28 changes: 28 additions & 0 deletions manager/handlers/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,34 @@ func (h *Handlers) CreateJob(ctx *gin.Context) {
return
}

ctx.JSON(http.StatusOK, job)
case job.DeleteTaskJob:
var json types.CreateDeleteTaskJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateDeleteTaskJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
case job.ListTasksJob:
var json types.CreateListTasksJobRequest
if err := ctx.ShouldBindBodyWith(&json, binding.JSON); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}

job, err := h.service.CreateListTasksJob(ctx.Request.Context(), json)
if err != nil {
ctx.Error(err) // nolint: errcheck
return
}

ctx.JSON(http.StatusOK, job)
default:
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": "Unknow type"})
Expand Down
66 changes: 66 additions & 0 deletions manager/handlers/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ var (
"user_id": 4,
"bio": "bio"
}`
mockListTasksJobReqBody = `
{
"type": "list_tasks",
"user_id": 4,
"bio": "bio"
}`
mockDeleteTaskJobReqBody = `
{
"type": "delete_task",
"user_id": 4,
"bio": "bio"
}`
mockOtherJobReqBody = `
{
"type": "others",
Expand All @@ -50,6 +62,16 @@ var (
Type: "preheat",
BIO: "bio",
}
mockListTasksCreateJobRequest = types.CreateListTasksJobRequest{
UserID: 4,
Type: "list_tasks",
BIO: "bio",
}
mockDeleteTaskCreateJobRequest = types.CreateDeleteTaskJobRequest{
UserID: 4,
Type: "delete_task",
BIO: "bio",
}
mockUpdateJobRequest = types.UpdateJobRequest{
UserID: 4,
BIO: "bio",
Expand All @@ -61,6 +83,20 @@ var (
BIO: "bio",
TaskID: "2",
}
mockListTasksJobModel = &models.Job{
BaseModel: mockBaseModel,
UserID: 4,
Type: "list_tasks",
BIO: "bio",
TaskID: "2",
}
mockDeleteTaskJobModel = &models.Job{
BaseModel: mockBaseModel,
UserID: 4,
Type: "delete_task",
BIO: "bio",
TaskID: "2",
}
)

func mockJobRouter(h *Handlers) *gin.Engine {
Expand Down Expand Up @@ -115,6 +151,36 @@ func TestHandlers_CreateJob(t *testing.T) {
assert.Equal(mockPreheatJobModel, &job)
},
},
{
name: "success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockListTasksJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateListTasksJob(gomock.Any(), gomock.Eq(mockListTasksCreateJobRequest)).Return(mockListTasksJobModel, nil).Times(1)
},
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
assert := assert.New(t)
assert.Equal(http.StatusOK, w.Code)
job := models.Job{}
err := json.Unmarshal(w.Body.Bytes(), &job)
assert.NoError(err)
assert.Equal(mockListTasksJobModel, &job)
},
},
{
name: "success",
req: httptest.NewRequest(http.MethodPost, "/oapi/v1/jobs", strings.NewReader(mockDeleteTaskJobReqBody)),
mock: func(ms *mocks.MockServiceMockRecorder) {
ms.CreateDeleteTaskJob(gomock.Any(), gomock.Eq(mockDeleteTaskCreateJobRequest)).Return(mockDeleteTaskJobModel, nil).Times(1)
},
expect: func(t *testing.T, w *httptest.ResponseRecorder) {
assert := assert.New(t)
assert.Equal(http.StatusOK, w.Code)
job := models.Job{}
err := json.Unmarshal(w.Body.Bytes(), &job)
assert.NoError(err)
assert.Equal(mockDeleteTaskJobModel, &job)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
Expand Down
10 changes: 7 additions & 3 deletions manager/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Job struct {
*internaljob.Job
Preheat
SyncPeers
ManagerTasks
}

// New returns a new Job.
Expand Down Expand Up @@ -74,10 +75,13 @@ func New(cfg *config.Config, gdb *gorm.DB) (*Job, error) {
return nil, err
}

managerTasks := newManagerTasks(j, cfg.Job.ManagerTasks.Timeout)

return &Job{
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
Job: j,
Preheat: preheat,
SyncPeers: syncPeers,
ManagerTasks: managerTasks,
}, nil
}

Expand Down
Loading

0 comments on commit 5547307

Please sign in to comment.