Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-32276: [C++][FlightRPC] Align RecordBatch buffers given to IPC #44279

Open
wants to merge 8 commits into
base: main
Choose a base branch
from

Conversation

EnricoMi
Copy link
Contributor

@EnricoMi EnricoMi commented Oct 1, 2024

Rationale for this change

Data retrieved via IPC is expected to provide memory-aligned arrays, but data retrieved via C++ Flight client is mis-aligned. Datafusion (Rust), which requires proper alignment, cannot handle such data: #43552.

What changes are included in this PR?

This aligns RecordBatch array buffers decoded by IPC if mis-aligned according to the data type byte width.
Implementation mirrors that of align_buffers in arrow-rs (apache/arrow-rs#4681).

Are these changes tested?

Configuration flag tested in unit test.
Manually end-to-end tested that memory alignment fixes issue with reproduction code provided in #43552.

Are there any user-facing changes?

Memory alignment is checked and fixed by default. This is configurable via IpcReadOptions.ensure_memory_alignment.

@EnricoMi EnricoMi changed the title GH-32276: [C++][Flight] Align RecordBatch buffers retrieved via IPC GH-32276: [C++][FlightRPC] Align RecordBatch buffers retrieved via IPC Oct 1, 2024
@github-actions github-actions bot added the awaiting review Awaiting review label Oct 1, 2024
@EnricoMi EnricoMi changed the title GH-32276: [C++][FlightRPC] Align RecordBatch buffers retrieved via IPC GH-32276: [C++][FlightRPC] Align RecordBatch buffers given to IPC Oct 1, 2024
@EnricoMi
Copy link
Contributor Author

EnricoMi commented Oct 2, 2024

@pitrou do you think this fix is viable?

@rok rok requested a review from lidavidm October 2, 2024 10:23
Copy link
Member

@lidavidm lidavidm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for picking this up. This seems reasonable to me at a quick glance.

@@ -23,6 +23,7 @@
#include <memory>
#include <utility>
#include <vector>
#include <arrow/util/range.h>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put this include with the rest of the Arrow includes (and use quotes to be consistent)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
}
// align children data recursively
for (unsigned int i=0; i<child_data.size(); i++) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could iterate with for (auto& child : child_data) and avoid the explicit index?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

much better!

@@ -548,11 +548,16 @@ def test_read_options():
options = pa.ipc.IpcReadOptions()
assert options.use_threads is True
assert options.ensure_native_endian is True
assert options.ensure_memory_alignment is True
assert options.ens is True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting review Awaiting review labels Oct 6, 2024
@EnricoMi EnricoMi force-pushed the flight-client-align-buffers branch from 77cc70a to a5d9e2d Compare October 7, 2024 10:57
@github-actions github-actions bot added Component: C++ Component: Python awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Oct 7, 2024
@EnricoMi
Copy link
Contributor Author

EnricoMi commented Oct 7, 2024

While attempting to write some unit tests I found there is util::EnsureAlignment:

Result<std::shared_ptr<ArrayData>> EnsureAlignment(std::shared_ptr<ArrayData> array_data,
int64_t alignment,
MemoryPool* memory_pool) {
if (!CheckAlignment(*array_data, alignment)) {
std::vector<std::shared_ptr<Buffer>> buffers = array_data->buffers;
Type::type type_id = GetTypeForBuffers(*array_data);
for (size_t i = 0; i < buffers.size(); ++i) {
if (buffers[i]) {
int64_t expected_alignment = alignment;
if (alignment == kValueAlignment) {
expected_alignment =
RequiredValueAlignmentForBuffer(type_id, static_cast<int>(i));
}
ARROW_ASSIGN_OR_RAISE(
buffers[i],
EnsureAlignment(std::move(buffers[i]), expected_alignment, memory_pool));
}
}
for (auto& it : array_data->child_data) {
ARROW_ASSIGN_OR_RAISE(it, EnsureAlignment(std::move(it), alignment, memory_pool));
}
if (array_data->type->id() == Type::DICTIONARY) {
ARROW_ASSIGN_OR_RAISE(
array_data->dictionary,
EnsureAlignment(std::move(array_data->dictionary), alignment, memory_pool));
}
auto new_array_data = ArrayData::Make(
array_data->type, array_data->length, std::move(buffers), array_data->child_data,
array_data->dictionary, array_data->GetNullCount(), array_data->offset);
return new_array_data;
} else {
return array_data;
}
}

I will try to reuse that method rather than re-implementing it. There is also test infrastructure for misaligned array data.

Copy link
Member

@lidavidm lidavidm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rebase? Tests appear to be failing

