Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat](spill) spill and reserve #47462

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
18 changes: 11 additions & 7 deletions be/src/agent/workload_group_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "agent/workload_group_listener.h"

#include <thrift/protocol/TDebugProtocol.h>

#include "runtime/exec_env.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
Expand All @@ -33,6 +35,8 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
if (!topic_info.__isset.workload_group_info) {
continue;
}
VLOG_DEBUG << "Received publish workload group info request: "
<< apache::thrift::ThriftDebugString(topic_info).c_str();
is_set_workload_group_info = true;

// 1 parse topic info to group info
Expand Down Expand Up @@ -65,13 +69,13 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
// 5 upsert io throttle
wg->upsert_scan_io_throttle(&workload_group_info);

LOG(INFO) << "[topic_publish_wg]update workload group finish, wg info="
<< wg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
<< ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
VLOG_DEBUG << "[topic_publish_wg]update workload group finish, wg info="
<< wg->debug_string() << ", enable_cpu_hard_limit="
<< (_exec_env->workload_group_mgr()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << workload_group_info.cgroup_cpu_shares
<< ", cgroup cpu_hard_limit=" << workload_group_info.cgroup_cpu_hard_limit
<< ", cgroup home path=" << config::doris_cgroup_cpu_path
<< ", list size=" << list_size << ", thread info=" << wg->thread_debug_info();
}

// NOTE(wb) when is_set_workload_group_info=false, it means FE send a empty workload group list
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Status CloudDeltaWriter::batch_init(std::vector<CloudDeltaWriter*> writers) {
}

Status CloudDeltaWriter::write(const vectorized::Block* block,
const std::vector<uint32_t>& row_idxs) {
const DorisVector<uint32_t>& row_idxs) {
if (row_idxs.empty()) [[unlikely]] {
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class CloudDeltaWriter final : public BaseDeltaWriter {
const UniqueId& load_id);
~CloudDeltaWriter() override;

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override;
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override;

Status close() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_tablets_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& reques
return Status::OK();
}

std::unordered_map<int64_t, std::vector<uint32_t>> tablet_to_rowidxs;
std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
_build_tablet_to_rowidxs(request, &tablet_to_rowidxs);

std::unordered_set<int64_t> partition_ids;
Expand Down
6 changes: 5 additions & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

#include <fmt/core.h>
#include <gflags/gflags.h>
#include <stdint.h>

#include <algorithm>
Expand Down Expand Up @@ -118,7 +119,7 @@ DEFINE_String(mem_limit, "90%");
DEFINE_Double(soft_mem_limit_frac, "0.9");

// Cache capacity reduce mem limit as a fraction of soft mem limit.
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.6");
DEFINE_mDouble(cache_capacity_reduce_mem_limit_frac, "0.7");

// Schema change memory limit as a fraction of soft memory limit.
DEFINE_Double(schema_change_mem_limit_frac, "0.6");
Expand Down Expand Up @@ -1274,6 +1275,9 @@ DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool {
});
DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");

// paused query in queue timeout(ms) will be resumed or canceled
DEFINE_Int64(spill_in_paused_queue_timeout_ms, "60000");

DEFINE_mBool(check_segment_when_build_rowset_meta, "false");

DEFINE_mInt32(max_s3_client_retry, "10");
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1352,6 +1352,7 @@ DECLARE_mInt32(spill_gc_interval_ms);
DECLARE_mInt32(spill_gc_work_time_ms);
DECLARE_Int32(spill_io_thread_pool_thread_num);
DECLARE_Int32(spill_io_thread_pool_queue_size);
DECLARE_Int64(spill_in_paused_queue_timeout_ms);

DECLARE_mBool(check_segment_when_build_rowset_meta);

Expand Down
14 changes: 10 additions & 4 deletions be/src/common/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,18 @@ void Daemon::memory_maintenance_thread() {
doris::ExecEnv::GetInstance()->workload_group_mgr()->do_sweep();
doris::ExecEnv::GetInstance()->workload_group_mgr()->refresh_wg_weighted_memory_limit();

// step 7. Analyze blocking queries.
// step 7: handle paused queries(caused by memory insufficient)
doris::ExecEnv::GetInstance()->workload_group_mgr()->handle_paused_queries();

// step 8. Analyze blocking queries.
// TODO sort the operators that can spill, wake up the pipeline task spill
// or continue execution according to certain rules or cancel query.

// step 8. Flush memtable
// step 9. Flush memtable
doris::GlobalMemoryArbitrator::notify_memtable_memory_refresh();
// TODO notify flush memtable

// step 9. Reset Jemalloc dirty page decay.
// step 10. Reset Jemalloc dirty page decay.
je_reset_dirty_decay();
}
}
Expand Down Expand Up @@ -542,7 +545,9 @@ void Daemon::cache_adjust_capacity_thread() {
doris::GlobalMemoryArbitrator::cache_adjust_capacity_cv.wait_for(
l, std::chrono::milliseconds(100));
}
double adjust_weighted = GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted;
double adjust_weighted = std::min<double>(
GlobalMemoryArbitrator::last_cache_capacity_adjust_weighted,
GlobalMemoryArbitrator::last_wg_trigger_cache_capacity_adjust_weighted);
if (_stop_background_threads_latch.count() == 0) {
break;
}
Expand All @@ -562,6 +567,7 @@ void Daemon::cache_adjust_capacity_thread() {
LOG(INFO) << fmt::format(
"[MemoryGC] refresh cache capacity end, free memory {}, details: {}",
PrettyPrinter::print(freed_mem, TUnit::BYTES), ss.str());
GlobalMemoryArbitrator::last_affected_cache_capacity_adjust_weighted = adjust_weighted;
} while (true);
}

