diff --git a/kernel/src/log_segment.rs b/kernel/src/log_segment.rs index 540f3e4a8..d0409b40f 100644 --- a/kernel/src/log_segment.rs +++ b/kernel/src/log_segment.rs @@ -3,17 +3,18 @@ use crate::actions::visitors::SidecarVisitor; use crate::actions::{ - get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, METADATA_NAME, PROTOCOL_NAME, - SIDECAR_NAME, + get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, ADD_NAME, METADATA_NAME, + PROTOCOL_NAME, SIDECAR_NAME, }; use crate::path::{LogPathFileType, ParsedLogPath}; use crate::schema::SchemaRef; use crate::snapshot::CheckpointMetadata; use crate::utils::require; use crate::{ - DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta, FileSystemClient, - JsonHandler, ParquetHandler, RowVisitor, Version, + DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileDataReadResultIterator, + FileMeta, FileSystemClient, ParquetHandler, RowVisitor, Version, }; +use itertools::Either::{Left, Right}; use itertools::Itertools; use std::collections::HashMap; use std::convert::identity; @@ -21,36 +22,6 @@ use std::sync::{Arc, LazyLock}; use tracing::warn; use url::Url; -/// Enum representing different file handlers for reading files. -enum FileHandler { - /// Handler for reading JSON files. - Json(Arc), - /// Handler for reading Parquet files. - Parquet(Arc), -} - -impl FileHandler { - /// Reads files based on the provided file metadata, schema, and optional predicate. - /// - /// This function delegates the reading of files to the appropriate handler based on the file type - /// (JSON or Parquet). It returns an iterator over the results, which can be used to process the - /// data in a streaming fashion. - /// - /// This function is particularly useful for abstracting the file reading logic for V2 checkpoints, - /// which can be stored in either JSON or Parquet format - fn read_files( - &self, - parts: &[FileMeta], - schema: SchemaRef, - predicate: Option, - ) -> DeltaResult>> + Send>> { - match self { - FileHandler::Json(handler) => handler.read_json_files(parts, schema, predicate), - FileHandler::Parquet(handler) => handler.read_parquet_files(parts, schema, predicate), - } - } -} - #[cfg(test)] mod tests; @@ -234,8 +205,7 @@ impl LogSegment { commit_read_schema: SchemaRef, checkpoint_read_schema: SchemaRef, meta_predicate: Option, - ) -> DeltaResult, bool)>> + Send>> - { + ) -> DeltaResult, bool)>> + Send> { // `replay` expects commit files to be sorted in descending order, so we reverse the sorted // commit files let commit_files: Vec<_> = self @@ -250,10 +220,24 @@ impl LogSegment { .read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())? .map_ok(|batch| (batch, true)); - let checkpoint_stream = - Self::create_checkpoint_stream(self, engine, checkpoint_read_schema, meta_predicate)?; + let checkpoint_parts: Vec<_> = self + .checkpoint_parts + .iter() + .map(|f| f.location.clone()) + .collect(); - Ok(Box::new(commit_stream.chain(checkpoint_stream))) + let checkpoint_stream = if !checkpoint_parts.is_empty() { + Right(Self::create_checkpoint_stream( + self, + engine, + checkpoint_read_schema, + meta_predicate, + checkpoint_parts, + )?) + } else { + Left(std::iter::empty()) + }; + return Ok(commit_stream.chain(checkpoint_stream)); } fn create_checkpoint_stream( @@ -261,81 +245,79 @@ impl LogSegment { engine: &dyn Engine, checkpoint_read_schema: SchemaRef, meta_predicate: Option, - ) -> DeltaResult, bool)>> + Send>> - { - let checkpoint_parts: Vec<_> = self - .checkpoint_parts - .iter() - .map(|f| f.location.clone()) - .collect(); - - if checkpoint_parts.is_empty() { - return Ok(Box::new(std::iter::empty())); - } - - let file_type = match self.checkpoint_parts.first() { - Some(part) => part.file_type.clone(), - None => LogPathFileType::Unknown, - }; + checkpoint_parts: Vec, + ) -> DeltaResult, bool)>> + Send> { let is_json_checkpoint = self.checkpoint_parts[0].extension == "json"; - let json_handler = engine.get_json_handler(); - let parquet_handler = engine.get_parquet_handler(); - let handler = if is_json_checkpoint { - FileHandler::Json(json_handler) + let actions: FileDataReadResultIterator = if is_json_checkpoint { + engine.get_json_handler().read_json_files( + &checkpoint_parts, + checkpoint_read_schema.clone(), + meta_predicate, + )? } else { - FileHandler::Parquet(parquet_handler) + engine.get_parquet_handler().read_parquet_files( + &checkpoint_parts, + checkpoint_read_schema.clone(), + meta_predicate, + )? }; + let need_file_actions = checkpoint_read_schema.contains(ADD_NAME); + if need_file_actions { + // if adds were requested, we need sidecars as well! + require!( + checkpoint_read_schema.contains(SIDECAR_NAME), + Error::generic("Checkpoint read schema must contain SIDECAR_NAME") + ); + } + + let is_multi_part_checkpoint = self.checkpoint_parts.len() > 1; + + // Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol) + // In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is. let log_root = self.log_root.clone(); let parquet_handler = engine.get_parquet_handler().clone(); + let checkpoint_stream = if need_file_actions { + Left( + actions + // Flatten the new batches returned. The new batches could be: + // - the checkpoint batch itself if no sidecar actions are present + // - 1 or more sidecar batch that are referenced by the checkpoint batch + .flat_map(move |batch_result| match batch_result { + Ok(checkpoint_batch) => Right( + Self::create_stream_for_checkpoint_batch( + checkpoint_batch, + parquet_handler.clone(), + log_root.clone(), + is_multi_part_checkpoint, + ) + .map_or_else(|e| Left(std::iter::once(Err(e))), Right), + ), + Err(e) => Left(std::iter::once(Err(e))), + }), + ) + } else { + Right(actions.map_ok(|batch| (batch, false))) + }; - Ok(Box::new( - handler - .read_files( - &checkpoint_parts, - checkpoint_read_schema.clone(), - meta_predicate, - )? - // Flatten the new batches returned. The new batches could be: - // - the checkpoint batch itself if no sidecar actions are present - // - 1 or more sidecar batch that are referenced by the checkpoint batch - .flat_map(move |batch_result| match batch_result { - Ok(checkpoint_batch) => Self::create_stream_for_checkpoint_batch( - &file_type, - checkpoint_batch, - checkpoint_read_schema.clone(), - parquet_handler.clone(), - log_root.clone(), - ), - Err(e) => Box::new(std::iter::once(Err(e))), - }), - )) + return Ok(checkpoint_stream); } fn create_stream_for_checkpoint_batch( - file_type: &LogPathFileType, checkpoint_batch: Box, - checkpoint_read_schema: SchemaRef, handler: Arc, log_root_ref: Url, - ) -> Box, bool)>> + Send> { - match file_type { - LogPathFileType::UuidCheckpoint(_) if checkpoint_read_schema.contains(SIDECAR_NAME) => { - // If the checkpoint is a V2 checkpoint AND the schema contains the sidecar column, - // we need to read the sidecar files and return the iterator of sidecar actions - - // Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol) - // In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is. - match Self::process_checkpoint_batch(handler, log_root_ref, checkpoint_batch) { - Ok(iterator) => Box::new(iterator.map_ok(|batch| (batch, false))), - Err(e) => Box::new(std::iter::once(Err(e))), - } - } - _ => { - // Return the checkpoint batch as is if we do not need to extract sidecar actions from it - Box::new(std::iter::once(Ok((checkpoint_batch, false)))) + is_multi_part_checkpoint: bool, + ) -> DeltaResult, bool)>> + Send> { + // Multi-part checkpoints do not have sidecars, so we can return the batch as is. + if !is_multi_part_checkpoint { + match Self::process_checkpoint_batch(handler, log_root_ref, checkpoint_batch) { + Ok(iterator) => Ok(Right(iterator.map_ok(|batch| (batch, false)))), + Err(e) => Ok(Left(std::iter::once(Err(e)))), } + } else { + Ok(Left(std::iter::once(Ok((checkpoint_batch, false))))) } } @@ -343,7 +325,7 @@ impl LogSegment { parquet_handler: Arc, log_root: Url, batch: Box, - ) -> DeltaResult>> + Send>> { + ) -> DeltaResult>> + Send> { let mut visitor = SidecarVisitor::default(); // collect sidecars @@ -351,7 +333,7 @@ impl LogSegment { // if there are no sidecars, return the batch as is if visitor.sidecars.is_empty() { - return Ok(Box::new(std::iter::once(Ok(batch)))); + return Ok(Left(std::iter::once(Ok(batch)))); } //convert sidecar actions to sidecar file paths @@ -364,7 +346,11 @@ impl LogSegment { let sidecar_read_schema = get_log_add_schema().clone(); // if sidecars exist, read the sidecar files and return the iterator of sidecar actions - parquet_handler.read_parquet_files(&sidecar_files?, sidecar_read_schema, None) + Ok(Right(parquet_handler.read_parquet_files( + &sidecar_files?, + sidecar_read_schema, + None, + )?)) } // Helper function to convert a single sidecar action to a FileMeta