auto batch = RecordBatch::Make(std::move(filtered_schema), metadata->length(),
std::move(filtered_columns));
if (context.options.ensure_memory_alignment) {
return util::EnsureAlignment(batch, arrow::util::kValueAlignment, default_memory_pool());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the memory pool in context.options.memory_pool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Ideally we should use the buffer's memory manager rather than the default CPU manager:

static std::unique_ptr<PoolBuffer> MakeUnique(MemoryPool* pool, int64_t alignment) {
std::shared_ptr<MemoryManager> mm;
if (pool == nullptr) {
pool = default_memory_pool();
mm = default_cpu_memory_manager();
} else {
mm = CPUDevice::memory_manager(pool);
}
return std::make_unique<PoolBuffer>(std::move(mm), pool, alignment);
}

@github-actions github-actions bot added awaiting changes Awaiting changes awaiting review Awaiting review awaiting committer review Awaiting committer review and removed awaiting change review Awaiting change review awaiting review Awaiting review awaiting changes Awaiting changes labels Oct 14, 2024
@EnricoMi EnricoMi force-pushed the flight-client-align-buffers branch 2 times, most recently from 960cb21 to 9909f13 Compare October 22, 2024 08:55
@EnricoMi
Copy link
Contributor Author

EnricoMi commented Oct 22, 2024

Test arrow-ipc-read-write-test fails with a SIGSEGV in line 45 of

Type::type GetTypeForBuffers(const ArrayData& array) {
Type::type type_id = array.type->storage_id();
if (type_id == Type::DICTIONARY) {
return ::arrow::internal::checked_pointer_cast<DictionaryType>(array.type)
->index_type()
->id();
}
return type_id;
}

https://github.com/apache/arrow/actions/runs/11462607112/job/31894398411?pr=44279#step:13:1548

That test complains a lot about /home/enrico/Work/git/arrow/cpp/src/arrow/status.cc:152: Invalid: RequiredValueAlignmentForBuffer called with invalid type id 29 due to RequiredValueAlignmentForBuffer being called for DICTIONARY data type in line 62:

bool CheckSelfAlignment(const ArrayData& array, int64_t alignment) {
if (alignment == kValueAlignment) {
Type::type type_id = GetTypeForBuffers(array);
for (std::size_t i = 0; i < array.buffers.size(); i++) {
if (array.buffers[i]) {
int expected_alignment =
RequiredValueAlignmentForBuffer(type_id, static_cast<int>(i));
if (!CheckAlignment(*array.buffers[i], expected_alignment)) {
return false;
}
}
}
} else {
for (const auto& buffer : array.buffers) {
if (buffer) {
if (!CheckAlignment(*buffer, alignment)) return false;
}
}
}
return true;
}

Looks like util::EnsureAlignment and util::CheckAlignment is not quite ready for this use case.

@EnricoMi EnricoMi force-pushed the flight-client-align-buffers branch from f2dae5b to d1219d2 Compare October 22, 2024 15:50
@EnricoMi
Copy link
Contributor Author

@westonpace @sanjibansg
Any pointers in which situations array.type can be null (causing GetDtypeForBuffers to segfault)?
Any rational on why RequiredValueAlignmentForBuffer does not support DICTIONARY data type?
#44279 (comment)

@rok rok requested review from lidavidm and felipecrv November 7, 2024 11:53
@rok rok requested a review from mapleFU November 7, 2024 12:24
Copy link
Member

@lidavidm lidavidm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is reasonable and we just need to update EnsureAlignment to cover DICTIONARY

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting committer review Awaiting committer review labels Nov 14, 2024
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels Dec 3, 2024
@EnricoMi EnricoMi force-pushed the flight-client-align-buffers branch from cc4facf to a1ad4da Compare December 3, 2024 16:46
@EnricoMi
Copy link
Contributor Author

EnricoMi commented Dec 3, 2024

Problem was that tests define an ExtensionType with storage type DICTIONARY, which caused an invalid cast, returning an empty pointer that was dereferenced. That has been fixes in a1ad4da.

This means that user code that defines ExtensionTypes with other storage types than DICTIONARY may see unexpected behaviour. What would be a good default behaviour in GetTypeForBuffer? Return Type::UINT8 to not realign those unknown data types? Or how can we tell from an unknown ExtensionTypes what to return by GetTypeForBuffers?

@lidavidm
Copy link
Member

lidavidm commented Dec 6, 2024

I'm not sure I follow. An extension type should be treated the same as its storage type. I think the problem with the current code is that it blindly casts the array type to the storage type, not accounting for the fact that the point of the storage type is to allow the extension type.

@EnricoMi
Copy link
Contributor Author

EnricoMi commented Dec 6, 2024

Any non-dictionary type (including extension types) therefore should be covered with array.type->storage_id().
Any dictionary extension types are expected to provide a DictionaryType via array.type->storage_type(), so they are covered with array.type->storage_type()->index_type()->id().

Than that fix should be sound and safe.

Type::type GetTypeForBuffers(const ArrayData& array) {
Type::type type_id = array.type->storage_id();
if (type_id == Type::DICTIONARY) {
// return index type id, provided by the DictionaryType array.type or
// array.type->storage_type() if array.type is an ExtensionType
std::shared_ptr<DataType> dict_type = array.type;
if (array.type->id() == Type::EXTENSION) {
dict_type = checked_pointer_cast<ExtensionType>(array.type)->storage_type();
}
return checked_pointer_cast<DictionaryType>(dict_type)->index_type()->id();
}
return type_id;
}

@EnricoMi EnricoMi force-pushed the flight-client-align-buffers branch from eb25914 to 7a01029 Compare December 6, 2024 09:21
@EnricoMi EnricoMi force-pushed the flight-client-align-buffers branch from 7a01029 to 394e1ae Compare January 21, 2025 09:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants