Skip to content

Commit

Permalink
json compatibility & refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Jan 25, 2025
1 parent 048b30b commit 4c470b6
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 68 deletions.
180 changes: 113 additions & 67 deletions kernel/src/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ impl LogSegment {
commit_read_schema: SchemaRef,
checkpoint_read_schema: SchemaRef,
meta_predicate: Option<ExpressionRef>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send>>
{
// `replay` expects commit files to be sorted in descending order, so we reverse the sorted
// commit files
let commit_files: Vec<_> = self
Expand All @@ -213,88 +214,141 @@ impl LogSegment {
.rev()
.map(|f| f.location.clone())
.collect();

let commit_stream = engine
.get_json_handler()
.read_json_files(&commit_files, commit_read_schema, meta_predicate.clone())?
.map_ok(|batch| (batch, true));

let file_type = self
.checkpoint_parts
.get(0)
.map(|file| file.file_type.clone())
.unwrap_or(LogPathFileType::Unknown);

let checkpoint_parts: Vec<_> = self
.checkpoint_parts
.iter()
.map(|f| f.location.clone())
.collect();

let file_type = if !self.checkpoint_parts.is_empty() {
self.checkpoint_parts[0].file_type.clone()
let is_json_checkpoint = self
.checkpoint_parts
.get(0)
.map_or(false, |file| file.extension == "json");

let log_root = self.log_root.clone();
let json_handler = engine.get_json_handler().clone();
let parquet_handler = engine.get_parquet_handler().clone();

let checkpoint_stream: Box<
dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send,
> = if is_json_checkpoint {
Box::new(
json_handler
.read_json_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
.flat_map(move |batch_result| match batch_result {
Ok(checkpoint_batch) => Self::create_stream_for_checkpoint_batch(
&file_type,
checkpoint_batch,
checkpoint_read_schema.clone(),
parquet_handler.clone(),
log_root.clone(),
),
Err(e) => Box::new(std::iter::once(Err(e))),
}),
)
} else {
LogPathFileType::Unknown
Box::new(
parquet_handler
.read_parquet_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
.flat_map(move |batch_result| match batch_result {
Ok(checkpoint_batch) => Self::create_stream_for_checkpoint_batch(
&file_type,
checkpoint_batch,
checkpoint_read_schema.clone(),
parquet_handler.clone(),
log_root.clone(),
),
Err(e) => Box::new(std::iter::once(Err(e))),
}),
)
};

let handler = engine.get_parquet_handler();
let log_root_ref = self.log_root.clone();

let checkpoint_stream = engine
.get_parquet_handler()
.read_parquet_files(
&checkpoint_parts,
checkpoint_read_schema.clone(),
meta_predicate,
)?
.flat_map(move |checkpoint_batch_result| {
match checkpoint_batch_result {
Ok(checkpoint_batch) => {
let stream: Box<
dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send,
> = if let LogPathFileType::UuidCheckpoint(_uuid) = &file_type {
if checkpoint_read_schema.contains(SIDECAR_NAME) {
// If the checkpoint is a V2 checkpoint AND the schema contains the sidecar column,
// we need to read the sidecar files and return the iterator of sidecar actions

// Replay is sometimes passed a schema that doesn't contain the sidecar column. (e.g. when reading metadata & protocol)
// In this case, we do not need to read the sidecar files and can chain the checkpoint batch as is.

// Flatten the new batches returned. The new batches could be:
// - the checkpoint batch itself if no sidecar actions are present
// - 1 or more sidecar batch that are referenced by the checkpoint batch
let iterator = Self::process_checkpoint_batch(
handler.clone(),
log_root_ref.clone(),
checkpoint_batch,
);
match iterator {
Ok(iter) => Box::new(iter.map_ok(|batch| (batch, false))),
Err(e) => Box::new(std::iter::once(Err(e))),
}
} else {
// Chain the checkpoint batch as is if we do not need to extract sidecar actions
Box::new(std::iter::once(Ok((checkpoint_batch, false))))
}
} else {
// Chain the checkpoint batch as is if not a UUID checkpoint batch
Box::new(std::iter::once(Ok((checkpoint_batch, false))))
};
stream
}
Err(e) => {
// Chain the error if any
Box::new(std::iter::once(Err(e)))
}
}
});
Ok(Box::new(commit_stream.chain(checkpoint_stream)))
}

Ok(commit_stream.chain(checkpoint_stream))
// fn process_checkpoint_stream<H>(
// handler: Arc<H>,
// checkpoint_parts: &[FileMeta],
// read_schema: SchemaRef,
// meta_predicate: Option<ExpressionRef>,
// file_type: LogPathFileType,
// log_root: Url,
// parquet_handler: Arc<dyn ParquetHandler>,
// ) -> DeltaResult<impl Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send>
// where
// H: JsonHandler + ParquetHandler,
// {
// Ok(handler
// .read_parquet_files(&checkpoint_parts, read_schema.clone(), meta_predicate)?
// .flat_map(move |batch_result| match batch_result {
// Ok(checkpoint_batch) => Self::create_stream_for_checkpoint_batch(
// &file_type,
// checkpoint_batch,
// read_schema.clone(),
// parquet_handler.clone(),
// log_root.clone(),
// ),
// Err(e) => Box::new(std::iter::once(Err(e))),
// }))
// }

fn create_stream_for_checkpoint_batch(
file_type: &LogPathFileType,
checkpoint_batch: Box<dyn EngineData>,
checkpoint_read_schema: SchemaRef,
handler: Arc<dyn ParquetHandler>,
log_root_ref: Url,
) -> Box<dyn Iterator<Item = DeltaResult<(Box<dyn EngineData>, bool)>> + Send> {
match file_type {
LogPathFileType::UuidCheckpoint(_) if checkpoint_read_schema.contains(SIDECAR_NAME) => {
// Continue processing if there is a chance that the checkpoint contains sidecar actions
match Self::process_checkpoint_batch(handler, log_root_ref, checkpoint_batch) {
Ok(iterator) => Box::new(iterator.map_ok(|batch| (batch, false))),
Err(e) => Box::new(std::iter::once(Err(e))),
}
}
_ => {
// The checkpoint does not contain sidecar actions, so we can return the batch as is
Box::new(std::iter::once(Ok((checkpoint_batch, false))))
}
}
}

fn process_checkpoint_batch(
parquet_handler: Arc<dyn ParquetHandler>,
log_root: Url,
batch: Box<dyn EngineData>,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send> {
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send>> {
let mut visitor = SidecarVisitor::default();

// collect sidecars
visitor.visit_rows_of(batch.as_ref())?;

// if there are no sidecars, return the batch as is
if visitor.sidecars.is_empty() {
return Ok(Box::new(std::iter::once(Ok(batch))));
}

//convert sidecar actions to sidecar file paths
let sidecar_files: Result<Vec<_>, _> = visitor
.sidecars
Expand All @@ -305,20 +359,12 @@ impl LogSegment {
let sidecar_read_schema = get_log_add_schema().clone();

// if sidecars exist, read the sidecar files and return the iterator of sidecar actions
if !visitor.sidecars.is_empty() {
return parquet_handler.read_parquet_files(&sidecar_files?, sidecar_read_schema, None);
} else {
return Ok(Box::new(std::iter::once(Ok(batch))));
}
parquet_handler.read_parquet_files(&sidecar_files?, sidecar_read_schema, None)
}

// Helper function to convert a single sidecar action to a FileMeta
fn sidecar_to_filemeta(sidecar: &Sidecar, log_root: &Url) -> Result<FileMeta, Error> {
let location = log_root
.join("_sidecars/")
.map_err(Error::from)?
.join(&sidecar.path)
.map_err(Error::from)?;
let location = log_root.join("_sidecars/")?.join(&sidecar.path)?;
Ok(FileMeta {
location,
last_modified: sidecar.modification_time,
Expand Down
2 changes: 1 addition & 1 deletion kernel/tests/golden_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ golden_test!("time-travel-start", latest_snapshot_test);
golden_test!("time-travel-start-start20", latest_snapshot_test);
golden_test!("time-travel-start-start20-start40", latest_snapshot_test);

// golden_test!("v2-checkpoint-json", latest_snapshot_test);
golden_test!("v2-checkpoint-json", latest_snapshot_test);
golden_test!("v2-checkpoint-parquet", latest_snapshot_test);

// BUG:
Expand Down

0 comments on commit 4c470b6

Please sign in to comment.