Skip to content

Commit

Permalink
Merge pull request ClickHouse#61923 from azat/local-input
Browse files Browse the repository at this point in the history
Implement input() for clickhouse-local
  • Loading branch information
yakov-olkhovskiy authored Apr 10, 2024
2 parents a1dc43c + 469be8e commit d554dc6
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 12 deletions.
6 changes: 5 additions & 1 deletion programs/client/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,11 @@ bool Client::processWithFuzzing(const String & full_query)
try
{
const char * begin = full_query.data();
orig_ast = parseQuery(begin, begin + full_query.size(), true);
orig_ast = parseQuery(begin, begin + full_query.size(),
global_context->getSettingsRef(),
/*allow_multi_statements=*/ true,
/*is_interactive=*/ is_interactive,
/*ignore_error=*/ ignore_error);
}
catch (const Exception & e)
{
Expand Down
14 changes: 13 additions & 1 deletion programs/local/LocalServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,20 @@ void LocalServer::setupUsers()
/// Sets up the connection used by clickhouse-local and selects the buffer that is handed to it as `in`.
/// The buffer is chosen by the "table-file" config value (default "-"):
/// "-" or "stdin" selects `std_in`, any other value is opened as a file.
void LocalServer::connect()
{
connection_parameters = ConnectionParameters(config(), "localhost");

/// Non-owning pointer: it refers either to `std_in` or to the buffer owned by the `input` member.
ReadBuffer * in;
auto table_file = config().getString("table-file", "-");
if (table_file == "-" || table_file == "stdin")
{
in = &std_in;
}
else
{
/// The file buffer is stored in the `input` member so that it outlives this function.
input = std::make_unique<ReadBufferFromFile>(table_file);
in = input.get();
}
connection = LocalConnection::createConnection(
connection_parameters, global_context, need_render_progress, need_render_profile_events, server_display_name);
connection_parameters, global_context, in, need_render_progress, need_render_profile_events, server_display_name);
}


Expand Down
2 changes: 2 additions & 0 deletions programs/local/LocalServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ class LocalServer : public ClientBase, public Loggers

std::optional<StatusFile> status;
std::optional<std::filesystem::path> temporary_directory_to_delete;

