Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-15233: [C++] Fix CopyFiles when destination is a FileSystem with background_writes #44897

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions cpp/src/arrow/filesystem/azurefs_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,30 @@ class TestGeneric : public ::testing::Test, public GenericFileSystemTest {
// builddir/main/../../threads.c:580:10 #2 0x7fa914b1cd1e in xmlGetGlobalState
// builddir/main/../../threads.c:666:31
bool have_false_positive_memory_leak_with_generator() const override { return true; }
// This false positive leak is similar to the one pinpointed in the
// have_false_positive_memory_leak_with_generator() comments above,
// though the stack trace is different. It happens when a block list
// is committed from a background thread.
//
// clang-format off
// Direct leak of 968 byte(s) in 1 object(s) allocated from:
// #0 calloc
// #1 (/lib/x86_64-linux-gnu/libxml2.so.2+0xe25a4)
// #2 __xmlDefaultBufferSize
// #3 xmlBufferCreate
// #4 Azure::Storage::_internal::XmlWriter::XmlWriter()
// #5 Azure::Storage::Blobs::_detail::BlockBlobClient::CommitBlockList
// #6 Azure::Storage::Blobs::BlockBlobClient::CommitBlockList
// #7 arrow::fs::(anonymous namespace)::CommitBlockList
// #8 arrow::fs::(anonymous namespace)::ObjectAppendStream::FlushAsync()::'lambda'
// clang-format on
//
// TODO perhaps remove this skip once we can rely on
// https://github.com/Azure/azure-sdk-for-cpp/pull/5767
//
// Also note that ClickHouse has a workaround for a similar issue:
// https://github.com/ClickHouse/ClickHouse/pull/45796
bool have_false_positive_memory_leak_with_async_close() const override { return true; }

BaseAzureEnv* env_;
std::shared_ptr<AzureFileSystem> azure_fs_;
Expand Down Expand Up @@ -1536,29 +1560,7 @@ class TestAzureFileSystem : public ::testing::Test {

void TestOpenOutputStreamCloseAsync() {
#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
// This false positive leak is similar to the one pinpointed in the
// have_false_positive_memory_leak_with_generator() comments above,
// though the stack trace is different. It happens when a block list
// is committed from a background thread.
//
// clang-format off
// Direct leak of 968 byte(s) in 1 object(s) allocated from:
// #0 calloc
// #1 (/lib/x86_64-linux-gnu/libxml2.so.2+0xe25a4)
// #2 __xmlDefaultBufferSize
// #3 xmlBufferCreate
// #4 Azure::Storage::_internal::XmlWriter::XmlWriter()
// #5 Azure::Storage::Blobs::_detail::BlockBlobClient::CommitBlockList
// #6 Azure::Storage::Blobs::BlockBlobClient::CommitBlockList
// #7 arrow::fs::(anonymous namespace)::CommitBlockList
// #8 arrow::fs::(anonymous namespace)::ObjectAppendStream::FlushAsync()::'lambda'
// clang-format on
//
// TODO perhaps remove this skip once we can rely on
// https://github.com/Azure/azure-sdk-for-cpp/pull/5767
//
// Also note that ClickHouse has a workaround for a similar issue:
// https://github.com/ClickHouse/ClickHouse/pull/45796
// See comment about have_false_positive_memory_leak_with_async_close above.
if (options_.background_writes) {
GTEST_SKIP() << "False positive memory leak in libxml2 with CloseAsync";
}
Expand Down
36 changes: 29 additions & 7 deletions cpp/src/arrow/filesystem/filesystem.cc
Original file line number Diff line number Diff line change
Expand Up @@ -630,9 +630,12 @@ Status CopyFiles(const std::vector<FileLocator>& sources,
destinations.size(), " paths.");
}

auto copy_one_file = [&](int i) {
if (sources[i].filesystem->Equals(destinations[i].filesystem)) {
return sources[i].filesystem->CopyFile(sources[i].path, destinations[i].path);
auto copy_one_file = [&](size_t i,
const FileLocator& source_file_locator) -> Result<Future<>> {
if (source_file_locator.filesystem->Equals(destinations[i].filesystem)) {
RETURN_NOT_OK(source_file_locator.filesystem->CopyFile(source_file_locator.path,
destinations[i].path));
return Future<>::MakeFinished();
}

ARROW_ASSIGN_OR_RAISE(auto source,
Expand All @@ -642,12 +645,31 @@ Status CopyFiles(const std::vector<FileLocator>& sources,
ARROW_ASSIGN_OR_RAISE(auto destination, destinations[i].filesystem->OpenOutputStream(
destinations[i].path, metadata));
RETURN_NOT_OK(internal::CopyStream(source, destination, chunk_size, io_context));
return destination->Close();
// Using the blocking Close() here can cause reduced performance and deadlocks because
// FileSystem implementations that implement background_writes need to queue and wait
// for other IO thread(s). There is a risk that most or all the threads in the IO
// thread pool are blocking on a call to Close(), leaving no IO threads left to actually
// fulfil the background writes.
return destination->CloseAsync();
};

return ::arrow::internal::OptionalParallelFor(
use_threads, static_cast<int>(sources.size()), std::move(copy_one_file),
io_context.executor());
// Spawn copy_one_file less urgently than default, so that background_writes are done
// with higher priority. Otherwise copy_one_file will keep buffering more data in memory
// without giving the background_writes any chance to upload the data and drop it from
// memory. Without this, large copies would cause OOMs.
Tom-Newton marked this conversation as resolved.
Show resolved Hide resolved
TaskHints hints{10};
auto future = ::arrow::internal::OptionalParallelForAsync(
use_threads, sources, std::move(copy_one_file), io_context.executor(), hints);

// Wait for all the copy_one_file instances to complete.
ARROW_ASSIGN_OR_RAISE(auto copy_close_async_future, future.result());

// Wait for all the futures returned by copy_one_file to complete. When the destination
// filesystem uses background_writes this is when most of the upload happens.
for (const auto& result : copy_close_async_future) {
result.Wait();
}
return Status::OK();
}

Status CopyFiles(const std::shared_ptr<FileSystem>& source_fs,
Expand Down
62 changes: 62 additions & 0 deletions cpp/src/arrow/filesystem/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,67 @@ void GenericFileSystemTest::TestCopyFile(FileSystem* fs) {
AssertAllFiles(fs, {"AB/abc", "EF/ghi", "def"});
}

// Exercises CopyFiles() against `fs` in three directions:
//   1. from a MockFileSystem into `fs`,
//   2. from `fs` into `fs` (same-filesystem copy),
//   3. from `fs` back into the MockFileSystem.
// The IO thread pool is temporarily shrunk to 2 threads, i.e. fewer threads
// than files being copied, which is the condition needed to catch GH-15233.
void GenericFileSystemTest::TestCopyFiles(FileSystem* fs) {
#if defined(ADDRESS_SANITIZER) || defined(ARROW_VALGRIND)
  if (have_false_positive_memory_leak_with_async_close()) {
    GTEST_SKIP() << "Filesystem has false positive memory leak with async close";
  }
#endif
  auto io_thread_pool =
      static_cast<arrow::internal::ThreadPool*>(fs->io_context().executor());
  auto original_threads = io_thread_pool->GetCapacity();
  // Needs to be smaller than the number of files we test with to catch GH-15233
  ASSERT_OK(io_thread_pool->SetCapacity(2));
  // Ensure the thread pool capacity is set back to the original value after the test.
  // std::unique_ptr only invokes its deleter when the held pointer is non-null, so
  // the guard must hold a non-null pointer (the pool itself) for the reset to run.
  // The deleter does not free anything; it only restores the capacity.
  auto reset_thread_pool = [io_thread_pool, original_threads](void*) {
    ASSERT_OK(io_thread_pool->SetCapacity(original_threads));
  };
  std::unique_ptr<void, decltype(reset_thread_pool)> reset_thread_pool_guard(
      io_thread_pool, reset_thread_pool);

  auto mock_fs = std::make_shared<arrow::fs::internal::MockFileSystem>(
      std::chrono::system_clock::now());
  std::vector<std::string> dirs0{"0", "0/AB", "0/AB/CD"};
  std::map<std::string, std::string> files0{
      {"0/123", "123 data"}, {"0/AB/abc", "abc data"}, {"0/AB/CD/def", "def data"}};

  std::vector<std::string> dirs0and1{"0", "0/AB", "0/AB/CD", "1", "1/AB", "1/AB/CD"};
  std::map<std::string, std::string> files0and1{
      {"0/123", "123 data"}, {"0/AB/abc", "abc data"}, {"0/AB/CD/def", "def data"},
      {"1/123", "123 data"}, {"1/AB/abc", "abc data"}, {"1/AB/CD/def", "def data"}};

  // Populate the source tree "0/..." in the mock filesystem.
  ASSERT_OK(mock_fs->CreateDir("0/AB/CD"));
  for (const auto& kv : files0) {
    CreateFile(mock_fs.get(), kv.first, kv.second);
  }

  auto selector0 = arrow::fs::FileSelector{};
  selector0.base_dir = "0";
  selector0.recursive = true;

  // 1. mock filesystem -> filesystem under test
  ASSERT_OK(CopyFiles(mock_fs, selector0, fs->shared_from_this(), "0"));
  AssertAllDirs(fs, dirs0);
  for (const auto& kv : files0) {
    AssertFileContents(fs, kv.first, kv.second);
  }

  // 2. filesystem under test -> itself ("0/..." duplicated as "1/...")
  ASSERT_OK(CopyFiles(fs->shared_from_this(), selector0, fs->shared_from_this(), "1"));
  AssertAllDirs(fs, dirs0and1);
  for (const auto& kv : files0and1) {
    AssertFileContents(fs, kv.first, kv.second);
  }

  auto selector1 = arrow::fs::FileSelector{};
  selector1.base_dir = "1";
  selector1.recursive = true;

  // 3. filesystem under test -> mock filesystem
  ASSERT_OK(CopyFiles(fs->shared_from_this(), selector1, mock_fs, "1"));
  AssertAllDirs(mock_fs.get(), dirs0and1);
  for (const auto& kv : files0and1) {
    AssertFileContents(mock_fs.get(), kv.first, kv.second);
  }
}

void GenericFileSystemTest::TestGetFileInfo(FileSystem* fs) {
ASSERT_OK(fs->CreateDir("AB/CD/EF"));
CreateFile(fs, "AB/CD/ghi", "some data");
Expand Down Expand Up @@ -1212,6 +1273,7 @@ GENERIC_FS_TEST_DEFINE(TestDeleteFiles)
GENERIC_FS_TEST_DEFINE(TestMoveFile)
GENERIC_FS_TEST_DEFINE(TestMoveDir)
GENERIC_FS_TEST_DEFINE(TestCopyFile)
GENERIC_FS_TEST_DEFINE(TestCopyFiles)
GENERIC_FS_TEST_DEFINE(TestGetFileInfo)
GENERIC_FS_TEST_DEFINE(TestGetFileInfoVector)
GENERIC_FS_TEST_DEFINE(TestGetFileInfoSelector)
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/filesystem/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest {
void TestMoveFile();
void TestMoveDir();
void TestCopyFile();
void TestCopyFiles();
void TestGetFileInfo();
void TestGetFileInfoVector();
void TestGetFileInfoSelector();
Expand Down Expand Up @@ -189,6 +190,8 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest {
virtual bool have_file_metadata() const { return false; }
// - Whether the filesystem has a false positive memory leak with generator
virtual bool have_false_positive_memory_leak_with_generator() const { return false; }
// - Whether the filesystem has a false positive memory leak in async close
virtual bool have_false_positive_memory_leak_with_async_close() const { return false; }

void TestEmpty(FileSystem* fs);
void TestNormalizePath(FileSystem* fs);
Expand All @@ -201,6 +204,7 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest {
void TestMoveFile(FileSystem* fs);
void TestMoveDir(FileSystem* fs);
void TestCopyFile(FileSystem* fs);
void TestCopyFiles(FileSystem* fs);
void TestGetFileInfo(FileSystem* fs);
void TestGetFileInfoVector(FileSystem* fs);
void TestGetFileInfoSelector(FileSystem* fs);
Expand Down Expand Up @@ -233,6 +237,7 @@ class ARROW_TESTING_EXPORT GenericFileSystemTest {
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, MoveFile) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, MoveDir) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, CopyFile) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, CopyFiles) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfo) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoVector) \
GENERIC_FS_TEST_FUNCTION(TEST_MACRO, TEST_CLASS, GetFileInfoSelector) \
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ Status SetIOThreadPoolCapacity(int threads) {
FileInterface::~FileInterface() = default;

Future<> FileInterface::CloseAsync() {
return DeferNotOk(
default_io_context().executor()->Submit([this]() { return Close(); }));
return DeferNotOk(default_io_context().executor()->Submit(
[self = shared_from_this()]() { return self->Close(); }));
Tom-Newton marked this conversation as resolved.
Show resolved Hide resolved
}

Status FileInterface::Abort() { return Close(); }
Expand Down
14 changes: 8 additions & 6 deletions cpp/src/arrow/util/parallel.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ Status ParallelFor(int num_tasks, FUNCTION&& func,

template <class FUNCTION, typename T,
typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> ParallelForAsync(
std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
Future<std::vector<R>> ParallelForAsync(std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool(),
TaskHints hints = TaskHints{}) {
std::vector<Future<R>> futures(inputs.size());
for (size_t i = 0; i < inputs.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(futures[i], executor->Submit(func, i, std::move(inputs[i])));
ARROW_ASSIGN_OR_RAISE(futures[i],
executor->Submit(hints, func, i, std::move(inputs[i])));
}
return All(std::move(futures))
.Then([](const std::vector<Result<R>>& results) -> Result<std::vector<R>> {
Expand Down Expand Up @@ -86,9 +87,10 @@ template <class FUNCTION, typename T,
typename R = typename internal::call_traits::return_type<FUNCTION>::ValueType>
Future<std::vector<R>> OptionalParallelForAsync(
bool use_threads, std::vector<T> inputs, FUNCTION&& func,
Executor* executor = internal::GetCpuThreadPool()) {
Executor* executor = internal::GetCpuThreadPool(), TaskHints hints = TaskHints{}) {
if (use_threads) {
return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor);
return ParallelForAsync(std::move(inputs), std::forward<FUNCTION>(func), executor,
hints);
} else {
std::vector<R> result(inputs.size());
for (size_t i = 0; i < inputs.size(); ++i) {
Expand Down
Loading
Loading