Skip to content

Commit

Permalink
fix: link_parent more realible and delete topic partitions (#4219)
Browse files Browse the repository at this point in the history
* fix: link_parent more realible

* test: test_parent_linking_with_multiple_children_and_do_not_add_children_to_parents_with_stale_version
  • Loading branch information
fraidev authored Oct 29, 2024
1 parent b650cae commit 573751b
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 64 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -239,7 +239,7 @@ impl<C: MetadataItem> TopicNextState<C> {
.await;
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -251,7 +251,7 @@ impl<C: MetadataItem> TopicNextState<C> {
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand Down
26 changes: 12 additions & 14 deletions crates/fluvio-sc/src/stores/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore<u32>;

#[async_trait]
pub trait TopicMd<C: MetadataItem> {
async fn partitions_from_replicas(
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>>;
Expand All @@ -30,32 +30,32 @@ impl<C: MetadataItem> TopicMd<C> for TopicMetadata<C>
where
C: MetadataItem + Send + Sync,
{
/// get partitions from replica map
async fn partitions_from_replicas(
/// create new partitions from the replica map if it doesn't exists
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>> {
let mut partitions = vec![];
let replica_map = &self.status.replica_map;
trace!(?replica_map, "creating new partitions for topic");
let store = partition_store.read().await;
for (idx, replicas) in replica_map.iter() {
let mirror = self.status.mirror_map.get(idx);

let replica_key = ReplicaKey::new(self.key(), *idx);

let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror);
let store = partition_store.read().await;
let partition = store.get(&replica_key);
if let Some(p) = partition {
partitions.push(p.inner().clone());
} else {
if !store.contains_key(&replica_key) {
debug!(?replica_key, ?partition_spec, "creating new partition");
partitions.push(
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child()),
)
} else {
debug!(?replica_key, "partition already exists");
}
}
drop(store);
partitions
}
}
Expand Down Expand Up @@ -250,12 +250,10 @@ mod test {
let topic = MetadataStoreObject::<TopicSpec, u32>::new(key, spec, status);
let partition_store = DefaultPartitionStore::bulk_new(vec![partition_stored]);

let partitions = topic.partitions_from_replicas(&partition_store).await;
let partitions = topic.create_new_partitions(&partition_store).await;

assert_eq!(partitions.len(), 2);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 0_u32));
assert_eq!(partitions[0].spec.leader, 0);
assert_eq!(partitions[1].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[1].spec.leader, 1);
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[0].spec.leader, 1);
}
}
2 changes: 1 addition & 1 deletion crates/fluvio-stream-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-stream-dispatcher"
edition = "2021"
version = "0.13.5"
version = "0.13.6"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio Event Stream access"
repository = "https://github.com/infinyon/fluvio"
Expand Down
124 changes: 80 additions & 44 deletions crates/fluvio-stream-dispatcher/src/metadata/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,12 @@ cfg_if::cfg_if! {
trace!(?value, "apply");
let store = self.get_store::<S>()?;
value.ctx_mut().item_mut().id = value.key().to_string();
if let Some(owner) = value.ctx().item().owner() {
self.link_parent::<S>(owner, value.ctx().item()).await?;
let item = value.ctx().item().clone();
store.apply(value).await?;
if let Some(owner) = item.owner() {
self.link_parent::<S>(owner, &item).await?;
}
store.apply(value).await
Ok(())
}

async fn update_spec<S>(&self, metadata: LocalMetadataItem, spec: S) -> Result<()>
Expand Down Expand Up @@ -290,7 +292,6 @@ cfg_if::cfg_if! {
#[derive(Debug, Clone)]
struct SpecPointer {
inner: Arc<dyn Any + Send + Sync>,
revision: u64,
store_revision: u64,
path: PathBuf,
}
Expand Down Expand Up @@ -344,14 +345,11 @@ cfg_if::cfg_if! {
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
.ctx_mut()
.item_mut()
.put_child(S::LABEL, child.clone());
})
.await?;
let mut parent_obj = parent_store.retrieve_item::<S::Owner>(parent).await?;
let mut children_without_parent = child.clone();
children_without_parent.parent = None;
parent_obj.ctx_mut().item_mut().put_child(S::LABEL, children_without_parent);
parent_store.apply(parent_obj).await?;
Ok(())
}

Expand All @@ -362,14 +360,11 @@ cfg_if::cfg_if! {
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
.ctx_mut()
.item_mut()
.remove_child(S::LABEL, child);
})
.await?;
let mut parent_obj = parent_store.retrieve_item::<S::Owner>(parent).await?;
let mut children_without_parent = child.clone();
children_without_parent.parent = None;
parent_obj.ctx_mut().item_mut().remove_child(S::LABEL, &children_without_parent);
parent_store.apply(parent_obj).await?;
Ok(())
}

Expand Down Expand Up @@ -530,21 +525,6 @@ cfg_if::cfg_if! {
self.path.join(format!("{name}.yaml"))
}

async fn mut_in_place<S: Spec, F>(&self, key: &str, func: F) -> Result<()>
where
F: Fn(&mut LocalStoreObject<S>),
{
if let Some(spec) = self.data.write().get_mut(key) {
let mut obj = spec.downcast::<S>()?;
func(&mut obj);
spec.set(obj);
spec.flush::<S>()?;
Ok(())
} else {
anyhow::bail!("'{key}' not found");
}
}

async fn send_update(&self, mut update: SpecUpdate) {
let store_revision = self
.version
Expand All @@ -559,14 +539,12 @@ cfg_if::cfg_if! {

impl SpecPointer {
fn new<S: Spec, P: AsRef<Path>>(path: P, obj: LocalStoreObject<S>) -> Self {
let revision = obj.ctx().item().revision;
let inner = Arc::new(obj);
let path = path.as_ref().to_path_buf();
let store_revision = Default::default();
Self {
inner,
path,
revision,
store_revision,
}
}
Expand Down Expand Up @@ -600,11 +578,6 @@ cfg_if::cfg_if! {
serde_yaml::to_writer(std::fs::File::create(&self.path)?, &storage)?;
Ok(())
}

fn set<S: Spec>(&mut self, obj: LocalStoreObject<S>) {
self.revision = obj.ctx().item().revision;
self.inner = Arc::new(obj);
}
}

impl SpecUpdate {
Expand Down Expand Up @@ -1327,12 +1300,12 @@ spec:
1
);

assert!(parent_meta
assert_eq!(parent_meta
.children()
.unwrap()
.get(TestSpec::LABEL)
.expect("test spec children")
.contains(child.ctx().item()),);
.first().unwrap().id, child.ctx().item().id);

meta_store
.delete_item::<TestSpec>(child.ctx().item().clone())
Expand All @@ -1353,6 +1326,69 @@ spec:
drop(meta_folder)
}

#[fluvio_future::test]
async fn test_parent_linking_with_multiple_children_and_do_not_add_children_to_parents_with_stale_version() {
// given
let meta_folder = tempfile::tempdir().expect("temp dir created");
let meta_store = LocalMetadataStorage::new(&meta_folder);
let (mut parent, mut children) = test_parent_with_children(4);

let child = children.remove(0);
let child2 = children.remove(0);
let child3 = children.remove(0);
let child4 = children.remove(0);

// only parent without children
parent.ctx_mut().item_mut().set_children(Default::default());
meta_store
.apply(parent.clone())
.await
.expect("applied parent");

// when applying 4 children and one parent
let (r1, r2, r3, r4, r5) = tokio::join!(
meta_store.apply(child.clone()),
meta_store.apply(parent.clone()),
meta_store.apply(child2.clone()),
meta_store.apply(child3.clone()),
meta_store.apply(child4.clone())
);

r1.expect("applied child");
// parent is old, should not be accepted
// it was updated by the child 1
assert_eq!(r2.unwrap_err().to_string(), "attempt to update by stale value: current version: 1, proposed: 0");
r3.expect("applied child");
r4.expect("applied child");
r5.expect("applied child");

let parent_meta = meta_store
.retrieve_items::<ParentSpec>(&NameSpace::All)
.await
.expect("items")
.items
.remove(0)
.ctx_owned()
.into_inner();


// then
assert_eq!(parent_meta.children().unwrap().len(), 1);
let children = parent_meta
.children()
.unwrap()
.get(TestSpec::LABEL)
.expect("test spec children");

assert_eq!(children.len(), 4);
assert!(children.iter().any(|c| c.id == child.ctx.item().id));
assert!(children.iter().any(|c| c.id == child2.ctx.item().id));
assert!(children.iter().any(|c| c.id == child3.ctx.item().id));
assert!(children.iter().any(|c| c.id == child4.ctx.item().id));
drop(meta_folder)
}


#[fluvio_future::test]
async fn test_parent_is_not_existed() {
//given
Expand Down

0 comments on commit 573751b

Please sign in to comment.