std::unique_ptr<ReadBufferFromFile> input;
};

}
19 changes: 13 additions & 6 deletions src/Client/ClientBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,11 @@ void ClientBase::setupSignalHandler()
}


ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const
ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements, bool is_interactive, bool ignore_error)
{
std::unique_ptr<IParserBase> parser;
ASTPtr res;

const auto & settings = global_context->getSettingsRef();
size_t max_length = 0;

if (!allow_multi_statements)
Expand All @@ -343,11 +342,11 @@ ASTPtr ClientBase::parseQuery(const char *& pos, const char * end, bool allow_mu
const Dialect & dialect = settings.dialect;

if (dialect == Dialect::kusto)
parser = std::make_unique<ParserKQLStatement>(end, global_context->getSettings().allow_settings_after_format_in_insert);
parser = std::make_unique<ParserKQLStatement>(end, settings.allow_settings_after_format_in_insert);
else if (dialect == Dialect::prql)
parser = std::make_unique<ParserPRQLQuery>(max_length, settings.max_parser_depth, settings.max_parser_backtracks);
else
parser = std::make_unique<ParserQuery>(end, global_context->getSettings().allow_settings_after_format_in_insert);
parser = std::make_unique<ParserQuery>(end, settings.allow_settings_after_format_in_insert);

if (is_interactive || ignore_error)
{
Expand Down Expand Up @@ -916,7 +915,11 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
/// Some parts of a query (result output and formatting) are executed
/// client-side. Thus we need to parse the query.
const char * begin = full_query.data();
auto parsed_query = parseQuery(begin, begin + full_query.size(), false);
auto parsed_query = parseQuery(begin, begin + full_query.size(),
global_context->getSettingsRef(),
/*allow_multi_statements=*/ false,
is_interactive,
ignore_error);

if (!parsed_query)
return;
Expand Down Expand Up @@ -2089,7 +2092,11 @@ MultiQueryProcessingStage ClientBase::analyzeMultiQueryText(
this_query_end = this_query_begin;
try
{
parsed_query = parseQuery(this_query_end, all_queries_end, true);
parsed_query = parseQuery(this_query_end, all_queries_end,
global_context->getSettingsRef(),
/*allow_multi_statements=*/ true,
is_interactive,
ignore_error);
}
catch (Exception & e)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Client/ClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class ClientBase : public Poco::Util::Application, public IHints<2>
void init(int argc, char ** argv);

std::vector<String> getAllRegisteredNames() const override { return cmd_options; }
static ASTPtr parseQuery(const char *& pos, const char * end, const Settings & settings, bool allow_multi_statements, bool is_interactive, bool ignore_error);

protected:
void runInteractive();
Expand All @@ -98,7 +99,6 @@ class ClientBase : public Poco::Util::Application, public IHints<2>
ASTPtr parsed_query, std::optional<bool> echo_query_ = {}, bool report_error = false);

static void adjustQueryEnd(const char *& this_query_end, const char * all_queries_end, uint32_t max_parser_depth, uint32_t max_parser_backtracks);
ASTPtr parseQuery(const char *& pos, const char * end, bool allow_multi_statements) const;
static void setupSignalHandler();

bool executeMultiQuery(const String & all_queries_text);
Expand Down
64 changes: 62 additions & 2 deletions src/Client/LocalConnection.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
#include "LocalConnection.h"
#include <memory>
#include <Client/ClientBase.h>
#include <Core/Protocol.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/executeQuery.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <QueryPipeline/QueryPipeline.h>
#include <QueryPipeline/Pipe.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/IStorage.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentThread.h>
Expand All @@ -22,12 +29,13 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}

LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_)
LocalConnection::LocalConnection(ContextPtr context_, ReadBuffer * in_, bool send_progress_, bool send_profile_events_, const String & server_display_name_)
: WithContext(context_)
, session(getContext(), ClientInfo::Interface::LOCAL)
, send_progress(send_progress_)
, send_profile_events(send_profile_events_)
, server_display_name(server_display_name_)
, in(in_)
{
/// Authenticate and create a context to execute queries.
session.authenticate("default", "", Poco::Net::SocketAddress{});
Expand Down Expand Up @@ -130,6 +138,57 @@ void LocalConnection::sendQuery(

next_packet_type.reset();

/// Prepare input() function.
/// The initializer builds, on top of the connection's `in` buffer, the pipeline
/// that the input blocks reader callback later pulls blocks from.
query_context->setInputInitializer([this] (ContextPtr context, const StoragePtr & input_storage)
{
    if (context != query_context)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in Input initializer");

    /// `in` is a raw pointer that createConnection() defaults to nullptr;
    /// report that explicitly instead of dereferencing a null buffer below.
    if (!in)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Input initializer is called without an input buffer");

    auto metadata_snapshot = input_storage->getInMemoryMetadataPtr();
    Block sample = metadata_snapshot->getSampleBlock();

    next_packet_type = Protocol::Server::Data;
    state->block = sample;

    /// The data format comes from the FORMAT clause when the query is an INSERT; otherwise "Values" is used.
    String current_format = "Values";
    const char * begin = state->query.data();
    auto parsed_query = ClientBase::parseQuery(begin, begin + state->query.size(),
        context->getSettingsRef(),
        /*allow_multi_statements=*/ false,
        /*is_interactive=*/ false,
        /*ignore_error=*/ false);
    if (const auto * insert = parsed_query->as<ASTInsertQuery>())
    {
        if (!insert->format.empty())
            current_format = insert->format;
    }

    auto source = context->getInputFormat(current_format, *in, sample, context->getSettingsRef().max_insert_block_size);
    Pipe pipe(source);

    /// Columns with DEFAULT expressions get an extra transform that receives the input format itself.
    auto columns_description = metadata_snapshot->getColumns();
    if (columns_description.hasDefaults())
    {
        pipe.addSimpleTransform([&](const Block & header)
        {
            return std::make_shared<AddingDefaultsTransform>(header, columns_description, *source, context);
        });
    }

    /// Kept in the query state so that the blocks reader callback can pull from the executor.
    state->input_pipeline = std::make_unique<QueryPipeline>(std::move(pipe));
    state->input_pipeline_executor = std::make_unique<PullingAsyncPipelineExecutor>(*state->input_pipeline);
});
/// Reader side of input(): hands out the next block pulled from the executor stored in the query state.
query_context->setInputBlocksReaderCallback([this] (ContextPtr context) -> Block
{
    /// Same guard as in the initializer: only the context of the current query is accepted.
    if (context != query_context)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected context in InputBlocksReader");

    Block next_block;
    state->input_pipeline_executor->pull(next_block);
    return next_block;
});

try
{
state->io = executeQuery(state->query, query_context, QueryFlags{}, state->stage).second;
Expand Down Expand Up @@ -537,11 +596,12 @@ void LocalConnection::sendMergeTreeReadTaskResponse(const ParallelReadResponse &
/// Factory that wraps a newly constructed LocalConnection into a ServerConnectionPtr.
/// The ConnectionParameters argument is unnamed and not used here; the other
/// arguments are forwarded to the LocalConnection constructor.
/// `in` is the buffer the connection reads input() data from; it is passed on as a raw, non-owning pointer.
ServerConnectionPtr LocalConnection::createConnection(
const ConnectionParameters &,
ContextPtr current_context,
ReadBuffer * in,
bool send_progress,
bool send_profile_events,
const String & server_display_name)
{
return std::make_unique<LocalConnection>(current_context, send_progress, send_profile_events, server_display_name);
return std::make_unique<LocalConnection>(current_context, in, send_progress, send_profile_events, server_display_name);
}


Expand Down
15 changes: 14 additions & 1 deletion src/Client/LocalConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ namespace DB
class PullingAsyncPipelineExecutor;
class PushingAsyncPipelineExecutor;
class PushingPipelineExecutor;
class QueryPipeline;
class ReadBuffer;

/// State of query processing.
struct LocalQueryState
Expand All @@ -31,6 +33,10 @@ struct LocalQueryState
std::unique_ptr<PullingAsyncPipelineExecutor> executor;
std::unique_ptr<PushingPipelineExecutor> pushing_executor;
std::unique_ptr<PushingAsyncPipelineExecutor> pushing_async_executor;
/// For sending data for input() function.
std::unique_ptr<QueryPipeline> input_pipeline;
std::unique_ptr<PullingAsyncPipelineExecutor> input_pipeline_executor;

InternalProfileEventsQueuePtr profile_queue;

std::unique_ptr<Exception> exception;
Expand Down Expand Up @@ -64,7 +70,11 @@ class LocalConnection : public IServerConnection, WithContext
{
public:
explicit LocalConnection(
ContextPtr context_, bool send_progress_ = false, bool send_profile_events_ = false, const String & server_display_name_ = "");
ContextPtr context_,
ReadBuffer * in_,
bool send_progress_,
bool send_profile_events_,
const String & server_display_name_);

~LocalConnection() override;

Expand All @@ -73,6 +83,7 @@ class LocalConnection : public IServerConnection, WithContext
static ServerConnectionPtr createConnection(
const ConnectionParameters & connection_parameters,
ContextPtr current_context,
ReadBuffer * in = nullptr,
bool send_progress = false,
bool send_profile_events = false,
const String & server_display_name = "");
Expand Down Expand Up @@ -158,5 +169,7 @@ class LocalConnection : public IServerConnection, WithContext
String current_database;

ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;

ReadBuffer * in;
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# foo
foo
# !foo
# bar
bar
# defaults
bam
20 changes: 20 additions & 0 deletions tests/queries/0_stateless/03031_clickhouse_local_input.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

# Scratch file: every case overwrites it (engine_file_truncate_on_insert=1) and then prints it.
tmp_file="$CUR_DIR/$CLICKHOUSE_DATABASE.txt"

# Runs one input() scenario and prints the resulting file.
#   $1    - label, echoed as "# <label>"
#   $2    - SELECT part appended after the common "insert into function file(...)" prefix
#   $3    - data fed to clickhouse-local on stdin
#   $4... - extra clickhouse-local options, placed before the common ones
function run_case()
{
    local label=$1 select_part=$2 stdin_data=$3
    shift 3

    echo "# $label"
    $CLICKHOUSE_LOCAL "$@" --engine_file_truncate_on_insert=1 -n -q "insert into function file('$tmp_file', 'LineAsString', 'x String') $select_part" <<<"$stdin_data"
    cat "$tmp_file"
}

# Data passes through input() unchanged.
run_case 'foo' "select * from input('x String') format LineAsString" 'foo'
# Every row is filtered out, so the file ends up empty.
run_case '!foo' "select * from input('x String') where x != 'foo' format LineAsString" 'foo'
# Only the second column of the TSV input is written.
run_case 'bar' "select y from input('x String, y String') format TSV" $'foo\tbar'
# The empty second field is replaced by the column's DEFAULT value.
run_case 'defaults' "select y from input('x String, y String DEFAULT \\'bam\\'') format TSV" $'foo\t' --input_format_tsv_empty_as_default=1

rm -f "${tmp_file:?}"

0 comments on commit d554dc6

Please sign in to comment.