GH-45371: [C++] Fix data race in SimpleRecordBatch::columns
#45372
base: main
cpp/src/arrow/record_batch_test.cc
Outdated
auto schema = ::arrow::schema({field("f1", utf8())});
auto record_batch = RecordBatch::Make(schema, length, {array_data});
std::atomic_bool start_flag{false};
std::thread t([record_batch, &start_flag]() {
Should more than one thread be tested here?
Yes, this should use several threads that would do the same thing concurrently.
In the current test the race is between t and the main thread. Only 2 threads are necessary to produce the data race.
std::thread t([record_batch, &start_flag]() {
  start_flag.store(true);
  auto columns = record_batch->columns();
  ASSERT_EQ(columns.size(), 1);
Why this test? boxed_columns_ is presized in the constructor, so this should always succeed.
Yes, this assertion should always pass. Its purpose is to ensure that the call to columns() is not optimized out. The test will either produce a TSAN warning or not, but there is no assertion that will fail consistently under the data race.
random::RandomArrayGenerator gen(42);
std::shared_ptr<ArrayData> array_data = gen.ArrayOf(utf8(), length)->data();
auto schema = ::arrow::schema({field("f1", utf8())});
There could also be several fields and the worker thread would call column(i) several times with i being a random number. Something like (untested):
constexpr int kNumFields = 40;
constexpr int kNumThreads = 50;
auto schema = ::arrow::schema(FieldVector(kNumFields, field("f1", utf8())));
auto batch = RecordBatch::Make(schema, length, ArrayDataVector(kNumFields, array_data));
std::random_device rd;
std::vector<std::thread> threads(kNumThreads);
for (auto& thread : threads) {
  const auto seed = rd();
  // Capture seed by value: it goes out of scope at the end of each iteration.
  thread = std::thread([&, seed]() {
    std::default_random_engine rng(seed);
    std::uniform_int_distribution<int> field_dist(0, kNumFields - 1);
    for (int i = 0; i < kNumFields; ++i) {
      ASSERT_NE(nullptr, batch->column(field_dist(rng)));
    }
  });
}
for (auto& thread : threads) {
  thread.join();
}
The data race only appears when columns() is called because it allows non-atomic reads to boxed_columns_. The column(i) function alone is thread safe because it only uses atomics. It will also never return nullptr.
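To make the claim about column(i) concrete, here is a minimal sketch using a hypothetical stand-in class, LazyBatch, rather than Arrow's actual SimpleRecordBatch. It assumes each slot is boxed lazily and touched only through the std::shared_ptr atomic free functions (available in C++11 through C++23, deprecated since C++20). All names below are illustrative.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <random>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a record batch that boxes its columns lazily.
class LazyBatch {
 public:
  explicit LazyBatch(std::vector<std::string> raw)
      : raw_(std::move(raw)), boxed_(raw_.size()) {}  // boxed_ is presized

  int num_columns() const { return static_cast<int>(raw_.size()); }

  // Every access to boxed_[i] here is an atomic load or an atomic store, so
  // concurrent callers never race with each other. Two threads may each box
  // the same column and store it, but the result is never null.
  std::shared_ptr<std::string> column(int i) const {
    assert(i >= 0 && i < num_columns());
    std::shared_ptr<std::string> result = std::atomic_load(&boxed_[i]);
    if (!result) {
      result = std::make_shared<std::string>(raw_[i]);
      std::atomic_store(&boxed_[i], result);
    }
    return result;
  }

 private:
  std::vector<std::string> raw_;
  mutable std::vector<std::shared_ptr<std::string>> boxed_;
};

// Calls column(i) with random i from several threads and reports whether
// every call returned a non-null pointer.
bool ConcurrentColumnCallsReturnNonNull(int num_fields, int num_threads) {
  LazyBatch batch(std::vector<std::string>(
      static_cast<std::size_t>(num_fields), std::string("value")));
  std::atomic<bool> all_non_null{true};
  std::vector<std::thread> threads;
  for (int t = 0; t < num_threads; ++t) {
    threads.emplace_back([&batch, &all_non_null, num_fields, t]() {
      std::mt19937 rng(static_cast<unsigned>(t));
      std::uniform_int_distribution<int> field_dist(0, num_fields - 1);
      for (int k = 0; k < num_fields; ++k) {
        if (batch.column(field_dist(rng)) == nullptr) all_non_null.store(false);
      }
    });
  }
  for (auto& thread : threads) thread.join();
  return all_non_null.load();
}
```

Calling ConcurrentColumnCallsReturnNonNull(8, 4) exercises only the atomic path. The sketch deliberately has no accessor that hands out a reference to boxed_, since that is where plain reads could overlap with a late atomic_store.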
Force-pushed from 87216a6 to 41e23d7.
Thanks for taking the time to review my PR! Let me just clarify the intent behind my test case. It's a minimal example that produces a data race detected by TSAN. The data race only needs to occur between 2 threads; in this case I use the main thread and t.
Now, we simultaneously have T1 reading
There isn't an assertion that will catch this case because it is impossible for
Test output
Running the test again with the proposed fix shows no data race.
Not sure if it makes sense to have a test that only works under TSAN, but I don't think there is any way to surface the bug consistently without tooling.
Rationale for this change
GH-45371
What changes are included in this PR?
Use std::atomic_compare_exchange to initialize boxed_columns_[i] so that each element is written only once. This means that a reference to boxed_columns_ is safe to read after each element has been initialized.
Are these changes tested?
Yes, there is a test case, TestRecordBatch.ColumnsThreadSafety, which passes under TSAN.
Are there any user-facing changes?
No
This PR contains a "Critical Fix". Without this fix, concurrent calls to SimpleRecordBatch::columns could lead to an invalid memory access and crash.
SimpleRecordBatch::columns #45371
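The write-once initialization described under "What changes are included in this PR?" can be sketched roughly as follows. This is not the PR's actual diff: WriteOnceBatch and TwoThreadsReadColumns are hypothetical names, and the sketch assumes the std::shared_ptr overload std::atomic_compare_exchange_strong (deprecated since C++20). The idea is that a slot only ever changes from empty to filled, so once columns() has returned, no thread writes to any slot again and plain reads through the returned reference are safe.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in; not Arrow's SimpleRecordBatch.
class WriteOnceBatch {
 public:
  explicit WriteOnceBatch(std::vector<std::string> raw)
      : raw_(std::move(raw)), boxed_(raw_.size()) {}

  int num_columns() const { return static_cast<int>(raw_.size()); }

  std::shared_ptr<std::string> column(int i) const {
    assert(i >= 0 && i < num_columns());
    std::shared_ptr<std::string> result = std::atomic_load(&boxed_[i]);
    if (!result) {
      auto fresh = std::make_shared<std::string>(raw_[i]);
      // Publish `fresh` only if the slot is still empty. On failure the slot
      // is left unmodified and `result` receives the already published value.
      if (std::atomic_compare_exchange_strong(&boxed_[i], &result, fresh)) {
        result = std::move(fresh);
      }
    }
    return result;
  }

  // Boxes every column, then exposes the vector for plain reads.
  const std::vector<std::shared_ptr<std::string>>& columns() const {
    for (int i = 0; i < num_columns(); ++i) column(i);
    return boxed_;
  }

 private:
  std::vector<std::string> raw_;
  mutable std::vector<std::shared_ptr<std::string>> boxed_;
};

// Mirrors the shape of the two-thread test: a worker and the main thread
// both call columns() at roughly the same time.
bool TwoThreadsReadColumns() {
  WriteOnceBatch batch(std::vector<std::string>{"a", "b", "c"});
  std::atomic<bool> start_flag{false};
  std::size_t worker_size = 0;
  std::thread t([&]() {
    start_flag.store(true);
    worker_size = batch.columns().size();
  });
  while (!start_flag.load()) {
    // Spin until the worker has started.
  }
  const auto& cols = batch.columns();
  bool ok = cols.size() == 3;
  for (const auto& col : cols) ok = ok && (col != nullptr);  // plain reads
  t.join();  // worker_size is read only after the join
  return ok && worker_size == 3;
}
```

As noted in the conversation, the size and null checks pass with or without the fix; only a tool such as TSAN can tell the two versions apart.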