Skip to content

Commit

Permalink
visit all single part checkpoints with SidecarVisitor
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Jan 31, 2025
1 parent a570a87 commit ba74c24
Showing 1 changed file with 88 additions and 102 deletions.
190 changes: 88 additions & 102 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,54 +3,25 @@
use crate::actions::visitors::SidecarVisitor;
use crate::actions::{
get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, METADATA_NAME, PROTOCOL_NAME,
SIDECAR_NAME,
get_log_add_schema, get_log_schema, Metadata, Protocol, Sidecar, ADD_NAME, METADATA_NAME,
PROTOCOL_NAME, SIDECAR_NAME,
};
use crate::path::{LogPathFileType, ParsedLogPath};
use crate::schema::SchemaRef;
use crate::snapshot::CheckpointMetadata;
use crate::utils::require;
use crate::{
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileMeta, FileSystemClient,
JsonHandler, ParquetHandler, RowVisitor, Version,
DeltaResult, Engine, EngineData, Error, Expression, ExpressionRef, FileDataReadResultIterator,
FileMeta, FileSystemClient, ParquetHandler, RowVisitor, Version,
};
use itertools::Either::{Left, Right};
use itertools::Itertools;
use std::collections::HashMap;
use std::convert::identity;
use std::sync::{Arc, LazyLock};
use tracing::warn;
use url::Url;

/// Enum representing different file handlers for reading files.
///
/// Wraps either a JSON or a Parquet handler behind a single type so callers
/// can read checkpoint files uniformly, regardless of the on-disk format.
/// This matters for V2 checkpoints, which may be written in either format.
enum FileHandler {
    /// Handler for reading JSON files.
    Json(Arc<dyn JsonHandler>),
    /// Handler for reading Parquet files.
    Parquet(Arc<dyn ParquetHandler>),
}

impl FileHandler {
    /// Reads the given files with the variant's underlying handler.
    ///
    /// Dispatches to [`JsonHandler::read_json_files`] or
    /// [`ParquetHandler::read_parquet_files`] depending on which variant
    /// `self` is, and returns a streaming iterator over the resulting
    /// engine-data batches. This abstracts the file-reading logic for V2
    /// checkpoints, which can be stored in either JSON or Parquet format.
    ///
    /// # Parameters
    /// - `parts`: metadata for each file to read
    /// - `schema`: schema to project the file contents into
    /// - `predicate`: optional filter the handler may use to skip data
    fn read_files(
        &self,
        parts: &[FileMeta],
        schema: SchemaRef,
        predicate: Option<ExpressionRef>,
    ) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>> {
        // Same call shape for both formats; only the handler differs.
        match self {
            Self::Json(h) => h.read_json_files(parts, schema, predicate),
            Self::Parquet(h) => h.read_parquet_files(parts, schema, predicate),
        }
    }
}