Expand Down
10 changes: 10 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ namespace ErrorCode {
E(BAD_CAST, -254, true); \
E(ARITHMETIC_OVERFLOW_ERRROR, -255, false); \
E(PERMISSION_DENIED, -256, false); \
E(QUERY_MEMORY_EXCEEDED, -257, false); \
E(WORKLOAD_GROUP_MEMORY_EXCEEDED, -258, false); \
E(PROCESS_MEMORY_EXCEEDED, -259, false); \
E(CE_CMD_PARAMS_ERROR, -300, true); \
E(CE_BUFFER_TOO_SMALL, -301, true); \
E(CE_CMD_NOT_VALID, -302, true); \
Expand Down Expand Up @@ -381,6 +384,11 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::make_unique<ErrMsg>(*rhs._err_msg);
} else {
// If rhs error msg is empty, then should also clear current error msg
// For example, if rhs is OK and current status is error, then copy to current
// status, should clear current error message.
_err_msg.reset();
}
return *this;
}
Expand All @@ -390,6 +398,8 @@ class [[nodiscard]] Status {
_code = rhs._code;
if (rhs._err_msg) {
_err_msg = std::move(rhs._err_msg);
} else {
_err_msg.reset();
}
return *this;
}
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
// name, type, size
{"BE_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"FE_HOST", TYPE_VARCHAR, sizeof(StringRef), false},
{"WORKLOAD_GROUP_ID", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
{"TASK_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
{"TASK_CPU_TIME_MS", TYPE_BIGINT, sizeof(int64_t), false},
Expand All @@ -41,6 +42,8 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendActiveTasksScanner::_s_tbls_
{"SHUFFLE_SEND_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
{"SHUFFLE_SEND_ROWS", TYPE_BIGINT, sizeof(int64_t), false},
{"QUERY_TYPE", TYPE_VARCHAR, sizeof(StringRef), false},
{"SPILL_WRITE_BYTES_TO_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
{"SPILL_READ_BYTES_FROM_LOCAL_STORAGE", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendActiveTasksScanner::SchemaBackendActiveTasksScanner()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ std::vector<SchemaScanner::ColumnDesc> SchemaBackendWorkloadGroupResourceUsage::
{"CPU_USAGE_PERCENT", TYPE_DOUBLE, sizeof(double), false},
{"LOCAL_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"REMOTE_SCAN_BYTES_PER_SECOND", TYPE_BIGINT, sizeof(int64_t), false},
{"WRITE_BUFFER_USAGE_BYTES", TYPE_BIGINT, sizeof(int64_t), false},
};

SchemaBackendWorkloadGroupResourceUsage::SchemaBackendWorkloadGroupResourceUsage()
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ Status BaseDeltaWriter::init() {
return Status::OK();
}

Status DeltaWriter::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) {
Status DeltaWriter::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/delta_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BaseDeltaWriter {

virtual ~BaseDeltaWriter();

virtual Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) = 0;
virtual Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) = 0;

// flush the last memtable to flush queue, must call it before build_rowset()
virtual Status close() = 0;
Expand Down Expand Up @@ -123,7 +123,7 @@ class DeltaWriter final : public BaseDeltaWriter {

~DeltaWriter() override;

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) override;
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) override;

Status close() override;

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ Status DeltaWriterV2::init() {
return Status::OK();
}

Status DeltaWriterV2::write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs) {
Status DeltaWriterV2::write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs) {
if (UNLIKELY(row_idxs.empty())) {
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delta_writer_v2.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class DeltaWriterV2 {

Status init();

Status write(const vectorized::Block* block, const std::vector<uint32_t>& row_idxs);
Status write(const vectorized::Block* block, const DorisVector<uint32_t>& row_idxs);

// flush the last memtable to flush queue, must call it before close_wait()
Status close();
Expand Down
Loading
Loading