Commit 9c63a6b

Merge branch 'develop' into bugfix/issue-239-fix-client-cancel

yokofly authored Nov 9, 2023
2 parents 98af57a + 47cdcf0

Showing 26 changed files with 208 additions and 68 deletions.
3 changes: 2 additions & 1 deletion src/Core/Settings.h
@@ -800,7 +800,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
     M(UInt64, max_streaming_view_cached_block_bytes, 100 * 1024 * 1024, "Maximum bytes of block cached in streaming view", 0) \
     M(UInt64, keep_windows, 0, "How many streaming windows to keep from recycling", 0) \
     M(String, seek_to, "", "Seeking to an offset of the streaming/historical store to seek", 0) \
-    M(Bool, enable_backfill_from_historical_store, false, "Enable backfill data from historical data store", 0) \
+    M(Bool, enable_backfill_from_historical_store, true, "Enable backfill data from historical data store", 0) \
+    M(Bool, emit_aggregated_during_backfill, true, "Enable emit intermediate aggr result during backfill historical data", 0) \
     M(Bool, include_internal_streams, false, "Show internal streams on SHOW streams query.", 0) \
     // End of GLOBAL_SETTINGS
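The settings table above is an X-macro: a single `M(type, name, default, description, flags)` list that the build expands several times to generate field declarations, defaults, and documentation. A minimal self-contained sketch of the pattern — the two settings and the 4-argument `M` signature here are simplified stand-ins, not proton's real list:

#include <cstdint>
#include <iostream>
#include <string>

// One list of settings; each entry is (type, name, default, description).
#define APPLY_FOR_SETTINGS(M) \
    M(bool, enable_backfill_from_historical_store, true, "Backfill from historical store") \
    M(uint64_t, keep_windows, 0, "Streaming windows to keep from recycling")

// Expansion 1: struct fields initialized with their defaults.
struct Settings
{
#define DECLARE(TYPE, NAME, DEFAULT, DESC) TYPE NAME = DEFAULT;
    APPLY_FOR_SETTINGS(DECLARE)
#undef DECLARE
};

// Expansion 2: dump names and descriptions from the same list.
void printSettingsDoc()
{
#define PRINT_DOC(TYPE, NAME, DEFAULT, DESC) std::cout << #NAME << ": " << DESC << '\n';
    APPLY_FOR_SETTINGS(PRINT_DOC)
#undef PRINT_DOC
}

int main()
{
    Settings settings;
    std::cout << std::boolalpha << settings.enable_backfill_from_historical_store << '\n'; // true
    printSettingsDoc();
}

The payoff is that adding a setting (like `emit_aggregated_during_backfill` in this commit) is a one-line change; every expansion site picks it up automatically.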
42 changes: 35 additions & 7 deletions src/Interpreters/InterpreterSelectQuery.cpp
@@ -890,8 +890,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
     }

     /// proton: starts
-    checkForStreamingQuery();
-    checkUDA();
+    finalCheckAndOptimizeForStreamingQuery();

     if (query_info.projection)
         storage_snapshot->addProjection(query_info.projection->desc);
@@ -3363,7 +3362,7 @@ bool InterpreterSelectQuery::shouldKeepState() const
     return false;
 }

-void InterpreterSelectQuery::checkForStreamingQuery() const
+void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery()
 {
     if (isStreaming())
     {
@@ -3375,6 +3374,29 @@ void InterpreterSelectQuery::checkForStreamingQuery() const
             if (proxy->hasGlobalAggregation())
                 throw Exception("Streaming query doesn't support window func over a global aggregation", ErrorCodes::NOT_IMPLEMENTED);
         }
+
+        /// For now, we disable backfill from the historical data store in the following scenarios:
+        /// 1) The user selects virtual columns which are only available in the streaming store, like `_tp_sn`, `_tp_index_time`
+        /// 2) Seek by streaming store sequence number
+        /// 3) Replaying a stream.
+        /// TODO: ideally we should check whether the historical data store has the `_tp_sn` etc. columns; if it does,
+        /// we can technically backfill from the historical data store as well. This will be a future enhancement.
+        const auto & settings = context->getSettingsRef();
+        if (settings.enable_backfill_from_historical_store.value)
+        {
+            bool has_streaming_only_virtual_columns = std::ranges::any_of(required_columns, [](const auto & name) {
+                return name == ProtonConsts::RESERVED_EVENT_SEQUENCE_ID || name == ProtonConsts::RESERVED_APPEND_TIME
+                    || name == ProtonConsts::RESERVED_INGEST_TIME || name == ProtonConsts::RESERVED_PROCESS_TIME;
+            });
+            bool seek_by_sn = !query_info.seek_to_info->getSeekTo().empty() && !query_info.seek_to_info->isTimeBased()
+                && query_info.seek_to_info->getSeekTo() != "earliest";
+            if (has_streaming_only_virtual_columns || seek_by_sn || settings.replay_speed > 0)
+                context->setSetting("enable_backfill_from_historical_store", false);
+        }
+
+        /// Optimization: backfill data need not be read in order for global aggregation when `emit_aggregated_during_backfill = false`.
+        if (!settings.emit_aggregated_during_backfill.value && hasGlobalAggregation())
+            query_info.require_in_order_backfill = false;
     }
     else
     {
@@ -3399,6 +3421,8 @@ void InterpreterSelectQuery::checkForStreamingQuery() const
                 "Neither window_start nor window_end is referenced in the query, but streaming window function is used",
                 ErrorCodes::WINDOW_COLUMN_NOT_REFERENCED);
     }
+
+    checkUDA();
 }
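The heart of the new check is a small predicate over the query's required columns, seek target, and replay speed. A standalone sketch of the same decision logic, with simplified stand-in types and only two of the reserved column names (the real code checks `RESERVED_EVENT_SEQUENCE_ID`, `RESERVED_APPEND_TIME`, `RESERVED_INGEST_TIME`, and `RESERVED_PROCESS_TIME`):

#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-ins for ProtonConsts reserved column names.
const std::string RESERVED_EVENT_SEQUENCE_ID = "_tp_sn";
const std::string RESERVED_INDEX_TIME = "_tp_index_time";

struct QueryTraits
{
    std::vector<std::string> required_columns;
    std::string seek_to;          // "" = no seek; "earliest"; a timestamp; or a sequence number
    bool seek_is_time_based = false;
    double replay_speed = 0;
};

// Mirrors the gating logic: disable historical backfill when the query needs
// streaming-only virtual columns, seeks by sequence number, or replays a stream.
bool canBackfillFromHistoricalStore(const QueryTraits & q)
{
    bool has_streaming_only_virtual_columns = std::ranges::any_of(
        q.required_columns,
        [](const std::string & name) { return name == RESERVED_EVENT_SEQUENCE_ID || name == RESERVED_INDEX_TIME; });

    bool seek_by_sn = !q.seek_to.empty() && !q.seek_is_time_based && q.seek_to != "earliest";

    return !(has_streaming_only_virtual_columns || seek_by_sn || q.replay_speed > 0);
}

int main()
{
    std::cout << canBackfillFromHistoricalStore({{"value"}, "", false, 0}) << '\n';     // 1: plain query
    std::cout << canBackfillFromHistoricalStore({{"_tp_sn"}, "", false, 0}) << '\n';    // 0: streaming-only column
    std::cout << canBackfillFromHistoricalStore({{"value"}, "1024", false, 0}) << '\n'; // 0: seek by sequence number
}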

void InterpreterSelectQuery::buildShufflingQueryPlan(QueryPlan & query_plan)
@@ -3432,12 +3456,16 @@ void InterpreterSelectQuery::buildWatermarkQueryPlan(QueryPlan & query_plan) const
 {
     assert(isStreaming());
     auto params = std::make_shared<Streaming::WatermarkStamperParams>(
-            query_info.query, query_info.syntax_analyzer_result, query_info.streaming_window_params);
+        query_info.query, query_info.syntax_analyzer_result, query_info.streaming_window_params);

+    bool skip_stamping_for_backfill_data = !context->getSettingsRef().emit_aggregated_during_backfill.value;
+
     if (query_info.hasPartitionByKeys())
-        query_plan.addStep(
-            std::make_unique<Streaming::WatermarkStepWithSubstream>(query_plan.getCurrentDataStream(), std::move(params), log));
+        query_plan.addStep(std::make_unique<Streaming::WatermarkStepWithSubstream>(
+            query_plan.getCurrentDataStream(), std::move(params), skip_stamping_for_backfill_data, log));
     else
-        query_plan.addStep(std::make_unique<Streaming::WatermarkStep>(query_plan.getCurrentDataStream(), std::move(params), log));
+        query_plan.addStep(std::make_unique<Streaming::WatermarkStep>(
+            query_plan.getCurrentDataStream(), std::move(params), skip_stamping_for_backfill_data, log));
 }

void InterpreterSelectQuery::buildStreamingProcessingQueryPlanBeforeJoin(QueryPlan & query_plan)
2 changes: 1 addition & 1 deletion src/Interpreters/InterpreterSelectQuery.h
@@ -194,7 +194,7 @@ class InterpreterSelectQuery : public IInterpreterUnionOrSelectQuery
     void executeStreamingPreLimit(QueryPlan & query_plan, bool do_not_skip_offset);
     void executeStreamingLimit(QueryPlan & query_plan);
     void executeStreamingOffset(QueryPlan & query_plan);
-    void checkForStreamingQuery() const;
+    void finalCheckAndOptimizeForStreamingQuery();
     bool shouldKeepState() const;
     void buildShufflingQueryPlan(QueryPlan & query_plan);
     void buildWatermarkQueryPlan(QueryPlan & query_plan) const;
3 changes: 3 additions & 0 deletions src/Processors/Chunk.h
@@ -278,6 +278,9 @@ class Chunk
         chunk_ctx->setCheckpointContext(nullptr);
     }

+    bool isHistoricalDataStart() const { return chunk_ctx && chunk_ctx->isHistoricalDataStart(); }
+    bool isHistoricalDataEnd() const { return chunk_ctx && chunk_ctx->isHistoricalDataEnd(); }
+
     /// Dummy interface to make RefCountBlockList happy
     Int64 minTimestamp() const { return 0; }
     Int64 maxTimestamp() const { return 0; }
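These accessors suggest the begin/end-of-backfill markers ride on the chunk's context object rather than on the data itself, with null-safe delegation. A speculative sketch of that plumbing — the flag-based `ChunkContext` internals here are hypothetical; only the two `Chunk` accessors mirror the diff:

#include <cstdint>
#include <memory>

// Hypothetical flag-based chunk context, sketched from the accessors above.
class ChunkContext
{
public:
    static constexpr uint64_t HISTORICAL_DATA_START = 1ull << 0;
    static constexpr uint64_t HISTORICAL_DATA_END = 1ull << 1;

    void setFlag(uint64_t flag) { flags |= flag; }
    bool isHistoricalDataStart() const { return flags & HISTORICAL_DATA_START; }
    bool isHistoricalDataEnd() const { return flags & HISTORICAL_DATA_END; }

private:
    uint64_t flags = 0;
};

struct Chunk
{
    std::shared_ptr<ChunkContext> chunk_ctx;

    // Null-safe accessors, mirroring the pattern in the diff: a chunk without
    // a context is treated as ordinary (non-backfill) streaming data.
    bool isHistoricalDataStart() const { return chunk_ctx && chunk_ctx->isHistoricalDataStart(); }
    bool isHistoricalDataEnd() const { return chunk_ctx && chunk_ctx->isHistoricalDataEnd(); }
};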
38 changes: 38 additions & 0 deletions src/Processors/QueryPlan/ReadFromMergeTree.cpp
@@ -38,6 +38,9 @@

 /// proton : start
 #include <Processors/Sources/MarkSource.h>
+#include <Processors/Transforms/MergeSortingTransform.h>
+#include <Processors/Transforms/PartialSortingTransform.h>
+#include <Common/ProtonCommon.h>
 /// proton : ends

 namespace ProfileEvents
@@ -1177,6 +1180,41 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
     /// proton : starts. Add streaming source after historical source
     if (create_streaming_source)
     {
+        if (query_info.require_in_order_backfill)
+        {
+            /// TODO: support an optimized ordering for streams ordered by `_tp_time`
+            /// Basic code copied from `SortingStep::fullSort`
+            /// Sort backfilled historical data by ascending event time
+            SortDescription sort_desc;
+            sort_desc.emplace_back(ProtonConsts::RESERVED_EVENT_TIME, /*ascending*/ 1);
+
+            pipe.addSimpleTransform(
+                [&](const Block & header) -> ProcessorPtr { return std::make_shared<PartialSortingTransform>(header, sort_desc); });
+
+            pipe.addSimpleTransform([&](const Block & header) -> ProcessorPtr {
+                return std::make_shared<MergeSortingTransform>(
+                    header,
+                    sort_desc,
+                    max_block_size,
+                    /*limits*/ 0,
+                    /// increase_sort_description_compile_attempts_current,
+                    settings.max_bytes_before_remerge_sort / pipe.numOutputPorts(),
+                    settings.remerge_sort_lowered_memory_bytes_ratio,
+                    settings.max_bytes_before_external_sort,
+                    context->getTemporaryVolume(),
+                    settings.min_free_disk_space_for_temporary_data);
+            });
+
+            /// If there are several streams, then we merge them into one
+            if (pipe.numOutputPorts() > 1)
+            {
+                auto transform = std::make_shared<MergingSortedTransform>(
+                    pipe.getHeader(), pipe.numOutputPorts(), sort_desc, max_block_size, SortingQueueStrategy::Batch, /*limits*/ 0);
+
+                pipe.addTransform(std::move(transform));
+            }
+        }
+
         Pipes pipes;

         cur_header = pipe.getHeader();
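The backfill path uses the classic two-stage arrangement: each pipe is sorted locally (`PartialSortingTransform` + `MergeSortingTransform`), then `MergingSortedTransform` k-way merges the sorted pipes into one ordered stream. A self-contained sketch of that final merge step over plain rows, assuming a simplified `Row` with an event-time key rather than proton's Chunk machinery:

#include <cstdint>
#include <iostream>
#include <queue>
#include <vector>

struct Row
{
    int64_t event_time; // stand-in for the `_tp_time` sort key
    int value;
};

// Merge k individually sorted streams into one stream sorted by event_time,
// as MergingSortedTransform does for the sorted backfill pipes.
std::vector<Row> mergeSortedStreams(const std::vector<std::vector<Row>> & streams)
{
    using Cursor = std::pair<size_t, size_t>; // (stream index, position within stream)
    auto greater = [&](const Cursor & a, const Cursor & b) {
        return streams[a.first][a.second].event_time > streams[b.first][b.second].event_time;
    };
    std::priority_queue<Cursor, std::vector<Cursor>, decltype(greater)> heap(greater);

    for (size_t i = 0; i < streams.size(); ++i)
        if (!streams[i].empty())
            heap.push({i, 0});

    std::vector<Row> merged;
    while (!heap.empty())
    {
        auto [stream, pos] = heap.top();
        heap.pop();
        merged.push_back(streams[stream][pos]);
        if (pos + 1 < streams[stream].size())
            heap.push({stream, pos + 1}); // advance the cursor of the stream we consumed from
    }
    return merged;
}

int main()
{
    auto merged = mergeSortedStreams({{{1, 10}, {4, 40}}, {{2, 20}, {3, 30}}});
    for (const auto & row : merged)
        std::cout << row.event_time << ' '; // 1 2 3 4
}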
10 changes: 7 additions & 3 deletions src/Processors/QueryPlan/Streaming/WatermarkStep.cpp
@@ -23,15 +23,19 @@ DB::ITransformingStep::Traits getTraits()
     }};
 }
 }
-WatermarkStep::WatermarkStep(const DataStream & input_stream_, WatermarkStamperParamsPtr params_, Poco::Logger * log_)
-    : ITransformingStep(input_stream_, input_stream_.header, getTraits()), params(std::move(params_)), log(log_)
+WatermarkStep::WatermarkStep(
+    const DataStream & input_stream_, WatermarkStamperParamsPtr params_, bool skip_stamping_for_backfill_data_, Poco::Logger * log_)
+    : ITransformingStep(input_stream_, input_stream_.header, getTraits())
+    , params(std::move(params_))
+    , skip_stamping_for_backfill_data(skip_stamping_for_backfill_data_)
+    , log(log_)
 {
 }

 void WatermarkStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /* settings */)
 {
     pipeline.addSimpleTransform([&](const Block & header) { /// STYLE_CHECK_ALLOW_BRACE_SAME_LINE_LAMBDA
-        return std::make_shared<WatermarkTransform>(header, params, log);
+        return std::make_shared<WatermarkTransform>(header, params, skip_stamping_for_backfill_data, log);
     });
 }
 }
4 changes: 3 additions & 1 deletion src/Processors/QueryPlan/Streaming/WatermarkStep.h
@@ -11,7 +11,8 @@ namespace Streaming
 class WatermarkStep final : public ITransformingStep
 {
 public:
-    WatermarkStep(const DataStream & input_stream_, WatermarkStamperParamsPtr params_, Poco::Logger * log);
+    WatermarkStep(
+        const DataStream & input_stream_, WatermarkStamperParamsPtr params_, bool skip_stamping_for_backfill_data_, Poco::Logger * log);

     ~WatermarkStep() override = default;

@@ -20,6 +21,7 @@ class WatermarkStep final : public ITransformingStep

 private:
     WatermarkStamperParamsPtr params;
+    bool skip_stamping_for_backfill_data;
     Poco::Logger * log;
 };
 }
src/Processors/QueryPlan/Streaming/WatermarkStepWithSubstream.cpp
@@ -25,15 +25,18 @@ DB::ITransformingStep::Traits getTraits()
 }

 WatermarkStepWithSubstream::WatermarkStepWithSubstream(
-    const DataStream & input_stream_, WatermarkStamperParamsPtr params_, Poco::Logger * log_)
-    : ITransformingStep(input_stream_, input_stream_.header, getTraits()), params(std::move(params_)), log(log_)
+    const DataStream & input_stream_, WatermarkStamperParamsPtr params_, bool skip_stamping_for_backfill_data_, Poco::Logger * log_)
+    : ITransformingStep(input_stream_, input_stream_.header, getTraits())
+    , params(std::move(params_))
+    , skip_stamping_for_backfill_data(skip_stamping_for_backfill_data_)
+    , log(log_)
 {
 }

 void WatermarkStepWithSubstream::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /* settings */)
 {
     pipeline.addSimpleTransform([&](const Block & header) { /// STYLE_CHECK_ALLOW_BRACE_SAME_LINE_LAMBDA
-        return std::make_shared<WatermarkTransformWithSubstream>(header, params, log);
+        return std::make_shared<WatermarkTransformWithSubstream>(header, params, skip_stamping_for_backfill_data, log);
     });
 }
 }
src/Processors/QueryPlan/Streaming/WatermarkStepWithSubstream.h
@@ -11,7 +11,8 @@ namespace Streaming
 class WatermarkStepWithSubstream final : public ITransformingStep
 {
 public:
-    WatermarkStepWithSubstream(const DataStream & input_stream_, WatermarkStamperParamsPtr params_, Poco::Logger * log);
+    WatermarkStepWithSubstream(
+        const DataStream & input_stream_, WatermarkStamperParamsPtr params_, bool skip_stamping_for_backfill_data_, Poco::Logger * log);

     ~WatermarkStepWithSubstream() override = default;

@@ -20,6 +21,7 @@ class WatermarkStepWithSubstream final : public ITransformingStep

 private:
     WatermarkStamperParamsPtr params;
+    bool skip_stamping_for_backfill_data;
     Poco::Logger * log;
 };
 }
12 changes: 6 additions & 6 deletions src/Processors/Transforms/Streaming/AggregatingHelper.cpp
@@ -83,16 +83,16 @@ ChunkPair AggregatingHelper::convertWithoutKeyToChangelog(
 ChunkPair AggregatingHelper::mergeAndConvertWithoutKeyToChangelog(
     ManyAggregatedDataVariants & data, ManyRetractedDataVariants & retracted_data, const AggregatingTransformParams & params)
 {
-    assert(data.at(0)->type == AggregatedDataVariants::Type::without_key);
-    assert(retracted_data.at(0)->type == AggregatedDataVariants::Type::without_key);
-
     auto prepared_data = params.aggregator.prepareVariantsToMerge(data);
     if (prepared_data->empty())
         return {};

     auto prepared_retracted_data = params.aggregator.prepareVariantsToMerge(retracted_data);
     assert(!prepared_retracted_data->empty());

+    assert(prepared_data->at(0)->type == AggregatedDataVariants::Type::without_key);
+    assert(prepared_retracted_data->at(0)->type == AggregatedDataVariants::Type::without_key);
+
     params.aggregator.mergeWithoutKeyDataImpl(*prepared_retracted_data, ConvertAction::RETRACTED_EMIT);
     params.aggregator.mergeWithoutKeyDataImpl(*prepared_data, ConvertAction::STREAMING_EMIT);
     return convertWithoutKeyToChangelog(*prepared_data->at(0), *prepared_retracted_data->at(0), params);
@@ -126,16 +126,16 @@ ChunkPair AggregatingHelper::convertSingleLevelToChangelog(
 ChunkPair AggregatingHelper::mergeAndConvertSingleLevelToChangelog(
     ManyAggregatedDataVariants & data, ManyRetractedDataVariants & retracted_data, const AggregatingTransformParams & params)
 {
-    assert(data.at(0)->type != AggregatedDataVariants::Type::without_key && !data.at(0)->isTwoLevel());
-    assert(retracted_data.at(0)->type != AggregatedDataVariants::Type::without_key && !retracted_data.at(0)->isTwoLevel());
-
     auto prepared_data = params.aggregator.prepareVariantsToMerge(data, /*always_merge_into_empty*/ true);
     if (prepared_data->empty())
         return {};

     auto prepared_retracted_data = params.aggregator.prepareVariantsToMerge(retracted_data, /*always_merge_into_empty*/ true);
     assert(!prepared_retracted_data->empty());

+    assert(prepared_data->at(0)->type != AggregatedDataVariants::Type::without_key && !prepared_data->at(0)->isTwoLevel());
+    assert(prepared_retracted_data->at(0)->type != AggregatedDataVariants::Type::without_key && !prepared_retracted_data->at(0)->isTwoLevel());
+
     /// To only emit changelog:
     /// 1) Merge retracted groups data into first one
     /// 2) Merge changed groups data into first one (based on retracted groups)
src/Processors/Transforms/Streaming/GlobalAggregatingTransform.cpp
@@ -125,11 +125,13 @@ void GlobalAggregatingTransform::finalize(const ChunkContextPtr & chunk_ctx)
         many_data->finalized_watermark.store(chunk_ctx->getWatermark(), std::memory_order_relaxed);
     });

-    auto first = many_data->variants.at(0);
-    if (unlikely(first->isTwoLevel()))
+    auto first_data_iter = std::ranges::find_if(many_data->variants, [](const auto & data) { return !data->empty(); });
+    assert(first_data_iter != many_data->variants.end());
+
+    if (unlikely((*first_data_iter)->isTwoLevel()))
         throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Two level merge is not implemented in global aggregation");

-    if (first->type == AggregatedDataVariants::Type::without_key)
+    if ((*first_data_iter)->type == AggregatedDataVariants::Type::without_key)
     {
         /// Without key
         if (params->emit_changelog)
14 changes: 12 additions & 2 deletions src/Processors/Transforms/Streaming/WatermarkTransform.cpp
@@ -39,9 +39,11 @@ WatermarkStamperPtr initWatermark(const WatermarkStamperParams & params, Poco::L
     }
 }

-WatermarkTransform::WatermarkTransform(const Block & header, WatermarkStamperParamsPtr params_, Poco::Logger * log)
+WatermarkTransform::WatermarkTransform(
+    const Block & header, WatermarkStamperParamsPtr params_, bool skip_stamping_for_backfill_data_, Poco::Logger * log)
     : ISimpleTransform(header, header, false, ProcessorID::WatermarkTransformID)
     , params(std::move(params_))
+    , skip_stamping_for_backfill_data(skip_stamping_for_backfill_data_)
 {
     watermark = initWatermark(*params, log);
     assert(watermark);
@@ -51,7 +53,15 @@ WatermarkTransform::WatermarkTransform(const Block & header, WatermarkStamperPar

 void WatermarkTransform::transform(Chunk & chunk)
 {
     chunk.clearWatermark();
-    if (!chunk.avoidWatermark())
+
+    if (chunk.isHistoricalDataStart())
+        is_backfilling_data = true;
+    else if (chunk.isHistoricalDataEnd())
+        is_backfilling_data = false;
+
+    bool avoid_watermark = chunk.avoidWatermark();
+    avoid_watermark |= is_backfilling_data && skip_stamping_for_backfill_data;
+    if (!avoid_watermark)
         watermark->process(chunk);
 }
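The transform is effectively a small state machine: start/end-of-historical-data markers toggle a persistent flag, and while that flag is set with the optimization enabled, chunks flow through without watermark stamping. A minimal sketch of the gating alone — the `WatermarkGate` class and its simplified `Chunk` are hypothetical illustrations of the same logic:

#include <iostream>

// Simplified chunk: just the markers and flags that matter for gating.
struct Chunk
{
    bool historical_data_start = false;
    bool historical_data_end = false;
    bool avoid_watermark = false; // e.g. a chunk that must never be stamped
};

class WatermarkGate
{
public:
    explicit WatermarkGate(bool skip_stamping_for_backfill_data_)
        : skip_stamping_for_backfill_data(skip_stamping_for_backfill_data_) {}

    // Returns true when the chunk should get a watermark stamped.
    bool shouldStamp(const Chunk & chunk)
    {
        // Markers toggle the backfill state, which persists across chunks.
        if (chunk.historical_data_start)
            is_backfilling_data = true;
        else if (chunk.historical_data_end)
            is_backfilling_data = false;

        bool avoid = chunk.avoid_watermark;
        avoid |= is_backfilling_data && skip_stamping_for_backfill_data;
        return !avoid;
    }

private:
    bool skip_stamping_for_backfill_data;
    bool is_backfilling_data = false;
};

int main()
{
    WatermarkGate gate(/*skip_stamping_for_backfill_data*/ true);
    std::cout << gate.shouldStamp({true, false, false}) << '\n';  // 0: backfill begins
    std::cout << gate.shouldStamp({false, false, false}) << '\n'; // 0: still backfilling
    std::cout << gate.shouldStamp({false, true, false}) << '\n';  // 1: backfill ended
}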
5 changes: 4 additions & 1 deletion src/Processors/Transforms/Streaming/WatermarkTransform.h
@@ -16,7 +16,7 @@ namespace Streaming
 class WatermarkTransform final : public ISimpleTransform
 {
 public:
-    WatermarkTransform(const Block & header, WatermarkStamperParamsPtr params_, Poco::Logger * log);
+    WatermarkTransform(const Block & header, WatermarkStamperParamsPtr params_, bool skip_stamping_for_backfill_data_, Poco::Logger * log);

     ~WatermarkTransform() override = default;

@@ -31,6 +31,9 @@ class WatermarkTransform final : public ISimpleTransform
 private:
     WatermarkStamperParamsPtr params;
     SERDE WatermarkStamperPtr watermark;
+
+    bool skip_stamping_for_backfill_data;
+    bool is_backfilling_data = false;
 };
 }
 }
src/Processors/Transforms/Streaming/WatermarkTransformWithSubstream.cpp
@@ -43,8 +43,12 @@ WatermarkStamperPtr initWatermark(const WatermarkStamperParams & params, Poco::L
     }
 }

-WatermarkTransformWithSubstream::WatermarkTransformWithSubstream(const Block & header, WatermarkStamperParamsPtr params_, Poco::Logger * log_)
-    : IProcessor({header}, {header}, ProcessorID::WatermarkTransformWithSubstreamID), params(std::move(params_)), log(log_)
+WatermarkTransformWithSubstream::WatermarkTransformWithSubstream(
+    const Block & header, WatermarkStamperParamsPtr params_, bool skip_stamping_for_backfill_data_, Poco::Logger * log_)
+    : IProcessor({header}, {header}, ProcessorID::WatermarkTransformWithSubstreamID)
+    , params(std::move(params_))
+    , skip_stamping_for_backfill_data(skip_stamping_for_backfill_data_)
+    , log(log_)
 {
     watermark_template = initWatermark(*params, log);
     assert(watermark_template);
@@ -109,12 +113,21 @@ void WatermarkTransformWithSubstream::work()
     process_chunk.swap(input_chunk);

     process_chunk.clearWatermark();
+
+    if (process_chunk.isHistoricalDataStart())
+        is_backfilling_data = true;
+    else if (process_chunk.isHistoricalDataEnd())
+        is_backfilling_data = false;
+
+    bool avoid_watermark = process_chunk.avoidWatermark();
+    avoid_watermark |= is_backfilling_data && skip_stamping_for_backfill_data;
+
     if (unlikely(process_chunk.requestCheckpoint()))
     {
         checkpoint(process_chunk.getCheckpointContext());
         output_chunks.emplace_back(std::move(process_chunk));
     }
-    else if (process_chunk.avoidWatermark())
+    else if (avoid_watermark)
     {
         output_chunks.emplace_back(std::move(process_chunk));
     }