Skip to content

Commit

Permalink
chore: recluster avoid segments full scan
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Sep 23, 2024
1 parent 163a04a commit 7a8c7c9
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
18 changes: 8 additions & 10 deletions src/query/service/src/interpreters/hook/compact_hook.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,13 @@ async fn do_hook_compact(
block_limit: Some(compaction_num_block_hint as usize),
}
}
_ =>
// for mutations other than Insertions, we use an empirical value of 3 segments as the
// limit for compaction. to be refined later.
{
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
}
_ => {
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
CompactionLimits {
segment_limit: Some(auto_compaction_segments_limit as usize),
block_limit: None,
}
}
};

let op_name = &trace_ctx.operation_name;
Expand Down Expand Up @@ -151,12 +148,13 @@ async fn compact_table(
)?;

let mut build_res = if do_recluster {
let limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
let recluster = RelOperator::Recluster(Recluster {
catalog: compact_target.catalog,
database: compact_target.database,
table: compact_target.table,
filters: None,
limit: compaction_limits.segment_limit,
limit: Some(limit as usize),
});
let s_expr = SExpr::create_leaf(Arc::new(recluster));
let recluster_interpreter =
Expand Down
8 changes: 4 additions & 4 deletions src/query/storages/fuse/src/operations/recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,12 @@ impl FuseTable {
let segment_locations = create_segment_location_vector(segment_locations, None);

let max_threads = ctx.get_settings().get_max_threads()? as usize;
let limit = limit.unwrap_or(1000);
let segment_limit = limit.unwrap_or(1000);
// The default limit might be too small, which makes
// the scanning of recluster candidates slow.
let chunk_size = limit.max(max_threads * 4);
let chunk_size = segment_limit.max(max_threads * 4);
// The max number of segments to be reclustered.
let max_seg_num = limit.min(max_threads * 2);
let max_seg_num = segment_limit.min(max_threads * 2);

let mut recluster_seg_num = 0;
let mut recluster_blocks_count = 0;
Expand Down Expand Up @@ -146,7 +146,7 @@ impl FuseTable {
.await?;
}

if !parts.is_empty() {
if !parts.is_empty() || limit.is_some() {
recluster_seg_num = selected_seg_num;
break;
}
Expand Down
17 changes: 14 additions & 3 deletions tests/suites/0_stateless/02_ddl/02_0000_create_drop_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed


def recreate_view(con):
    # Drop and recreate the view used by the issue-16188 regression test.
    # Two separate transactions are used so the DROP commits before the
    # CREATE runs, matching the race the test is exercising.
    with con.begin() as c:
        c.execute(sqlalchemy.text("DROP VIEW IF EXISTS v_issue_16188"))
    with con.begin() as c:
        # NOTE(review): the scraped diff showed this statement twice (old
        # single-line form plus the reformatted one); only one execution
        # is intended — keep the single CREATE OR REPLACE.
        c.execute(
            sqlalchemy.text(
                "CREATE OR REPLACE VIEW v_issue_16188 as select a,b from t_issue_16188"
            )
        )


def main():
tcp_port = os.getenv("QUERY_MYSQL_HANDLER_PORT")
Expand All @@ -21,7 +27,11 @@ def main():
con = sqlalchemy.create_engine(uri, future=True)
with con.begin() as c:
c.execute(sqlalchemy.text("DROP TABLE IF EXISTS t_issue_16188"))
c.execute(sqlalchemy.text("CREATE TABLE t_issue_16188 (a int not null, b int not null)"))
c.execute(
sqlalchemy.text(
"CREATE TABLE t_issue_16188 (a int not null, b int not null)"
)
)

with ThreadPoolExecutor(max_workers=64) as executor:
futures = []
Expand All @@ -31,5 +41,6 @@ def main():
for future in as_completed(futures):
future.result()

if __name__ == '__main__':

if __name__ == "__main__":
main()

0 comments on commit 7a8c7c9

Please sign in to comment.