Skip to content

Commit

Permalink
enable parallel downloading for text and json
Browse files Browse the repository at this point in the history
  • Loading branch information
taiyang-li committed Sep 29, 2024
1 parent 96d4779 commit 845eb95
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 1 deletion.
46 changes: 46 additions & 0 deletions cpp-ch/local-engine/Storages/SubstraitSource/ReadBufferBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* limitations under the License.
*/

#include "IO/SharedThreadPools.h"
#include "config.h"

#include <memory>
Expand Down Expand Up @@ -72,6 +73,9 @@ namespace Setting
extern const SettingsUInt64 s3_max_redirects;
extern const SettingsBool s3_disable_checksum;
extern const SettingsUInt64 s3_retry_attempts;
extern const SettingsMaxThreads max_download_threads;
extern const SettingsUInt64 max_download_buffer_size;
extern const SettingsBool input_format_allow_seeks;
}
namespace ErrorCodes
{
Expand Down Expand Up @@ -170,6 +174,8 @@ class LocalFileReadBufferBuilder : public ReadBufferBuilder
/// Construct a builder for local files; `context_` is forwarded unchanged to the ReadBufferBuilder base.
explicit LocalFileReadBufferBuilder(DB::ContextPtr context_) : ReadBufferBuilder(context_) { }
/// Defaulted destructor; overrides the virtual destructor declared by ReadBufferBuilder.
~LocalFileReadBufferBuilder() override = default;

/// Local files are never treated as remote. wrapParallelReadBufferIfNeeded() checks this flag
/// and therefore never applies parallel downloading to buffers produced by this builder.
bool isRemote() const override
{
    return false;
}

std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position) override
{
Expand Down Expand Up @@ -794,13 +800,53 @@ std::unique_ptr<DB::ReadBuffer>
ReadBufferBuilder::buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position)
{
    /// Build the raw buffer for the file, then let it be wrapped for parallel downloading when applicable.
    auto buffer = wrapParallelReadBufferIfNeeded(file_info, build(file_info, set_read_util_position));

    /// The compression method is chosen from the path component of the file URI ("auto" detection).
    const Poco::URI uri(file_info.uri_file());
    const DB::CompressionMethod method = DB::chooseCompressionMethod(uri.getPath(), "auto");

    /// Nothing to decompress: hand the buffer back as is.
    if (method == DB::CompressionMethod::None)
        return buffer;

    return DB::wrapReadBufferWithCompressionMethod(std::move(buffer), method);
}

/// Optionally wrap `in` so that its content is downloaded in parallel.
///
/// `in` is returned untouched unless ALL of the following hold:
///  - the file is read as text or json;
///  - this builder reports isRemote();
///  - setting `max_download_threads` is greater than 1;
///  - setting `input_format_allow_seeks` is enabled;
///  - `in` is a buffer with a known file size;
///  - the file size is at least twice `max_download_buffer_size`.
///
/// Otherwise ownership of `in` is handed to wrapInParallelReadBufferIfSupported() and its result is returned.
/// NOTE(review): `in` is dereferenced without a null check, so callers are assumed to pass a non-null buffer.
std::unique_ptr<DB::ReadBuffer> ReadBufferBuilder::wrapParallelReadBufferIfNeeded(
    const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, std::unique_ptr<DB::ReadBuffer> in)
{
    /// Only use parallel downloading for text and json format because data are read serially in those formats.
    if (!file_info.has_text() && !file_info.has_json())
        return in;

    const auto & settings = context->getSettingsRef();
    auto max_download_threads = settings[DB::Setting::max_download_threads];
    auto max_download_buffer_size = settings[DB::Setting::max_download_buffer_size];
    bool allow_seeks = settings[DB::Setting::input_format_allow_seeks];

    /// Cheap checks come first; the buffer itself is inspected only when the settings allow parallel reading.
    const bool eligible = isRemote() && max_download_threads > 1 && allow_seeks && isBufferWithFileSize(*in);
    if (!eligible)
        return in;

    /// Files smaller than two download chunks are read through the original buffer.
    const size_t file_size = getFileSizeFromReadBuffer(*in);
    if (file_size < 2 * max_download_buffer_size)
        return in;

    LOG_TRACE(
        getLogger("ReadBufferBuilder"),
        "Using ParallelReadBuffer with {} workers with chunks of {} bytes",
        max_download_threads,
        max_download_buffer_size);

    return wrapInParallelReadBufferIfSupported(
        {std::move(in)},
        DB::threadPoolCallbackRunnerUnsafe<void>(DB::getIOThreadPool().get(), "ParallelRead"),
        max_download_threads,
        max_download_buffer_size,
        file_size);
}


ReadBufferBuilderFactory & ReadBufferBuilderFactory::instance()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <memory>
#include <IO/ReadBuffer.h>
#include <substrait/plan.pb.h>
#include <IO/CompressionMethod.h>


namespace local_engine
Expand All @@ -32,6 +33,8 @@ class ReadBufferBuilder

/// Virtual so that concrete builders are destroyed correctly through a ReadBufferBuilder pointer.
virtual ~ReadBufferBuilder() = default;

/// Whether this builder reads from remote storage. Defaults to true; builders for local files
/// override it to return false. wrapParallelReadBufferIfNeeded() consults it before enabling
/// parallel downloading.
virtual bool isRemote() const
{
    return true;
}

/// Build a new read buffer for the file described by `file_info`.
/// Pure virtual: every concrete builder supplies its own implementation.
/// NOTE(review): the exact effect of `set_read_util_position` is defined by the implementations
/// and is not visible from this declaration — confirm against the concrete builders.
virtual std::unique_ptr<DB::ReadBuffer>
build(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false) = 0;
Expand All @@ -40,7 +43,11 @@ class ReadBufferBuilder
/// Build a read buffer via build(), pass it through wrapParallelReadBufferIfNeeded(), and finally wrap it
/// with a decompressing buffer when a compression method is detected from the path of the file URI.
std::unique_ptr<DB::ReadBuffer> buildWithCompressionWrapper(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, bool set_read_util_position = false);

protected:
DB::ReadSettings getReadSettings(DB::ContextPtr context) const;
/// Takes ownership of `in` and returns either `in` itself or a parallel-downloading wrapper around it.
/// Wrapping is attempted only for text/json files read by a remote builder, when `max_download_threads` > 1,
/// `input_format_allow_seeks` is enabled, the buffer has a known file size, and that size is at least
/// twice `max_download_buffer_size`.
std::unique_ptr<DB::ReadBuffer>
wrapParallelReadBufferIfNeeded(const substrait::ReadRel::LocalFiles::FileOrFiles & file_info, std::unique_ptr<DB::ReadBuffer> in);

DB::ReadSettings
getReadSettings(DB::ContextPtr context) const;
DB::ContextPtr context;

public:
Expand Down

0 comments on commit 845eb95

Please sign in to comment.