GH-45266: [C++][Acero] Fix the running tasks count of Scheduler when get error tasks in multi-threads #45268
base: main
@@ -278,8 +278,8 @@ Status TaskSchedulerImpl::ExecuteMore(size_t thread_id, int num_tasks_to_execute
       bool task_group_finished = false;
       Status status = ExecuteTask(thread_id, group_id, task_id, &task_group_finished);
       if (!status.ok()) {
-        // Mark the remaining picked tasks as finished
-        for (size_t j = i + 1; j < tasks.size(); ++j) {
+        // Mark the current and remaining picked tasks as finished
+        for (size_t j = i; j < tasks.size(); ++j) {
           if (PostExecuteTask(thread_id, tasks[j].first)) {
             bool all_task_groups_finished = false;
             RETURN_NOT_OK(
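The effect of starting the loop at `i` instead of `i + 1` can be illustrated with a toy model. This is a sketch, not Arrow code: `ToyGroup`, `RunBatch`, and the `mark_current` flag are hypothetical names, and it assumes (as the hunk suggests) that a failing task is not marked finished by its own execution path, so the error branch has to account for it.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical bookkeeping record; it only loosely mirrors a scheduler's task group.
struct ToyGroup {
  int num_running = 0;  // tasks picked for execution but not yet marked finished
};

// Executes `tasks` in order and stops at the first failure (a task returning false).
// On failure, the error branch marks tasks as finished starting at index i when
// `mark_current` is true, or at i + 1 when it is false.
// Returns how many tasks are still counted as running afterwards.
int RunBatch(const std::vector<std::function<bool()>>& tasks, bool mark_current) {
  ToyGroup group;
  group.num_running = static_cast<int>(tasks.size());  // every picked task counts as running

  for (std::size_t i = 0; i < tasks.size(); ++i) {
    if (tasks[i]()) {
      --group.num_running;  // assumption: a successful task marks itself finished
      continue;
    }
    for (std::size_t j = mark_current ? i : i + 1; j < tasks.size(); ++j) {
      --group.num_running;  // mark task j as finished without running it
    }
    break;
  }

  assert(group.num_running >= 0);
  return group.num_running;
}
```

With three picked tasks where the second one fails, `RunBatch(tasks, false)` leaves one task counted as running forever, while `RunBatch(tasks, true)` brings the count back to zero.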
@@ -369,17 +369,25 @@ Status TaskSchedulerImpl::ScheduleMore(size_t thread_id, int num_tasks_finished)
     int group_id = tasks[i].first;
     int64_t task_id = tasks[i].second;
     RETURN_NOT_OK(schedule_impl_([this, group_id, task_id](size_t thread_id) -> Status {
-      RETURN_NOT_OK(ScheduleMore(thread_id, 1));
-
       bool task_group_finished = false;
-      RETURN_NOT_OK(ExecuteTask(thread_id, group_id, task_id, &task_group_finished));
+      // PostExecuteTask must be called later if any error occurs during task execution
+      // (including ScheduleMore), so we preserve the status.
+      auto status = [&]() {
+        RETURN_NOT_OK(ScheduleMore(thread_id, 1));
+        return ExecuteTask(thread_id, group_id, task_id, &task_group_finished);
+      }();
+
+      if (!status.ok()) {
+        task_group_finished = PostExecuteTask(thread_id, group_id);
+      }

       if (task_group_finished) {
         bool all_task_groups_finished = false;
-        return OnTaskGroupFinished(thread_id, group_id, &all_task_groups_finished);
+        RETURN_NOT_OK(
+            OnTaskGroupFinished(thread_id, group_id, &all_task_groups_finished));
Comment on lines +373 to +387

I made some refinements on top of your original change, PTAL.

Actually, I am not sure which status should be returned when we get two errors: the current task's and OnTaskGroupFinished's. I think we can just return the first error status (the current task always fails before OnTaskGroupFinished).

Yeah, I don't think it really matters which error we return, so I chose the one that simplifies the code the most :)

       }

-      return Status::OK();
+      return status;
     }));
   }
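The status-preserving pattern in this hunk can be sketched in isolation. The code below is a minimal illustration, not Arrow's implementation: `ToyStatus` and `RunPreservingStatus` are hypothetical names, and the `if (!s.ok()) return s;` line stands in for an early-return macro such as `RETURN_NOT_OK`. As in the hunk, the fallible steps run inside an immediately invoked lambda so that an early return only leaves the lambda, and the follow-up step still runs on failure. A failing follow-up step replaces the captured error, which matches the simplification discussed in the review thread.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Minimal hypothetical status type; arrow::Status is richer than this.
struct ToyStatus {
  std::string message;  // empty string means success
  bool ok() const { return message.empty(); }
  static ToyStatus OK() { return ToyStatus{}; }
  static ToyStatus Error(std::string msg) { return ToyStatus{std::move(msg)}; }
};

// Runs `schedule` then `execute`, capturing the first failure instead of
// returning early. If either failed, `on_failure` still runs. A failing
// `on_failure` replaces the captured error; otherwise the captured status
// is returned.
template <typename Schedule, typename Execute, typename OnFailure>
ToyStatus RunPreservingStatus(Schedule schedule, Execute execute, OnFailure on_failure) {
  ToyStatus status = [&]() -> ToyStatus {
    ToyStatus s = schedule();
    if (!s.ok()) return s;  // stands in for an early-return macro
    return execute();
  }();

  if (!status.ok()) {
    ToyStatus cleanup = on_failure();
    if (!cleanup.ok()) return cleanup;
  }
  return status;
}
```

Calling `RunPreservingStatus` with a failing `execute` and a succeeding `on_failure` returns the execution error after the follow-up step has run, which is the behavior the early `RETURN_NOT_OK` in the old code skipped.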
@@ -413,6 +421,8 @@ void TaskSchedulerImpl::Abort(AbortContinuationImpl impl) {
             all_finished = false;
             task_group.state_ = TaskGroupState::ALL_TASKS_STARTED;
           }
+        } else if (task_group.state_ == TaskGroupState::ALL_TASKS_STARTED) {

This is needed for allowing the [...] I suppose the [...]

+          all_finished = false;
         }
       }
     }
This is needed for the serial execution path to correctly set the task count.
So PostExecuteTask needs to be called for a task even if it failed?
Yeah this is pretty much the whole point of this change.