feat: allow replacement of entire datafile when the schema lines up correctly
chebbyChefNEQ committed Jan 30, 2025
1 parent a7c5216 commit c615eb0
Showing 2 changed files with 190 additions and 14 deletions.
9 changes: 8 additions & 1 deletion protos/transaction.proto
@@ -173,6 +173,12 @@ message Transaction {
}
}

// An operation that replaces the data in a region of the table with new data.
message DataReplacement {
repeated DataFragment old_fragments = 1;
repeated DataFile new_datafiles = 2;
}

// The operation of this transaction.
oneof operation {
Append append = 100;
@@ -186,11 +192,12 @@ message Transaction {
Update update = 108;
Project project = 109;
UpdateConfig update_config = 110;
DataReplacement data_replacement = 111;
}

// An operation to apply to the blob dataset
oneof blob_operation {
Append blob_append = 200;
Overwrite blob_overwrite = 202;
}
}
}
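For orientation, here is a minimal sketch of how the new `data_replacement` variant can be populated from Rust, mirroring the `From<&Transaction>` conversion added to `transaction.rs` later in this commit. How the `Fragment` and `DataFile` values are gathered (e.g. from the dataset's manifest) is assumed, not shown.

```rust
// Sketch only: assumes `old_fragments` / `new_datafiles` were already
// collected from the dataset's existing metadata.
use lance_table::format::{pb, DataFile, Fragment};

fn data_replacement_op(
    old_fragments: &[Fragment],
    new_datafiles: &[DataFile],
) -> pb::transaction::Operation {
    pb::transaction::Operation::DataReplacement(pb::transaction::DataReplacement {
        // These conversions mirror the ones in the From<&Transaction> impl below.
        old_fragments: old_fragments.iter().map(pb::DataFragment::from).collect(),
        new_datafiles: new_datafiles.iter().map(pb::DataFile::from).collect(),
    })
}
```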
195 changes: 182 additions & 13 deletions rust/lance/src/dataset/transaction.rs
@@ -22,22 +22,27 @@
//! a conflict. Some operations have additional conditions that must be met for
//! them to be compatible.
//!
//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig |
//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|-------------|
//! | Append | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ |
//! | Delete / Update | ✅ | (1) | ❌ | ✅ | (1) | ❌ | ❌ | ✅ |
//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | (2) |
//! | Create index | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
//! | Rewrite | ✅ | (1) | ❌ | ❌ | (1) | ❌ | ❌ | ✅ |
//! | Merge | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ |
//! | Project | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ |
//! | UpdateConfig | ✅ | ✅ | (2) | ✅ | ✅ | ✅ | ✅ | (2) |
//! | | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | DataReplacement |
//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|--------------|-----------------|
//! | Append           | ✅ | ✅ | ❌ | ✅ | ✅ | ❌ | ❌ | ✅ | ✅ |
//! | Delete / Update  | ✅ | 1️⃣ | ❌ | ✅ | 1️⃣ | ❌ | ❌ | ✅ | ✅ |
//! | Overwrite/Create | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | ✅ |
//! | Create index     | ✅ | ✅ | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ | 3️⃣ |
//! | Rewrite          | ✅ | 1️⃣ | ❌ | ❌ | 1️⃣ | ❌ | ❌ | ✅ | 3️⃣ |
//! | Merge            | ❌ | ❌ | ❌ | ❌ | ✅ | ❌ | ❌ | ✅ | ✅ |
//! | Project          | ✅ | ✅ | ❌ | ❌ | ✅ | ❌ | ✅ | ✅ | ✅ |
//! | UpdateConfig     | ✅ | ✅ | 2️⃣ | ✅ | ✅ | ✅ | ✅ | 2️⃣ | ✅ |
//! | DataReplacement  | ✅ | ✅ | ❌ | 3️⃣ | 3️⃣ | ✅ | ❌* | ✅ | 3️⃣ |
//!
//! (1) Delete, update, and rewrite are compatible with each other and themselves only if
//! 1️⃣ Delete, update, and rewrite are compatible with each other and themselves only if
//! they affect distinct fragments. Otherwise, they conflict.
//! (2) Operations that mutate the config conflict if one of the operations upserts a key
//! 2️⃣ Operations that mutate the config conflict if one of the operations upserts a key
//! that is referenced by another concurrent operation or if both operations modify the schema
//! metadata or the same field metadata.
//! 3️⃣ DataReplacement on a column without an index is compatible with any operation AS LONG AS
//! the operation does not modify the region of the column being replaced.
//! * This could become allowed in the future
//!
use std::{
collections::{HashMap, HashSet},
@@ -51,7 +56,7 @@ use lance_io::object_store::ObjectStore;
use lance_table::{
format::{
pb::{self, IndexMetadata},
DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
DataFile, DataStorageFormat, Fragment, Index, Manifest, RowIdMeta,
},
io::{
commit::CommitHandler,
@@ -136,6 +141,25 @@ pub enum Operation {
/// Indices that have been updated with the new row addresses
rewritten_indices: Vec<RewrittenIndex>,
},
/// Replace data in a column in the dataset with new data. This is used for
/// null column population, where we replace an entirely null column with a
/// new column that has data.
///
/// This operation only allows replacing files that contain the same schema,
/// e.g. if the original files contain columns A, B, C and the new files contain
/// only columns A, B then the operation is not allowed, as we would need to split
/// the original files into two files, one with columns A, B and the other with column C.
///
/// Corollary to the above: the operation also does not allow replacing file layouts
/// that are not uniform across all fragments,
/// e.g. if the fragments being replaced contain files with different schema layouts on
/// the column being replaced, the operation is not allowed.
/// Say frag_1: [A] [B, C] and frag_2: [A, B] [C], and we are trying to replace column A
/// with a new column A; then the operation is not allowed.
DataReplacement {
old_fragments: Vec<Fragment>,
new_datafiles: Vec<DataFile>,
},
/// Merge a new column in
Merge {
fragments: Vec<Fragment>,
@@ -229,6 +253,9 @@ impl Operation {
.map(|f| f.id)
.chain(removed_fragment_ids.iter().copied()),
),
Self::DataReplacement { old_fragments, .. } => {
Box::new(old_fragments.iter().map(|f| f.id))
}
}
}

@@ -332,6 +359,7 @@ impl Operation {
Self::Update { .. } => "Update",
Self::Project { .. } => "Project",
Self::UpdateConfig { .. } => "UpdateConfig",
Self::DataReplacement { .. } => "DataReplacement",
}
}
}
@@ -370,6 +398,7 @@ impl Transaction {
Operation::ReserveFragments { .. } => false,
Operation::Project { .. } => false,
Operation::UpdateConfig { .. } => false,
Operation::DataReplacement { .. } => false,
_ => true,
},
Operation::Rewrite { .. } => match &other.operation {
@@ -385,6 +414,13 @@
}
Operation::Project { .. } => false,
Operation::UpdateConfig { .. } => false,
Operation::DataReplacement {
old_fragments,
new_datafiles,
} => {
// TODO: check that the fragments being replaced are not part of the groups
true
}
_ => true,
},
// Restore always succeeds
@@ -411,6 +447,10 @@
// if the rewrite changed more than X% of row ids.
Operation::Rewrite { .. } => true,
Operation::UpdateConfig { .. } => false,
Operation::DataReplacement { .. } => {
// TODO: check that the new indices aren't on the column being replaced
true
}
_ => true,
},
Operation::Delete { .. } | Operation::Update { .. } => match &other.operation {
@@ -467,6 +507,29 @@ impl Transaction {
Operation::UpdateConfig { .. } => false,
_ => true,
},
Operation::DataReplacement { new_datafiles, .. } => match &other.operation {
Operation::Append { .. }
| Operation::Delete { .. }
| Operation::Update { .. }
| Operation::Merge { .. }
| Operation::UpdateConfig { .. } => false,
Operation::CreateIndex { new_indices, .. } => {
// TODO: check that the new indices aren't on the column being replaced
true
}
Operation::Rewrite { groups, .. } => {
// TODO: check that the fragments being replaced are not part of the groups
true
}
Operation::DataReplacement {
old_fragments,
new_datafiles,
} => {
// TODO: check cell conflicts
true
}
_ => true,
},
}
}

@@ -744,6 +807,90 @@ impl Transaction {
Operation::Restore { .. } => {
unreachable!()
}
Operation::DataReplacement {
old_fragments,
new_datafiles,
} => {
// 0. check we have the same number of old fragments as new data files
if old_fragments.len() != new_datafiles.len() {
return Err(Error::invalid_input(
"Number of old fragments must match number of new data files",
location!(),
));
}

// 1. make sure the new files all have the same fields
if new_datafiles
.iter()
.map(|f| f.fields.clone())
.collect::<HashSet<_>>()
.len()
!= 1
{
// TODO: better error message to explain what's wrong
return Err(Error::invalid_input(
"All new data files must have the same fields",
location!(),
));
}

// 2. check that the fragments being modified have isomorphic layouts along the columns being replaced
// 3. add modified fragments to final_fragments
for (frag, new_file) in old_fragments.into_iter().zip(new_datafiles) {
let mut new_frag = frag.clone();

// TODO: check new file and fragment are the same length

let mut columns_covered = HashSet::new();
for file in &mut new_frag.files {
if file.fields == new_file.fields
&& file.file_major_version == new_file.file_major_version
&& file.file_minor_version == new_file.file_minor_version
{
// assign the new file path to the fragment
file.path = new_file.path.clone();
}
columns_covered.extend(file.fields.iter());
}
// SPECIAL CASE: if the column(s) being replaced are not covered by the fragment,
// then it is an all-NULL column that is being replaced with real data;
// just add the new file to the fragment
if columns_covered.is_disjoint(&new_file.fields.iter().collect()) {
new_frag.add_file(
new_file.path.clone(),
new_file.fields.clone(),
new_file.column_indices.clone(),
&LanceFileVersion::try_from_major_minor(
new_file.file_major_version,
new_file.file_minor_version,
)
.expect("Expected valid file version"),
);
}

// Nothing changed in the current fragment, which is not expected -- error out
if &new_frag == frag {
// TODO: better error message to explain what's wrong
return Err(Error::invalid_input(
"Expected to modify the fragment but no changes were made",
location!(),
));
}
final_fragments.push(new_frag);
}

let fragments_changed =
final_fragments.iter().map(|f| f.id).collect::<HashSet<_>>();

// 4. push fragments that didn't change back to final_fragments
let unmodified_fragments = maybe_existing_fragments?
.iter()
.filter(|f| !fragments_changed.contains(&f.id))
.cloned()
.collect::<Vec<_>>();

final_fragments.extend(unmodified_fragments);
}
};

// If a fragment was reserved then it may not belong at the end of the fragments list.
@@ -1164,6 +1311,21 @@ impl TryFrom<pb::Transaction> for Transaction {
field_metadata,
}
}
Some(pb::transaction::Operation::DataReplacement(
pb::transaction::DataReplacement {
old_fragments,
new_datafiles,
},
)) => Operation::DataReplacement {
old_fragments: old_fragments
.into_iter()
.map(Fragment::try_from)
.collect::<Result<Vec<_>>>()?,
new_datafiles: new_datafiles
.into_iter()
.map(DataFile::try_from)
.collect::<Result<Vec<_>>>()?,
},
None => {
return Err(Error::Internal {
message: "Transaction message did not contain an operation".to_string(),
@@ -1380,6 +1542,13 @@ impl From<&Transaction> for pb::Transaction {
})
.unwrap_or(Default::default()),
}),
Operation::DataReplacement {
old_fragments,
new_datafiles,
} => pb::transaction::Operation::DataReplacement(pb::transaction::DataReplacement {
old_fragments: old_fragments.iter().map(pb::DataFragment::from).collect(),
new_datafiles: new_datafiles.iter().map(pb::DataFile::from).collect(),
}),
};

let blob_operation = value.blobs_op.as_ref().map(|op| match op {
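To make the data-file matching rule in the `apply` path above concrete, here is a self-contained sketch using simplified stand-in types; `FileStub` and `replace_in_fragment` are hypothetical names, not Lance APIs. A replacement file swaps in wherever an existing data file covers exactly the same fields at the same file version, and is appended when its fields are not covered by any file in the fragment (the all-NULL column case).

```rust
use std::collections::HashSet;

// Simplified stand-in for a fragment's data file entry (hypothetical type).
#[derive(Clone, Debug, PartialEq)]
struct FileStub {
    path: String,
    fields: Vec<i32>,
    version: (u32, u32), // (file_major_version, file_minor_version)
}

/// Returns true if the fragment's file list changed.
fn replace_in_fragment(files: &mut Vec<FileStub>, new_file: &FileStub) -> bool {
    let mut covered: HashSet<i32> = HashSet::new();
    let mut changed = false;
    for file in files.iter_mut() {
        if file.fields == new_file.fields && file.version == new_file.version {
            // Same layout and version: swap in the new data file's path.
            file.path = new_file.path.clone();
            changed = true;
        }
        covered.extend(file.fields.iter().copied());
    }
    // None of the replaced fields exist in this fragment yet: treat it as an
    // all-NULL column being materialized and append the new file instead.
    if new_file.fields.iter().all(|f| !covered.contains(f)) {
        files.push(new_file.clone());
        changed = true;
    }
    changed
}

fn main() {
    let mut files = vec![
        FileStub { path: "ab.lance".into(), fields: vec![0, 1], version: (2, 0) },
        FileStub { path: "c.lance".into(), fields: vec![2], version: (2, 0) },
    ];
    let replacement = FileStub { path: "c_v2.lance".into(), fields: vec![2], version: (2, 0) };
    assert!(replace_in_fragment(&mut files, &replacement));
    assert_eq!(files[1].path, "c_v2.lance");
}
```

The real implementation additionally requires the old-fragment and new-data-file lists to have equal length, requires all new files to share one field list, and errors out when a pairing leaves its fragment unchanged.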
