fix: incorrect table data disk cache key (#16837)
* fix: incorrect table data disk cache key

The `offset` and `len` of the column meta should be used as the corresponding
parts of the table data cache key.

* refactor: enforce using the same column data cache key
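
To make the failure mode concrete, here is a minimal, self-contained Rust sketch of the two keying schemes. Everything below (`range_key`, `meta_key`, the block path, the offsets) is a hypothetical stand-in for illustration; Databend's real `ColumnMeta` and `TableDataCacheKey` types are richer than this.

use std::ops::Range;

// Hypothetical stand-in: only the two fields that matter for keying.
struct ColumnMeta {
    offset: u64, // column's offset within the block file (stable per block)
    len: u64,    // column's compressed length (stable per block)
}

// Old keying (the bug): derived from the merged-IO chunk range, which
// depends on which columns the query happens to project.
fn range_key(block_path: &str, column_id: u32, range: &Range<u64>) -> String {
    format!("{}-{}-{}-{}", block_path, column_id, range.start, range.end - range.start)
}

// New keying (the fix): derived from the column meta, stable per block and column.
fn meta_key(block_path: &str, column_id: u32, meta: &ColumnMeta) -> String {
    format!("{}-{}-{}-{}", block_path, column_id, meta.offset, meta.len)
}

fn main() {
    let meta_col2 = ColumnMeta { offset: 4096, len: 1024 };

    // Hypothetical ranges observed for `col2` under two different projections:
    let wide = 4096..5120; // `SELECT col1, col2 FROM t;`
    let narrow = 0..1024;  // `SELECT col2 FROM t;`

    // Range-derived keys disagree across projections: cache misses, or worse,
    // inconsistencies if different data lands under a reused key.
    assert_ne!(range_key("blk_0001", 2, &wide), range_key("blk_0001", 2, &narrow));

    // The meta-derived key is the same no matter which projection ran.
    assert_eq!(meta_key("blk_0001", 2, &meta_col2), meta_key("blk_0001", 2, &meta_col2));
}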
dantengsky authored Nov 14, 2024
1 parent 6c05298 commit fb5dbcb
Showing 1 changed file with 31 additions and 9 deletions.
@@ -48,6 +48,9 @@ impl BlockReader {
         let column_array_cache = CacheManager::instance().get_table_data_array_cache();
         let mut cached_column_data = vec![];
         let mut cached_column_array = vec![];
+
+        let column_cache_key_builder = ColumnCacheKeyBuilder::new(location);
+
         for (_index, (column_id, ..)) in self.project_indices.iter() {
             if let Some(ignore_column_ids) = ignore_column_ids {
                 if ignore_column_ids.contains(column_id) {
@@ -58,7 +61,7 @@ impl BlockReader {
             if let Some(column_meta) = columns_meta.get(column_id) {
                 let (offset, len) = column_meta.offset_length();

-                let column_cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
+                let column_cache_key = column_cache_key_builder.cache_key(column_id, column_meta);

                 // first, check in memory table data cache
                 // column_array_cache
@@ -91,20 +94,25 @@ impl BlockReader {
             .await?;

         if self.put_cache {
-            let table_data_cache = CacheManager::instance().get_table_data_cache();
             // add raw data (compressed raw bytes) to column cache
             for (column_id, (chunk_idx, range)) in &merge_io_result.columns_chunk_offsets {
-                let cache_key = TableDataCacheKey::new(
-                    &merge_io_result.block_path,
-                    *column_id,
-                    range.start as u64,
-                    (range.end - range.start) as u64,
-                );
+                // Should NOT use `range.start` as part of the cache key:
+                // it is not stable and can vary for the same column depending on the query's projection.
+                // For instance:
+                // - `SELECT col1, col2 FROM t;`
+                // - `SELECT col2 FROM t;`
+                // may result in different ranges for `col2`.
+                // This can lead to cache misses or inconsistencies.
+
+                // Safe to unwrap here: since this column has been fetched, its meta must be present.
+                let column_meta = columns_meta.get(column_id).unwrap();
+                let column_cache_key = column_cache_key_builder.cache_key(column_id, column_meta);
+
                 let chunk_data = merge_io_result
                     .owner_memory
                     .get_chunk(*chunk_idx, &merge_io_result.block_path)?;
                 let data = chunk_data.slice(range.clone());
-                table_data_cache.insert(cache_key.as_ref().to_owned(), data);
+                column_data_cache.insert(column_cache_key.as_ref().to_owned(), data);
             }
         }

@@ -116,3 +124,17 @@ impl BlockReader {
         Ok(block_read_res)
     }
 }
+
+struct ColumnCacheKeyBuilder<'a> {
+    block_path: &'a str,
+}
+
+impl<'a> ColumnCacheKeyBuilder<'a> {
+    fn new(block_path: &'a str) -> Self {
+        Self { block_path }
+    }
+    fn cache_key(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
+        let (offset, len) = column_meta.offset_length();
+        TableDataCacheKey::new(self.block_path, *column_id, offset, len)
+    }
+}
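
A note on the accompanying refactor: routing every key through `ColumnCacheKeyBuilder` gives the read path (the cache probe before IO) and the write path (the cache population after IO) a single derivation point, so the two can no longer drift apart. A rough sketch of that invariant, again with simplified hypothetical stand-ins rather than the repository's real types:

// Hypothetical stand-ins; Databend's ColumnMeta / TableDataCacheKey are richer.
struct ColumnMeta {
    offset: u64,
    len: u64,
}

struct ColumnCacheKeyBuilder<'a> {
    block_path: &'a str,
}

impl<'a> ColumnCacheKeyBuilder<'a> {
    fn cache_key(&self, column_id: u32, meta: &ColumnMeta) -> String {
        format!("{}-{}-{}-{}", self.block_path, column_id, meta.offset, meta.len)
    }
}

fn main() {
    let builder = ColumnCacheKeyBuilder { block_path: "blk_0001" };
    let meta = ColumnMeta { offset: 4096, len: 1024 };

    // Read path: key used to probe the cache before issuing IO.
    let probe_key = builder.cache_key(2, &meta);
    // Write path: key used to insert the fetched bytes afterwards.
    let insert_key = builder.cache_key(2, &meta);

    // One derivation point means the two paths always agree.
    assert_eq!(probe_key, insert_key);
}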