#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -234,8 +205,7 @@ impl LogSegment {
commit_read_schema: SchemaRef,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send>>
{
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// `replay` expects commit files to be sorted in descending order, so we reverse the sorted
// commit files
let commit_files: Vec<_> = self
Expand All @@ -250,108 +220,120 @@ impl LogSegment {
.read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
.map_ok(|batch| (batch, true));

let checkpoint_stream =
Self::create_checkpoint_stream(self, engine, checkpoint_read_schema, meta_predicate)?;
let checkpoint_parts: Vec<_> = self
.checkpoint_parts
.iter()
.map(|f| f.location.clone())
.collect();

Ok(Box::new(commit_stream.chain(checkpoint_stream)))
let checkpoint_stream = if !checkpoint_parts.is_empty() {
Right(Self::create_checkpoint_stream(
self,
engine,
checkpoint_read_schema,
meta_predicate,
checkpoint_parts,
)?)
} else {
Left(std::iter::empty())
};
return Ok(commit_stream.chain(checkpoint_stream));
}

fn create_checkpoint_stream(
&self,
engine: &dyn Engine,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send>>
{
let checkpoint_parts: Vec<_> = self
.checkpoint_parts
.iter()
.map(|f| f.location.clone())
.collect();

if checkpoint_parts.is_empty() {
return Ok(Box::new(std::iter::empty()));
}

let file_type = match self.checkpoint_parts.first() {
Some(part) => part.file_type.clone(),
None => LogPathFileType::Unknown,
};
checkpoint_parts: Vec<FileMeta>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
let is_json_checkpoint = self.checkpoint_parts[0].extension == "json";

let json_handler = engine.get_json_handler();
let parquet_handler = engine.get_parquet_handler();
let handler = if is_json_checkpoint {
FileHandler::Json(json_handler)
let actions: FileDataReadResultIterator = if is_json_checkpoint {
engine.get_json_handler().read_json_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
} else {
FileHandler::Parquet(parquet_handler)
engine.get_parquet_handler().read_parquet_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
};

let need_file_actions = checkpoint_read_schema.contains(ADD_NAME);
if need_file_actions {
// if adds were requested, we need sidecars as well!
require!(
checkpoint_read_schema.contains(SIDECAR_NAME),
Error::generic("Checkpoint read schema must contain SIDECAR_NAME")
);
}

let is_multi_part_checkpoint = self.checkpoint_parts.len() > 1;

// Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol)
// In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is.
let log_root = self.log_root.clone();
let parquet_handler = engine.get_parquet_handler().clone();
let checkpoint_stream = if need_file_actions {
Left(
actions
// Flatten the new batches returned. The new batches could be:
// - the checkpoint batch itself if no sidecar actions are present
// - 1 or more sidecar batch that are referenced by the checkpoint batch
.flat_map(move |batch_result| match batch_result {
Ok(checkpoint_batch) => Right(
Self::create_stream_for_checkpoint_batch(
checkpoint_batch,
parquet_handler.clone(),
log_root.clone(),
is_multi_part_checkpoint,
)
.map_or_else(|e| Left(std::iter::once(Err(e))), Right),
),
Err(e) => Left(std::iter::once(Err(e))),
}),
)
} else {
Right(actions.map_ok(|batch| (batch, false)))
};

Ok(Box::new(
handler
.read_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
// Flatten the new batches returned. The new batches could be:
// - the checkpoint batch itself if no sidecar actions are present
// - 1 or more sidecar batch that are referenced by the checkpoint batch
.flat_map(move |batch_result| match batch_result {
Ok(checkpoint_batch) => Self::create_stream_for_checkpoint_batch(
&file_type,
checkpoint_batch,
checkpoint_read_schema.clone(),
parquet_handler.clone(),
log_root.clone(),
),
Err(e) => Box::new(std::iter::once(Err(e))),
}),
))
return Ok(checkpoint_stream);
}

fn create_stream_for_checkpoint_batch(
file_type: &LogPathFileType,
checkpoint_batch: Box<dyn EngineData>,
checkpoint_read_schema: SchemaRef,
handler: Arc<dyn ParquetHandler>,
log_root_ref: Url,
) -> Box<dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
match file_type {
LogPathFileType::UuidCheckpoint(_) if checkpoint_read_schema.contains(SIDECAR_NAME) => {
// If the checkpoint is a V2 checkpoint AND the schema contains the sidecar column,
// we need to read the sidecar files and return the iterator of sidecar actions

// Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol)
// In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is.
match Self::process_checkpoint_batch(handler, log_root_ref, checkpoint_batch) {
Ok(iterator) => Box::new(iterator.map_ok(|batch| (batch, false))),
Err(e) => Box::new(std::iter::once(Err(e))),
}
}
_ => {
// Return the checkpoint batch as is if we do not need to extract sidecar actions from it
Box::new(std::iter::once(Ok((checkpoint_batch, false))))
is_multi_part_checkpoint: bool,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
// Multi-part checkpoints do not have sidecars, so we can return the batch as is.
if !is_multi_part_checkpoint {
match Self::process_checkpoint_batch(handler, log_root_ref, checkpoint_batch) {
Ok(iterator) => Ok(Right(iterator.map_ok(|batch| (batch, false)))),
Err(e) => Ok(Left(std::iter::once(Err(e)))),
}
} else {
Ok(Left(std::iter::once(Ok((checkpoint_batch, false)))))
}
}

fn process_checkpoint_batch(
parquet_handler: Arc<dyn ParquetHandler>,
log_root: Url,
batch: Box<dyn EngineData>,
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>> {
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> {
let mut visitor = SidecarVisitor::default();

// collect sidecars
visitor.visit_rows_of(batch.as_ref())?;

// if there are no sidecars, return the batch as is
if visitor.sidecars.is_empty() {
return Ok(Box::new(std::iter::once(Ok(batch))));
return Ok(Left(std::iter::once(Ok(batch))));
}

//convert sidecar actions to sidecar file paths
Expand All @@ -364,7 +346,11 @@ impl LogSegment {
let sidecar_read_schema = get_log_add_schema().clone();

// if sidecars exist, read the sidecar files and return the iterator of sidecar actions
parquet_handler.read_parquet_files(&sidecar_files?, sidecar_read_schema, None)
Ok(Right(parquet_handler.read_parquet_files(
&sidecar_files?,
sidecar_read_schema,
None,
)?))
}

// Helper function to convert a single sidecar action to a FileMeta
Expand Down

0 comments on commit ba74c24

Please sign in to comment.