Skip to content

Commit

Permalink
first pass
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastiantia committed Feb 1, 2025
1 parent 06d8dbb commit 5b85cdf
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 2 deletions.
45 changes: 44 additions & 1 deletion kernel/src/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub(crate) const SET_TRANSACTION_NAME: &str = "txn";
pub(crate) const COMMIT_INFO_NAME: &str = "commitInfo";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const CDC_NAME: &str = "cdc";
#[cfg_attr(feature = "developer-visibility", visibility::make(pub))]
pub(crate) const SIDECAR_NAME: &str = "sidecar";

static LOG_ADD_SCHEMA: LazyLock<SchemaRef> =
LazyLock::new(|| StructType::new([Option::<Add>::get_struct_field(ADD_NAME)]).into());
Expand All @@ -58,6 +60,7 @@ static LOG_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Option::<SetTransaction>::get_struct_field(SET_TRANSACTION_NAME),
Option::<CommitInfo>::get_struct_field(COMMIT_INFO_NAME),
Option::<Cdc>::get_struct_field(CDC_NAME),
Option::<Sidecar>::get_struct_field(SIDECAR_NAME),
// We don't support the following actions yet
//Option::<DomainMetadata>::get_struct_field(DOMAIN_METADATA_NAME),
])
Expand Down Expand Up @@ -511,6 +514,28 @@ pub struct SetTransaction {
pub last_updated: Option<i64>,
}

#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Debug, Clone, Schema)]
#[cfg_attr(test, derive(Serialize, Default), serde(rename_all = "camelCase"))]
struct Sidecar {
/// A path to the sidecar file. Because sidecar files must always reside in the table's own
/// _delta_log/_sidecars directory, implementations are encouraged to store only the file's name.
/// The path is a URI as specified by [RFC 2396 URI Generic Syntax], which needs to be decoded
/// to get the data file path.
///
/// [RFC 2396 URI Generic Syntax]: https://www.ietf.org/rfc/rfc2396.txt
pub(crate) path: String,

/// The size of the sidecar file in bytes.
pub(crate) size_in_bytes: i64,

/// The time this logical file was created, as milliseconds since the epoch.
pub(crate) modification_time: i64,

/// A map containing any additional metadata about the logicial file.
pub(crate) tags: Option<HashMap<String, String>>,
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -637,7 +662,7 @@ mod tests {
fn test_cdc_schema() {
let schema = get_log_schema()
.project(&[CDC_NAME])
.expect("Couldn't get remove field");
.expect("Couldn't get cdc field");
let expected = Arc::new(StructType::new([StructField::nullable(
"cdc",
StructType::new([
Expand All @@ -654,6 +679,24 @@ mod tests {
assert_eq!(schema, expected);
}

#[test]
fn test_sidecar_schema() {
let schema = get_log_schema()
.project(&[SIDECAR_NAME])
.expect("Couldn't get sidecar field");
let expected = Arc::new(StructType::new([StructField::new(
"sidecar",
StructType::new([
StructField::new("path", DataType::STRING, false),
StructField::new("sizeInBytes", DataType::LONG, false),
StructField::new("modificationTime", DataType::LONG, false),
tags_field(),
]),
true,
)]));
assert_eq!(schema, expected);
}

#[test]
fn test_transaction_schema() {
let schema = get_log_schema()
Expand Down
78 changes: 77 additions & 1 deletion kernel/src/actions/visitors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::collections::HashMap;
use std::sync::LazyLock;

use crate::actions::SIDECAR_NAME;
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::schema::{column_name, ColumnName, ColumnNamesAndTypes, DataType};
use crate::utils::require;
Expand All @@ -12,7 +13,7 @@ use crate::{DeltaResult, Error};
use super::deletion_vector::DeletionVectorDescriptor;
use super::schemas::ToSchema as _;
use super::{
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, ADD_NAME, CDC_NAME,
Add, Cdc, Format, Metadata, Protocol, Remove, SetTransaction, Sidecar, ADD_NAME, CDC_NAME,
METADATA_NAME, PROTOCOL_NAME, REMOVE_NAME, SET_TRANSACTION_NAME,
};

Expand Down Expand Up @@ -444,6 +445,51 @@ impl RowVisitor for SetTransactionVisitor {
}
}

#[allow(unused)] //TODO: Remove once we implement V2 checkpoint file processing
#[derive(Default)]
struct SidecarVisitor {
sidecars: Vec<Sidecar>,
}

impl SidecarVisitor {
fn visit_sidecar<'a>(
row_index: usize,
path: String,
getters: &[&'a dyn GetData<'a>],
) -> DeltaResult<Sidecar> {
Ok(Sidecar {
path,
size_in_bytes: getters[1].get(row_index, "sidecar.sizeInBytes")?,
modification_time: getters[2].get(row_index, "sidecar.modificationTime")?,
tags: getters[3].get_opt(row_index, "sidecar.tags")?,
})
}
}

impl RowVisitor for SidecarVisitor {
fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
LazyLock::new(|| Sidecar::to_schema().leaves(SIDECAR_NAME));
NAMES_AND_TYPES.as_ref()
}
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
require!(
getters.len() == 4,
Error::InternalError(format!(
"Wrong number of SidecarVisitor getters: {}",
getters.len()
))
);
for i in 0..row_count {
// Since path column is required, use it to detect presence of a sidecar action
if let Some(path) = getters[0].get_opt(i, "sidecar.path")? {
self.sidecars.push(Self::visit_sidecar(i, path, getters)?);
}
}
Ok(())
}
}

/// Get a DV out of some engine data. The caller is responsible for slicing the `getters` slice such
/// that the first element contains the `storageType` element of the deletion vector.
pub(crate) fn visit_deletion_vector_at<'a>(
Expand Down Expand Up @@ -544,6 +590,36 @@ mod tests {
Ok(())
}

#[test]
fn test_parse_action_batch_with_sidecar_actions() -> DeltaResult<()> {
let engine = SyncEngine::new();
let json_handler = engine.get_json_handler();
let json_strings: StringArray = vec![
r#"{"sidecar":{"path":"016ae953-37a9-438e-8683-9a9a4a79a395.parquet","sizeInBytes":9268,"modificationTime":1714496113961,"tags": null}}"#,
r#"{"sidecar":{"path":"3a0d65cd-4056-49b8-937b-95f9e3ee90e5.parquet","sizeInBytes":9268,"modificationTime":1714496113962,"tags": null}}"#,
].into();

let output_schema = get_log_schema().clone();
let batch = json_handler
.parse_json(string_array_to_engine_data(json_strings), output_schema)
.unwrap();
let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(batch.as_ref())?;

assert_eq!(visitor.sidecars.len(), 2);
Ok(())
}

#[test]
fn test_parse_action_batch_without_sidecar_actions() -> DeltaResult<()> {
let data = action_batch();
let mut visitor = SidecarVisitor::default();
visitor.visit_rows_of(data.as_ref())?;

assert_eq!(visitor.sidecars.len(), 0);
Ok(())
}

#[test]
fn test_parse_metadata() -> DeltaResult<()> {
let data = action_batch();
Expand Down

0 comments on commit 5b85cdf

Please sign in to comment.