-
Notifications
You must be signed in to change notification settings - Fork 302
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add delete task and list tasks manager api with request type and service type. #3378
feat: add delete task and list tasks manager api with request type and service type. #3378
Conversation
105e622
to
a576067
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3378 +/- ##
==========================================
- Coverage 52.76% 52.54% -0.22%
==========================================
Files 193 194 +1
Lines 20485 20578 +93
==========================================
+ Hits 10808 10813 +5
- Misses 8873 8955 +82
- Partials 804 810 +6
Flags with carried forward coverage won't be shown. Click here to find out more.
|
38d3b15
to
987e070
Compare
Signed-off-by: Asklv <[email protected]>
…st tasks. Signed-off-by: Asklv <[email protected]>
Signed-off-by: Asklv <[email protected]>
987e070
to
df2ae9c
Compare
manager/job/manager_tasks.go
Outdated
|
||
logger.Infof("create manager tasks group %s in queues %v, tasks: %#v", group.GroupUUID, queues, tasks) | ||
if _, err := m.job.Server.SendGroupWithContext(ctx, group, 0); err != nil { | ||
logger.Errorf("create manager tasks group %s failed", group.GroupUUID, err) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove extra spaces in log.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, done.
manager/types/job.go
Outdated
|
||
type ListTasksArgs struct { | ||
TaskID string `json:"task_id" binding:"required"` | ||
Page int `json:"page" binding:"omitempty,gte=1"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Page
and PerPage
are required by many requests. We can define a common request argument to reuse, but that's not the focus of this PR. We can address it in a subsequent refactoring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, there is a page
and perpage
parameter in the task itself, but this is mainly for handling page
and perpage
in the structure returned within the task result, and I'll consider defining a generic version in subsequent implementations as well.
…ntation. Signed-off-by: Asklv <[email protected]>
Signed-off-by: Asklv <[email protected]>
77ac45c
to
7aee5fd
Compare
scheduler/job/job.go
Outdated
|
||
// listTasks is a job to list tasks. | ||
func (j *job) listTasks(ctx context.Context, data string) (string, error) { | ||
ctx, cancel := context.WithTimeout(ctx, listTasksTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't see any reference or usage for the ctx
in this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, this file should belong to another scheduler PR, which I have currently removed.
And I have fixed this ctx
usage in another PR.
|
||
// deleteTask is a job to delete task. | ||
func (j *job) deleteTask(ctx context.Context, data string) (string, error) { | ||
ctx, cancel := context.WithTimeout(ctx, deleteTaskTimeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok.
f4dd25c
to
dcbb0f7
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
572d87c
to
16cb224
Compare
…ntation. Signed-off-by: Asklv <[email protected]>
…ic implement. Signed-off-by: Asklv <[email protected]>
…er valid tasks. Signed-off-by: Asklv <[email protected]>
scheduler/job/job.go
Outdated
} | ||
|
||
// getValidPeers try to get valid peers by task id | ||
func (j *job) getValidPeers(taskID string) ([]*resource.Peer, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getFinishedPeers
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I have updated this function's name.
scheduler/job/job.go
Outdated
} | ||
|
||
// get peer info by task info | ||
peers := make([]*resource.Peer, 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add LoadFinishedPeers
to task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, moved to scheduler/resource/task.go::LoadFinishedPeers
.
scheduler/job/job.go
Outdated
|
||
// Create a wait group to limit delete rpc concurrency | ||
// and avoid too many rpc requests to the host. | ||
wg := sync.WaitGroup{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No concurrent deletion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, removed.
scheduler/job/job.go
Outdated
|
||
// Get dfdaemon client from host | ||
target := fmt.Sprintf("%s:%d", peer.Host.IP, peer.Host.Port) | ||
conn, err := grpc.DialContext( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use pkg/rpc
client.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I have added a GetV2ByAddr()
function in pkg/rpc/dfdaemon/client/client_v2.go
, refer to pkg/rpc/manager/client/client_v2.go::GetV2ByAddr()
Signed-off-by: Asklv <[email protected]>
323c9fb
to
2b2d3fd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
@IRONICBo Fix Lint. |
2b2d3fd
to
915d880
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Description
Add a POST method to start an asynchronous task to clear the cache of the task id on the peer, and a query api to query the status of the asynchronous task and aggregate it into the existing asynchronous task creation logic.
The manager eventually collects the information needed to start the job and creates a GroupJob into the machine to send a message to the distributed task waiting for the scheduler to consume it.
There is already a Job definition that can reuse the job request to support a new Job, define a new JobRequest, and the response reuses the Job response.
Meanwhile, this PR add a delete task job and list tasks job implementation to the scheduler's async job to receive incoming jobs from the manager and call grpc's interface to complete tasks queries and deletions.
Related Issue
Motivation and Context
This change introduces api definitions for delete task and list task and request responsetype definitions, as well as interfaces under the services package, and introduced internaljob configuration parameters and job calls to asynchronously list or delete peers and task information for associated tasks in the scheduler.
Screenshots (if appropriate)
Types of changes
Checklist