From 534a4e4e69eece16c5f6dce076077e72435aeef1 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 8 Nov 2024 14:42:31 -0800 Subject: [PATCH] Move log segment into separate module (#438) ## What changes are proposed in this pull request? This PR is a prefactor that moves `LogSegment` into its own module. This will be useful to implement `TableChanges` for [change data feed](https://github.com/delta-incubator/delta-kernel-rs/issues/440). `TableChanges` will represent CDF scans and will depend on `LogSegment`. Since `TableChanges` and `Snapshot` both depend on `LogSegment`, it makes sense to keep it separated. I also use this opportunity to address some nits in the codebase. ## How was this change tested? The changes compile. --------- Co-authored-by: Ryan Johnson --- kernel/src/lib.rs | 5 ++ kernel/src/log_segment.rs | 157 +++++++++++++++++++++++++++++++++++++ kernel/src/scan/mod.rs | 66 ++++++++-------- kernel/src/snapshot.rs | 159 ++------------------------------------ 4 files changed, 199 insertions(+), 188 deletions(-) create mode 100644 kernel/src/log_segment.rs diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 40fa360f5..43263eb0d 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -69,6 +69,11 @@ pub mod path; #[cfg(not(feature = "developer-visibility"))] pub(crate) mod path; +#[cfg(feature = "developer-visibility")] +pub mod log_segment; +#[cfg(not(feature = "developer-visibility"))] +pub(crate) mod log_segment; + pub mod scan; pub mod schema; pub mod snapshot; diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs new file mode 100644 index 000000000..791a77cbd --- /dev/null +++ b/kernel/src/log_segment.rs @@ -0,0 +1,157 @@ +//! Represents a segment of a delta log. [`LogSegment`] wraps a set of checkpoint and commit +//! files. + +use crate::{ + actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}, + schema::SchemaRef, + DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta, +}; +use itertools::Itertools; +use std::sync::{Arc, LazyLock}; +use url::Url; + +#[derive(Debug)] +#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] +pub(crate) struct LogSegment { + pub log_root: Url, + /// Reverse order sorted commit files in the log segment + pub commit_files: Vec, + /// checkpoint files in the log segment. + pub checkpoint_files: Vec, +} + +impl LogSegment { + /// Read a stream of log data from this log segment. + /// + /// The log files will be read from most recent to oldest. + /// The boolean flags indicates whether the data was read from + /// a commit file (true) or a checkpoint file (false). + /// + /// `read_schema` is the schema to read the log files with. This can be used + /// to project the log files to a subset of the columns. + /// + /// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the + /// query's predicate, but rather a predicate for filtering log files themselves. + #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] + pub(crate) fn replay( + &self, + engine: &dyn Engine, + commit_read_schema: SchemaRef, + checkpoint_read_schema: SchemaRef, + meta_predicate: Option, + ) -> DeltaResult, bool)>> + Send> { + let commit_stream = engine + .get_json_handler() + .read_json_files( + &self.commit_files, + commit_read_schema, + meta_predicate.clone(), + )? + .map_ok(|batch| (batch, true)); + + let checkpoint_stream = engine + .get_parquet_handler() + .read_parquet_files( + &self.checkpoint_files, + checkpoint_read_schema, + meta_predicate, + )? + .map_ok(|batch| (batch, false)); + + Ok(commit_stream.chain(checkpoint_stream)) + } + + // Get the most up-to-date Protocol and Metadata actions + pub(crate) fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult<(Metadata, Protocol)> { + let data_batches = self.replay_for_metadata(engine)?; + let (mut metadata_opt, mut protocol_opt) = (None, None); + for batch in data_batches { + let (batch, _) = batch?; + if metadata_opt.is_none() { + metadata_opt = Metadata::try_new_from_data(batch.as_ref())?; + } + if protocol_opt.is_none() { + protocol_opt = Protocol::try_new_from_data(batch.as_ref())?; + } + if metadata_opt.is_some() && protocol_opt.is_some() { + // we've found both, we can stop + break; + } + } + match (metadata_opt, protocol_opt) { + (Some(m), Some(p)) => Ok((m, p)), + (None, Some(_)) => Err(Error::MissingMetadata), + (Some(_), None) => Err(Error::MissingProtocol), + (None, None) => Err(Error::MissingMetadataAndProtocol), + } + } + + // Replay the commit log, projecting rows to only contain Protocol and Metadata action columns. + fn replay_for_metadata( + &self, + engine: &dyn Engine, + ) -> DeltaResult, bool)>> + Send> { + let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?; + // filter out log files that do not contain metadata or protocol information + static META_PREDICATE: LazyLock> = LazyLock::new(|| { + Some(Arc::new(Expression::or( + Expression::column([METADATA_NAME, "id"]).is_not_null(), + Expression::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null(), + ))) + }); + // read the same protocol and metadata schema for both commits and checkpoints + self.replay(engine, schema.clone(), schema, META_PREDICATE.clone()) + } +} + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use itertools::Itertools; + + use crate::{engine::sync::SyncEngine, Table}; + + // NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies + // that the parquet reader properly infers nullcount = rowcount for missing columns. The two + // checkpoint part files that contain transaction app ids have truncated schemas that would + // otherwise fail skipping due to their missing nullcount stat: + // + // Row group 0: count: 1 total(compressed): 111 B total(uncompressed):107 B + // -------------------------------------------------------------------------------- + // type nulls min / max + // txn.appId BINARY 0 "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..." + // txn.version INT64 0 "4390" / "4390" + #[test] + fn test_replay_for_metadata() { + let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); + let url = url::Url::from_directory_path(path.unwrap()).unwrap(); + let engine = SyncEngine::new(); + + let table = Table::new(url); + let snapshot = table.snapshot(&engine, None).unwrap(); + let data: Vec<_> = snapshot + .log_segment + .replay_for_metadata(&engine) + .unwrap() + .try_collect() + .unwrap(); + + // The checkpoint has five parts, each containing one action: + // 1. txn (physically missing P&M columns) + // 2. metaData + // 3. protocol + // 4. add + // 5. txn (physically missing P&M columns) + // + // The parquet reader should skip parts 1, 3, and 5. Note that the actual `read_metadata` + // always skips parts 4 and 5 because it terminates the iteration after finding both P&M. + // + // NOTE: Each checkpoint part is a single-row file -- guaranteed to produce one row group. + // + // WARNING: https://github.com/delta-incubator/delta-kernel-rs/issues/434 -- We currently + // read parts 1 and 5 (4 in all instead of 2) because row group skipping is disabled for + // missing columns, but can still skip part 3 because has valid nullcount stats for P&M. + assert_eq!(data.len(), 4); + } +} diff --git a/kernel/src/scan/mod.rs b/kernel/src/scan/mod.rs index 02dea456f..bcbef1f53 100644 --- a/kernel/src/scan/mod.rs +++ b/kernel/src/scan/mod.rs @@ -472,42 +472,38 @@ fn transform_to_logical_internal( have_partition_cols: bool, ) -> DeltaResult> { let read_schema = global_state.read_schema.clone(); - if have_partition_cols || global_state.column_mapping_mode != ColumnMappingMode::None { - // need to add back partition cols and/or fix-up mapped columns - let all_fields = all_fields - .iter() - .map(|field| match field { - ColumnType::Partition(field_idx) => { - let field = global_state - .logical_schema - .fields - .get_index(*field_idx) - .ok_or_else(|| { - Error::generic("logical schema did not contain expected field, can't transform data") - })?.1; - let name = field.physical_name(global_state.column_mapping_mode)?; - let value_expression = parse_partition_value( - partition_values.get(name), - field.data_type(), - )?; - Ok::(value_expression.into()) - } - ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()), - }) - .try_collect()?; - let read_expression = Expression::Struct(all_fields); - let result = engine - .get_expression_handler() - .get_evaluator( - read_schema, - read_expression, - global_state.logical_schema.clone().into(), - ) - .evaluate(data.as_ref())?; - Ok(result) - } else { - Ok(data) + if !have_partition_cols && global_state.column_mapping_mode == ColumnMappingMode::None { + return Ok(data); } + // need to add back partition cols and/or fix-up mapped columns + let all_fields = all_fields + .iter() + .map(|field| match field { + ColumnType::Partition(field_idx) => { + let field = global_state.logical_schema.fields.get_index(*field_idx); + let Some((_, field)) = field else { + return Err(Error::generic( + "logical schema did not contain expected field, can't transform data", + )); + }; + let name = field.physical_name(global_state.column_mapping_mode)?; + let value_expression = + parse_partition_value(partition_values.get(name), field.data_type())?; + Ok(value_expression.into()) + } + ColumnType::Selected(field_name) => Ok(ColumnName::new([field_name]).into()), + }) + .try_collect()?; + let read_expression = Expression::Struct(all_fields); + let result = engine + .get_expression_handler() + .get_evaluator( + read_schema, + read_expression, + global_state.logical_schema.clone().into(), + ) + .evaluate(data.as_ref())?; + Ok(result) } // some utils that are used in file_stream.rs and state.rs tests diff --git a/kernel/src/snapshot.rs b/kernel/src/snapshot.rs index 875d581fe..6aa8f20b4 100644 --- a/kernel/src/snapshot.rs +++ b/kernel/src/snapshot.rs @@ -3,123 +3,22 @@ //! use std::cmp::Ordering; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; -use itertools::Itertools; use serde::{Deserialize, Serialize}; use tracing::{debug, warn}; use url::Url; -use crate::actions::{get_log_schema, Metadata, Protocol, METADATA_NAME, PROTOCOL_NAME}; +use crate::actions::{Metadata, Protocol}; use crate::features::{ColumnMappingMode, COLUMN_MAPPING_MODE_KEY}; +use crate::log_segment::LogSegment; use crate::path::ParsedLogPath; use crate::scan::ScanBuilder; -use crate::schema::{Schema, SchemaRef}; +use crate::schema::Schema; use crate::utils::require; -use crate::{DeltaResult, Engine, Error, FileMeta, FileSystemClient, Version}; -use crate::{EngineData, Expression, ExpressionRef}; +use crate::{DeltaResult, Engine, Error, FileSystemClient, Version}; const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint"; - -#[derive(Debug)] -#[cfg_attr(feature = "developer-visibility", visibility::make(pub))] -#[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] -struct LogSegment { - log_root: Url, - /// Reverse order sorted commit files in the log segment - pub(crate) commit_files: Vec, - /// checkpoint files in the log segment. - pub(crate) checkpoint_files: Vec, -} - -impl LogSegment { - /// Read a stream of log data from this log segment. - /// - /// The log files will be read from most recent to oldest. - /// The boolean flags indicates whether the data was read from - /// a commit file (true) or a checkpoint file (false). - /// - /// `read_schema` is the schema to read the log files with. This can be used - /// to project the log files to a subset of the columns. - /// - /// `meta_predicate` is an optional expression to filter the log files with. It is _NOT_ the - /// query's predicate, but rather a predicate for filtering log files themselves. - #[cfg_attr(feature = "developer-visibility", visibility::make(pub))] - #[cfg_attr(not(feature = "developer-visibility"), visibility::make(pub(crate)))] - fn replay( - &self, - engine: &dyn Engine, - commit_read_schema: SchemaRef, - checkpoint_read_schema: SchemaRef, - meta_predicate: Option, - ) -> DeltaResult, bool)>> + Send> { - let json_client = engine.get_json_handler(); - let commit_stream = json_client - .read_json_files( - &self.commit_files, - commit_read_schema, - meta_predicate.clone(), - )? - .map_ok(|batch| (batch, true)); - - let parquet_client = engine.get_parquet_handler(); - let checkpoint_stream = parquet_client - .read_parquet_files( - &self.checkpoint_files, - checkpoint_read_schema, - meta_predicate, - )? - .map_ok(|batch| (batch, false)); - - let batches = commit_stream.chain(checkpoint_stream); - - Ok(batches) - } - - fn read_metadata(&self, engine: &dyn Engine) -> DeltaResult> { - let data_batches = self.replay_for_metadata(engine)?; - let mut metadata_opt: Option = None; - let mut protocol_opt: Option = None; - for batch in data_batches { - let (batch, _) = batch?; - if metadata_opt.is_none() { - metadata_opt = crate::actions::Metadata::try_new_from_data(batch.as_ref())?; - } - if protocol_opt.is_none() { - protocol_opt = crate::actions::Protocol::try_new_from_data(batch.as_ref())?; - } - if metadata_opt.is_some() && protocol_opt.is_some() { - // we've found both, we can stop - break; - } - } - match (metadata_opt, protocol_opt) { - (Some(m), Some(p)) => Ok(Some((m, p))), - (None, Some(_)) => Err(Error::MissingMetadata), - (Some(_), None) => Err(Error::MissingProtocol), - _ => Err(Error::MissingMetadataAndProtocol), - } - } - - // Factored out to facilitate testing - fn replay_for_metadata( - &self, - engine: &dyn Engine, - ) -> DeltaResult, bool)>> + Send> { - let schema = get_log_schema().project(&[PROTOCOL_NAME, METADATA_NAME])?; - // filter out log files that do not contain metadata or protocol information - use Expression as Expr; - static META_PREDICATE: LazyLock> = LazyLock::new(|| { - Some(Arc::new(Expr::or( - Expr::column([METADATA_NAME, "id"]).is_not_null(), - Expr::column([PROTOCOL_NAME, "minReaderVersion"]).is_not_null(), - ))) - }); - // read the same protocol and metadata schema for both commits and checkpoints - self.replay(engine, schema.clone(), schema, META_PREDICATE.clone()) - } -} - // TODO expose methods for accessing the files of a table (with file pruning). /// In-memory representation of a specific snapshot of a Delta table. While a `DeltaTable` exists /// throughout time, `Snapshot`s represent a view of a table at a specific point in time; they @@ -225,9 +124,7 @@ impl Snapshot { version: Version, engine: &dyn Engine, ) -> DeltaResult { - let (metadata, protocol) = log_segment - .read_metadata(engine)? - .ok_or(Error::MissingMetadata)?; + let (metadata, protocol) = log_segment.read_metadata(engine)?; let schema = metadata.schema()?; let column_mapping_mode = match metadata.configuration.get(COLUMN_MAPPING_MODE_KEY) { Some(mode) if protocol.min_reader_version >= 2 => mode.as_str().try_into(), @@ -480,7 +377,6 @@ mod tests { use crate::engine::default::filesystem::ObjectStoreFileSystemClient; use crate::engine::sync::SyncEngine; use crate::schema::StructType; - use crate::Table; #[test] fn test_snapshot_read_metadata() { @@ -655,49 +551,6 @@ mod tests { assert!(invalid.is_none()) } - // NOTE: In addition to testing the meta-predicate for metadata replay, this test also verifies - // that the parquet reader properly infers nullcount = rowcount for missing columns. The two - // checkpoint part files that contain transaction app ids have truncated schemas that would - // otherwise fail skipping due to their missing nullcount stat: - // - // Row group 0: count: 1 total(compressed): 111 B total(uncompressed):107 B - // -------------------------------------------------------------------------------- - // type nulls min / max - // txn.appId BINARY 0 "3ae45b72-24e1-865a-a211-3..." / "3ae45b72-24e1-865a-a211-3..." - // txn.version INT64 0 "4390" / "4390" - #[test] - fn test_replay_for_metadata() { - let path = std::fs::canonicalize(PathBuf::from("./tests/data/parquet_row_group_skipping/")); - let url = url::Url::from_directory_path(path.unwrap()).unwrap(); - let engine = SyncEngine::new(); - - let table = Table::new(url); - let snapshot = table.snapshot(&engine, None).unwrap(); - let data: Vec<_> = snapshot - .log_segment - .replay_for_metadata(&engine) - .unwrap() - .try_collect() - .unwrap(); - - // The checkpoint has five parts, each containing one action: - // 1. txn (physically missing P&M columns) - // 2. metaData - // 3. protocol - // 4. add - // 5. txn (physically missing P&M columns) - // - // The parquet reader should skip parts 1, 3, and 5. Note that the actual `read_metadata` - // always skips parts 4 and 5 because it terminates the iteration after finding both P&M. - // - // NOTE: Each checkpoint part is a single-row file -- guaranteed to produce one row group. - // - // WARNING: https://github.com/delta-incubator/delta-kernel-rs/issues/434 -- We currently - // read parts 1 and 5 (4 in all instead of 2) because row group skipping is disabled for - // missing columns, but can still skip part 3 because has valid nullcount stats for P&M. - assert_eq!(data.len(), 4); - } - #[test_log::test] fn test_read_table_with_checkpoint() { let path = std::fs::canonicalize(PathBuf::from(