chore(query): remove unused parquet2 codes (#16282)
* chore(query): remove unused parquet2 codes

* chore(query): remove unused parquet2 codes

* chore(query): remove unused parquet2 codes

* chore(query): update

* chore(query): update
sundy-li authored Aug 20, 2024
1 parent d14f7a5 commit ab8d1a1
Showing 21 changed files with 172 additions and 280 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 0 additions & 4 deletions src/common/storage/src/lib.rs
@@ -49,10 +49,6 @@ mod column_node;
pub use column_node::ColumnNode;
pub use column_node::ColumnNodes;

mod parquet2;
pub use parquet2::infer_schema_with_extension;
pub use parquet2::read_parquet_metas_in_parallel;

pub mod parquet_rs;
pub use parquet_rs::read_metadata_async;
pub use parquet_rs::read_parquet_schema_async_rs;
132 changes: 0 additions & 132 deletions src/common/storage/src/parquet2.rs

This file was deleted.

53 changes: 53 additions & 0 deletions src/common/storage/src/parquet_rs.rs
@@ -41,6 +41,15 @@ pub async fn read_parquet_schema_async_rs(
    infer_schema_with_extension(meta.file_metadata())
}

pub fn read_parquet_schema_sync_rs(
    operator: &Operator,
    path: &str,
    file_size: Option<u64>,
) -> Result<ArrowSchema> {
    let meta = read_metadata_sync(path, operator, file_size)?;
    infer_schema_with_extension(meta.file_metadata())
}

pub fn infer_schema_with_extension(meta: &FileMetaData) -> Result<ArrowSchema> {
    let mut arrow_schema = parquet_to_arrow_schema(meta.schema_descr(), meta.key_value_metadata())?;
    // Convert data types to extension types using meta information.
@@ -131,6 +140,50 @@ pub async fn read_metadata_async(
    }
}

pub fn read_metadata_sync(
    path: &str,
    operator: &Operator,
    file_size: Option<u64>,
) -> Result<ParquetMetaData> {
    let blocking = operator.blocking();
    let file_size = match file_size {
        None => blocking.stat(path)?.content_length(),
        Some(n) => n,
    };

    check_footer_size(file_size)?;

    // read and cache up to DEFAULT_FOOTER_READ_SIZE bytes from the end and process the footer
    let default_end_len = DEFAULT_FOOTER_READ_SIZE.min(file_size);
    let buffer = blocking
        .read_with(path)
        .range((file_size - default_end_len)..file_size)
        .call()?
        .to_vec();
    let buffer_len = buffer.len();
    let metadata_len = decode_footer(
        &buffer[(buffer_len - FOOTER_SIZE as usize)..]
            .try_into()
            .unwrap(),
    )? as u64;
    check_meta_size(file_size, metadata_len)?;

    let footer_len = FOOTER_SIZE + metadata_len;
    if (footer_len as usize) <= buffer_len {
        // The whole metadata is in the bytes we already read
        let offset = buffer_len - footer_len as usize;
        Ok(decode_metadata(&buffer[offset..])?)
    } else {
        let mut metadata = blocking
            .read_with(path)
            .range((file_size - footer_len)..(file_size - buffer_len as u64))
            .call()?
            .to_vec();
        metadata.extend(buffer);
        Ok(decode_metadata(&metadata)?)
    }
}

/// check file is large enough to hold footer
fn check_footer_size(file_size: u64) -> Result<()> {
    if file_size < FOOTER_SIZE {
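For orientation, here is a minimal usage sketch of the two synchronous helpers added above. It assumes the `Operator` is the opendal operator used throughout the storage layer and that it can serve blocking calls (read_metadata_sync goes through operator.blocking()); the wrapper function and variable names are illustrative, and the import paths follow the module shown above rather than any crate-root re-export, which this diff does not show.

use databend_common_exception::Result;
use databend_common_storage::parquet_rs::read_metadata_sync;
use databend_common_storage::parquet_rs::read_parquet_schema_sync_rs;
use opendal::Operator;

// Illustrative: inspect a Parquet file's footer and schema without an async runtime.
fn inspect_parquet(op: &Operator, path: &str) -> Result<()> {
    // Passing None for the size makes the helper stat the file first.
    let meta = read_metadata_sync(path, op, None)?;
    println!("row groups: {}", meta.num_row_groups());

    // Same size-hint convention; extension types are recovered from the key-value metadata.
    let schema = read_parquet_schema_sync_rs(op, path, None)?;
    println!("fields: {}", schema.fields.len());
    Ok(())
}

Note that read_metadata_sync issues a second range read only when the footer metadata is larger than DEFAULT_FOOTER_READ_SIZE; otherwise the single tail read covers both the fixed-size footer and the metadata.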
@@ -14,14 +14,17 @@

use std::sync::Arc;

use arrow_schema::Schema;
use databend_common_expression::TableSchema;
use databend_common_meta_app::schema::TableInfo;

#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
pub struct ResultScanTableInfo {
    pub table_info: TableInfo,
    pub query_id: String,
    pub block_raw_data: Vec<u8>,
    pub location: String,
    pub schema: Schema,
    pub file_size: u64,
}

impl ResultScanTableInfo {
@@ -130,7 +130,7 @@ pub async fn do_refresh_virtual_column(
    let all_generated = if let Some(schema) = schema {
        virtual_exprs
            .iter()
            .all(|virtual_name| schema.fields.iter().any(|f| &f.name == virtual_name))
            .all(|virtual_name| schema.fields.iter().any(|f| f.name() == virtual_name))
    } else {
        false
    };
@@ -84,8 +84,8 @@ async fn test_fuse_do_refresh_virtual_column() -> Result<()> {
assert!(schema.is_some());
let schema = schema.unwrap();
assert_eq!(schema.fields.len(), 2);
assert_eq!(schema.fields[0].name, "v['a']");
assert_eq!(schema.fields[1].name, "v[0]");
assert_eq!(schema.fields[0].name(), "v['a']");
assert_eq!(schema.fields[1].name(), "v[0]");
}
}

3 changes: 3 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -150,6 +150,9 @@ pub struct QueryContext {
}

impl QueryContext {
    // Each table creates a new QueryContext,
    // so the partition_queue can be independent in each table context.
    // See `builder_join.rs` for more details.
    pub fn create_from(other: Arc<QueryContext>) -> Arc<QueryContext> {
        QueryContext::create_from_shared(other.shared.clone())
    }
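A small, self-contained sketch of the pattern this comment describes (the helper name and the databend_query import path are assumptions for illustration; `create_from` and `create_from_shared` are from the hunk above):

use std::sync::Arc;

use databend_query::sessions::QueryContext;

// Illustrative: each table scan derives its own QueryContext from the query-level
// one. Both share the same QueryContextShared, but per-table state such as the
// partition queue stays independent.
fn context_for_table(query_ctx: &Arc<QueryContext>) -> Arc<QueryContext> {
    QueryContext::create_from(query_ctx.clone())
}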
@@ -34,9 +34,7 @@ use databend_common_expression::types::NumberScalar;
use databend_common_expression::FunctionKind;
use databend_common_expression::Scalar;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_storage::DataOperator;
use databend_common_storages_result_cache::ResultCacheMetaManager;
use databend_common_storages_result_cache::ResultCacheReader;
use databend_common_storages_result_cache::ResultScan;
use databend_common_users::UserApiProvider;

@@ -187,22 +185,16 @@ impl Binder {
databend_common_base::runtime::block_on(async move {
let result_cache_mgr = ResultCacheMetaManager::create(kv_store, 0);
let meta_key = meta_key.unwrap();
let (table_schema, block_raw_data) = match result_cache_mgr
.get(meta_key.clone())
.await?
{
Some(value) => {
let op = DataOperator::instance().operator();
ResultCacheReader::read_table_schema_and_data(op, &value.location).await?
}
let location = match result_cache_mgr.get(meta_key.clone()).await? {
Some(value) => value.location,
None => {
return Err(ErrorCode::EmptyData(format!(
"`RESULT_SCAN` failed: Unable to fetch cached data for query ID '{}'. The data may have exceeded its TTL or been cleaned up. Cache key: '{}'",
query_id, meta_key
)).set_span(*span));
}
};
let table = ResultScan::try_create(table_schema, query_id, block_raw_data)?;
let table = ResultScan::try_create(query_id, location).await?;

let table_alias_name = if let Some(table_alias) = alias {
Some(normalize_identifier(&table_alias.name, &self.name_resolution_ctx).name)
@@ -49,6 +49,7 @@ impl AggIndexReader {
.iter()
.all(|c| c.pages.iter().map(|p| p.num_values).sum::<u64>() == num_rows)
);

let columns_meta = metadata
.into_iter()
.enumerate()
@@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_arrow::arrow::io::parquet::read as pread;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_exception::Result;
use databend_common_expression::DataBlock;
use databend_common_storage::parquet_rs::read_metadata_sync;
use databend_common_storage::read_metadata_async;
use log::debug;

use super::AggIndexReader;
@@ -32,20 +33,12 @@ impl AggIndexReader {
) -> Option<(PartInfoPtr, MergeIOReadResult)> {
let op = self.reader.operator.blocking();
match op.stat(loc) {
Ok(meta) => {
let mut reader = op
.reader(loc)
.ok()?
.into_std_read(0..meta.content_length())
.ok()?;
let metadata = pread::read_metadata(&mut reader)
.inspect_err(|e| {
debug!("Read aggregating index `{loc}`'s metadata failed: {e}")
})
.ok()?;
debug_assert_eq!(metadata.row_groups.len(), 1);
let row_group = &metadata.row_groups[0];
Ok(_meta) => {
let metadata = read_metadata_sync(loc, &self.reader.operator, None).ok()?;
debug_assert_eq!(metadata.num_row_groups(), 1);
let row_group = &metadata.row_groups()[0];
let columns_meta = build_columns_meta(row_group);

let part = FuseBlockPartInfo::create(
loc.to_string(),
row_group.num_rows() as u64,
@@ -80,16 +73,12 @@
loc: &str,
) -> Option<(PartInfoPtr, MergeIOReadResult)> {
match self.reader.operator.stat(loc).await {
Ok(meta) => {
let reader = self.reader.operator.reader(loc).await.ok()?;
let metadata = pread::read_metadata_async(reader, meta.content_length())
Ok(_meta) => {
let metadata = read_metadata_async(loc, &self.reader.operator, None)
.await
.inspect_err(|e| {
debug!("Read aggregating index `{loc}`'s metadata failed: {e}")
})
.ok()?;
debug_assert_eq!(metadata.row_groups.len(), 1);
let row_group = &metadata.row_groups[0];
debug_assert_eq!(metadata.num_row_groups(), 1);
let row_group = &metadata.row_groups()[0];
let columns_meta = build_columns_meta(row_group);
let res = self
.reader
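Both hunks above route metadata loading through the shared helpers in databend_common_storage instead of arrow2. A minimal, self-contained sketch of the async variant (the wrapper name is illustrative, and the ParquetMetaData return type is inferred from how the result is used above, not stated in this diff):

use databend_common_exception::Result;
use databend_common_storage::read_metadata_async;
use opendal::Operator;
use parquet::file::metadata::ParquetMetaData;

// Illustrative wrapper: fetch the aggregating index file's metadata without blocking,
// then sanity-check that it holds exactly one row group, as the reader above expects.
async fn agg_index_metadata(op: &Operator, loc: &str) -> Result<ParquetMetaData> {
    let metadata = read_metadata_async(loc, op, None).await?;
    debug_assert_eq!(metadata.num_row_groups(), 1);
    Ok(metadata)
}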