Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-45266: [C++][Acero] Fix the running tasks count of Scheduler when get error tasks in multi-threads #45268

Open
wants to merge 4 commits into
base: main
Choose a base branch
from

Conversation

wuzhoupei
Copy link

@wuzhoupei wuzhoupei commented Jan 15, 2025

Rationale for this change

When the TaskGroup should be canceled, it will move the number which not-start to finished to avoid do them(in TaskSchedulerImpl::Abort). But this is one operation that happens in multi-threads. At the same time, maybe some task start to running and happen some error. Then they will return the bad status.
But the tasks are running for Scheduler, they will just return bad status and not change the running_task count. Because the code uses RETURN_NOT_OK.

What changes are included in this PR?

For any task, what status weather it returns, it will change the running_count before return.

Are these changes tested?

No. It is too hard to build ut.

Are there any user-facing changes?

No. But I am very shocked at hasn't this happened to anyone?

@wuzhoupei wuzhoupei requested a review from westonpace as a code owner January 15, 2025 09:20
Copy link

⚠️ GitHub issue #45266 has been automatically assigned in GitHub to PR creator.

@mapleFU
Copy link
Member

mapleFU commented Jan 15, 2025

Great! Also would it easy to add a test for this case?

@wuzhoupei
Copy link
Author

Great! Also would it easy to add a test for this case?

Actually, I think it is hard to add some tests for this case. Because It is a multi-threads case. If we wanna add a test about it, we need to make sure the Abort-operation's happen time and the task which in this time should be return error-status.

@zanmato1984
Copy link
Contributor

I'm thinking this might not be very necessary for the following two reasons:

  1. I think in the current acero design, once an error occurs in a task, this error will be propagated to the upmost ExecPlan, which will in turn call the Abort method of task scheduler, which will finish all the running tasks regardless of an individual task group's finished count being arguably "wrong".
  2. Even if we think the task group's finished count as "wrong" (once errors occur in a task group), and we want to make it "right" so that the task group will finish in a graceful way (it doesn't depend on the Abort), we have a problem of invoking the continuation. Generally a continuation of a task group shouldn't be called if some tasks of the group meet error - this is the case for current implementation because the task group is not considered "finished" (thanks to the "wrong" finished count). But this will change if we make the finished count "right" - the continuation will be (undesirably) invoked.

What do you think? @wuzhoupei

@wuzhoupei
Copy link
Author

I agree some of you. @zanmato1984
But I still think we need to change the task count when task get an error status. Maybe my word in old comment is not exact. The count is the num of running-tasks which I use the 'finished-tasks' in old comment.
So this case can be said like that:

  1. when the scheduler calling the Abort, if some tasks have started and not finish, this TaskGroup's status will be set as TaskGroupState::ALL_TASKS_STARTED.
  2. if the running-tasks at above get error, they will not change the num of running-tasks.
  3. but the must condition for scheduler calls the AbortFunc is the num of running-tasks is 0.
  4. so this case will let the AbortFunc can not be called, and then the ExecPlan will can not call the StopProducingImpl forever(StopProducingImpl will be called in AbortFunc).
    So I think we should change the count which belongs to scheduler whatever status the task finished with.

@zanmato1984
Copy link
Contributor

Hi @wuzhoupei , thank you for the further explanation. Yes you are right, the problem of abort continuation not being called exists if the task count is not correct. And by looking at the code, the task group continuation being called after task meets error seems to be by design allowed - it will first examine the somewhat internal error state to exist early.

I think we can proceed with this PR, I'll put my review comment on the code. Meanwhile, it will be very helpful to have a test cast about this. I can help if you have trouble on that.

@wuzhoupei
Copy link
Author

Thanks the review from @zanmato1984.
I will be very grateful if you have a good idea to test it and can add the test for this case.

@zanmato1984
Copy link
Contributor

Hi @wuzhoupei , I've committed two test cases that can stably reproduce the issue of abort continuation not being invoked after aborting. Along with them are some more fixes necessary. Will you take a look? Thanks.

Comment on lines +281 to +282
// Mark the current and remaining picked tasks as finished
for (size_t j = i; j < tasks.size(); ++j) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for the serial execution path to correctly set the task count.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So PostExecuteTask need execute the task even if failed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah this is pretty much the whole point of this change.

@@ -413,6 +421,8 @@ void TaskSchedulerImpl::Abort(AbortContinuationImpl impl) {
all_finished = false;
task_group.state_ = TaskGroupState::ALL_TASKS_STARTED;
}
} else if (task_group.state_ == TaskGroupState::ALL_TASKS_STARTED) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed for allowing the Abort to be called in a task itself, otherwise the abort continuation will be called twice.

I suppose the Abort function is not necessarily to be called inside a task body to trigger the issue. In other words, it could happen when Abort is called in a timing that is subtle enough.

Comment on lines +373 to +387
// PostExecuteTask must be called later if any error ocurres during task execution
// (including ScheduleMore), so we preserve the status.
auto status = [&]() {
RETURN_NOT_OK(ScheduleMore(thread_id, 1));
return ExecuteTask(thread_id, group_id, task_id, &task_group_finished);
}();

if (!status.ok()) {
task_group_finished = PostExecuteTask(thread_id, group_id);
}

if (task_group_finished) {
bool all_task_groups_finished = false;
return OnTaskGroupFinished(thread_id, group_id, &all_task_groups_finished);
RETURN_NOT_OK(
OnTaskGroupFinished(thread_id, group_id, &all_task_groups_finished));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made some refinement on top of your original change, PTAL.

@github-actions github-actions bot added awaiting committer review Awaiting committer review and removed awaiting review Awaiting review labels Jan 22, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants