diff --git a/Cargo.lock b/Cargo.lock index da42652795..d563ee9d38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3207,7 +3207,7 @@ dependencies = [ [[package]] name = "fluvio-stream-dispatcher" -version = "0.13.5" +version = "0.13.6" dependencies = [ "anyhow", "async-channel 1.9.0", diff --git a/crates/fluvio-sc/src/controllers/topics/policy.rs b/crates/fluvio-sc/src/controllers/topics/policy.rs index f54b1206a3..5c1b84e8aa 100644 --- a/crates/fluvio-sc/src/controllers/topics/policy.rs +++ b/crates/fluvio-sc/src/controllers/topics/policy.rs @@ -222,7 +222,7 @@ impl TopicNextState { if next_state.resolution == TopicResolution::Provisioned { debug!("creating new partitions"); next_state.partitions = - topic.partitions_from_replicas(scheduler.partitions()).await; + topic.create_new_partitions(scheduler.partitions()).await; } next_state } @@ -239,7 +239,7 @@ impl TopicNextState { .await; if next_state.resolution == TopicResolution::Provisioned { next_state.partitions = - topic.partitions_from_replicas(scheduler.partitions()).await; + topic.create_new_partitions(scheduler.partitions()).await; } next_state } @@ -251,7 +251,7 @@ impl TopicNextState { let mut next_state = TopicNextState::same_next_state(topic); if next_state.resolution == TopicResolution::Provisioned { next_state.partitions = - topic.partitions_from_replicas(scheduler.partitions()).await; + topic.create_new_partitions(scheduler.partitions()).await; } next_state } @@ -335,7 +335,7 @@ impl TopicNextState { if next_state.resolution == TopicResolution::Provisioned { debug!("creating new partitions"); next_state.partitions = - topic.partitions_from_replicas(scheduler.partitions()).await; + topic.create_new_partitions(scheduler.partitions()).await; } next_state } diff --git a/crates/fluvio-sc/src/stores/topic/store.rs b/crates/fluvio-sc/src/stores/topic/store.rs index 30ef23c177..77ec640b2b 100644 --- a/crates/fluvio-sc/src/stores/topic/store.rs +++ b/crates/fluvio-sc/src/stores/topic/store.rs @@ -19,7 +19,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore; #[async_trait] pub trait TopicMd { - async fn partitions_from_replicas( + async fn create_new_partitions( &self, partition_store: &PartitionLocalStore, ) -> Vec>; @@ -30,32 +30,32 @@ impl TopicMd for TopicMetadata where C: MetadataItem + Send + Sync, { - /// get partitions from replica map - async fn partitions_from_replicas( + /// create new partitions from the replica map if it doesn't exists + async fn create_new_partitions( &self, partition_store: &PartitionLocalStore, ) -> Vec> { let mut partitions = vec![]; let replica_map = &self.status.replica_map; trace!(?replica_map, "creating new partitions for topic"); + let store = partition_store.read().await; for (idx, replicas) in replica_map.iter() { let mirror = self.status.mirror_map.get(idx); let replica_key = ReplicaKey::new(self.key(), *idx); let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror); - let store = partition_store.read().await; - let partition = store.get(&replica_key); - if let Some(p) = partition { - partitions.push(p.inner().clone()); - } else { + if !store.contains_key(&replica_key) { debug!(?replica_key, ?partition_spec, "creating new partition"); partitions.push( MetadataStoreObject::with_spec(replica_key, partition_spec) .with_context(self.ctx.create_child()), ) + } else { + debug!(?replica_key, "partition already exists"); } } + drop(store); partitions } } @@ -250,12 +250,10 @@ mod test { let topic = MetadataStoreObject::::new(key, spec, status); let partition_store = DefaultPartitionStore::bulk_new(vec![partition_stored]); - let partitions = topic.partitions_from_replicas(&partition_store).await; + let partitions = topic.create_new_partitions(&partition_store).await; - assert_eq!(partitions.len(), 2); - assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 0_u32)); - assert_eq!(partitions[0].spec.leader, 0); - assert_eq!(partitions[1].key, ReplicaKey::new("topic-1", 1_u32)); - assert_eq!(partitions[1].spec.leader, 1); + assert_eq!(partitions.len(), 1); + assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 1_u32)); + assert_eq!(partitions[0].spec.leader, 1); } } diff --git a/crates/fluvio-stream-dispatcher/Cargo.toml b/crates/fluvio-stream-dispatcher/Cargo.toml index 19d2a817a6..3f4d11ffd8 100644 --- a/crates/fluvio-stream-dispatcher/Cargo.toml +++ b/crates/fluvio-stream-dispatcher/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "fluvio-stream-dispatcher" edition = "2021" -version = "0.13.5" +version = "0.13.6" authors = ["Fluvio Contributors "] description = "Fluvio Event Stream access" repository = "https://github.com/infinyon/fluvio" diff --git a/crates/fluvio-stream-dispatcher/src/metadata/local.rs b/crates/fluvio-stream-dispatcher/src/metadata/local.rs index 7a2b56230b..d14ac07da3 100644 --- a/crates/fluvio-stream-dispatcher/src/metadata/local.rs +++ b/crates/fluvio-stream-dispatcher/src/metadata/local.rs @@ -168,10 +168,12 @@ cfg_if::cfg_if! { trace!(?value, "apply"); let store = self.get_store::()?; value.ctx_mut().item_mut().id = value.key().to_string(); - if let Some(owner) = value.ctx().item().owner() { - self.link_parent::(owner, value.ctx().item()).await?; + let item = value.ctx().item().clone(); + store.apply(value).await?; + if let Some(owner) = item.owner() { + self.link_parent::(owner, &item).await?; } - store.apply(value).await + Ok(()) } async fn update_spec(&self, metadata: LocalMetadataItem, spec: S) -> Result<()> @@ -290,7 +292,6 @@ cfg_if::cfg_if! { #[derive(Debug, Clone)] struct SpecPointer { inner: Arc, - revision: u64, store_revision: u64, path: PathBuf, } @@ -344,14 +345,11 @@ cfg_if::cfg_if! { ) -> Result<()> { trace!(?parent, ?child, "link parent"); let parent_store = self.get_store::()?; - parent_store - .mut_in_place::(parent.uid(), |parent_obj| { - parent_obj - .ctx_mut() - .item_mut() - .put_child(S::LABEL, child.clone()); - }) - .await?; + let mut parent_obj = parent_store.retrieve_item::(parent).await?; + let mut children_without_parent = child.clone(); + children_without_parent.parent = None; + parent_obj.ctx_mut().item_mut().put_child(S::LABEL, children_without_parent); + parent_store.apply(parent_obj).await?; Ok(()) } @@ -362,14 +360,11 @@ cfg_if::cfg_if! { ) -> Result<()> { trace!(?parent, ?child, "link parent"); let parent_store = self.get_store::()?; - parent_store - .mut_in_place::(parent.uid(), |parent_obj| { - parent_obj - .ctx_mut() - .item_mut() - .remove_child(S::LABEL, child); - }) - .await?; + let mut parent_obj = parent_store.retrieve_item::(parent).await?; + let mut children_without_parent = child.clone(); + children_without_parent.parent = None; + parent_obj.ctx_mut().item_mut().remove_child(S::LABEL, &children_without_parent); + parent_store.apply(parent_obj).await?; Ok(()) } @@ -530,21 +525,6 @@ cfg_if::cfg_if! { self.path.join(format!("{name}.yaml")) } - async fn mut_in_place(&self, key: &str, func: F) -> Result<()> - where - F: Fn(&mut LocalStoreObject), - { - if let Some(spec) = self.data.write().get_mut(key) { - let mut obj = spec.downcast::()?; - func(&mut obj); - spec.set(obj); - spec.flush::()?; - Ok(()) - } else { - anyhow::bail!("'{key}' not found"); - } - } - async fn send_update(&self, mut update: SpecUpdate) { let store_revision = self .version @@ -559,14 +539,12 @@ cfg_if::cfg_if! { impl SpecPointer { fn new>(path: P, obj: LocalStoreObject) -> Self { - let revision = obj.ctx().item().revision; let inner = Arc::new(obj); let path = path.as_ref().to_path_buf(); let store_revision = Default::default(); Self { inner, path, - revision, store_revision, } } @@ -600,11 +578,6 @@ cfg_if::cfg_if! { serde_yaml::to_writer(std::fs::File::create(&self.path)?, &storage)?; Ok(()) } - - fn set(&mut self, obj: LocalStoreObject) { - self.revision = obj.ctx().item().revision; - self.inner = Arc::new(obj); - } } impl SpecUpdate { @@ -1327,12 +1300,12 @@ spec: 1 ); - assert!(parent_meta + assert_eq!(parent_meta .children() .unwrap() .get(TestSpec::LABEL) .expect("test spec children") - .contains(child.ctx().item()),); + .first().unwrap().id, child.ctx().item().id); meta_store .delete_item::(child.ctx().item().clone()) @@ -1353,6 +1326,69 @@ spec: drop(meta_folder) } + #[fluvio_future::test] + async fn test_parent_linking_with_multiple_children_and_do_not_add_children_to_parents_with_stale_version() { + // given + let meta_folder = tempfile::tempdir().expect("temp dir created"); + let meta_store = LocalMetadataStorage::new(&meta_folder); + let (mut parent, mut children) = test_parent_with_children(4); + + let child = children.remove(0); + let child2 = children.remove(0); + let child3 = children.remove(0); + let child4 = children.remove(0); + + // only parent without children + parent.ctx_mut().item_mut().set_children(Default::default()); + meta_store + .apply(parent.clone()) + .await + .expect("applied parent"); + + // when applying 4 children and one parent + let (r1, r2, r3, r4, r5) = tokio::join!( + meta_store.apply(child.clone()), + meta_store.apply(parent.clone()), + meta_store.apply(child2.clone()), + meta_store.apply(child3.clone()), + meta_store.apply(child4.clone()) + ); + + r1.expect("applied child"); + // parent is old, should not be accepted + // it was updated by the child 1 + assert_eq!(r2.unwrap_err().to_string(), "attempt to update by stale value: current version: 1, proposed: 0"); + r3.expect("applied child"); + r4.expect("applied child"); + r5.expect("applied child"); + + let parent_meta = meta_store + .retrieve_items::(&NameSpace::All) + .await + .expect("items") + .items + .remove(0) + .ctx_owned() + .into_inner(); + + + // then + assert_eq!(parent_meta.children().unwrap().len(), 1); + let children = parent_meta + .children() + .unwrap() + .get(TestSpec::LABEL) + .expect("test spec children"); + + assert_eq!(children.len(), 4); + assert!(children.iter().any(|c| c.id == child.ctx.item().id)); + assert!(children.iter().any(|c| c.id == child2.ctx.item().id)); + assert!(children.iter().any(|c| c.id == child3.ctx.item().id)); + assert!(children.iter().any(|c| c.id == child4.ctx.item().id)); + drop(meta_folder) + } + + #[fluvio_future::test] async fn test_parent_is_not_existed() { //given