Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: link_parent more realible and delete topic partitions #4219

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -239,7 +239,7 @@ impl<C: MetadataItem> TopicNextState<C> {
.await;
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand All @@ -251,7 +251,7 @@ impl<C: MetadataItem> TopicNextState<C> {
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand Down Expand Up @@ -335,7 +335,7 @@ impl<C: MetadataItem> TopicNextState<C> {
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.partitions_from_replicas(scheduler.partitions()).await;
topic.create_new_partitions(scheduler.partitions()).await;
}
next_state
}
Expand Down
26 changes: 12 additions & 14 deletions crates/fluvio-sc/src/stores/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore<u32>;

#[async_trait]
pub trait TopicMd<C: MetadataItem> {
async fn partitions_from_replicas(
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>>;
Expand All @@ -30,32 +30,32 @@ impl<C: MetadataItem> TopicMd<C> for TopicMetadata<C>
where
C: MetadataItem + Send + Sync,
{
/// get partitions from replica map
async fn partitions_from_replicas(
/// create new partitions from the replica map if it doesn't exists
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>> {
let mut partitions = vec![];
let replica_map = &self.status.replica_map;
trace!(?replica_map, "creating new partitions for topic");
let store = partition_store.read().await;
for (idx, replicas) in replica_map.iter() {
let mirror = self.status.mirror_map.get(idx);

let replica_key = ReplicaKey::new(self.key(), *idx);

let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror);
let store = partition_store.read().await;
let partition = store.get(&replica_key);
if let Some(p) = partition {
partitions.push(p.inner().clone());
} else {
if !store.contains_key(&replica_key) {
debug!(?replica_key, ?partition_spec, "creating new partition");
partitions.push(
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child()),
)
} else {
debug!(?replica_key, "partition already exists");
}
}
drop(store);
partitions
}
}
Expand Down Expand Up @@ -250,12 +250,10 @@ mod test {
let topic = MetadataStoreObject::<TopicSpec, u32>::new(key, spec, status);
let partition_store = DefaultPartitionStore::bulk_new(vec![partition_stored]);

let partitions = topic.partitions_from_replicas(&partition_store).await;
let partitions = topic.create_new_partitions(&partition_store).await;

assert_eq!(partitions.len(), 2);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 0_u32));
assert_eq!(partitions[0].spec.leader, 0);
assert_eq!(partitions[1].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[1].spec.leader, 1);
assert_eq!(partitions.len(), 1);
assert_eq!(partitions[0].key, ReplicaKey::new("topic-1", 1_u32));
assert_eq!(partitions[0].spec.leader, 1);
}
}
2 changes: 1 addition & 1 deletion crates/fluvio-stream-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-stream-dispatcher"
edition = "2021"
version = "0.13.5"
version = "0.13.6"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio Event Stream access"
repository = "https://github.com/infinyon/fluvio"
Expand Down
61 changes: 17 additions & 44 deletions crates/fluvio-stream-dispatcher/src/metadata/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,12 @@ cfg_if::cfg_if! {
trace!(?value, "apply");
let store = self.get_store::<S>()?;
value.ctx_mut().item_mut().id = value.key().to_string();
if let Some(owner) = value.ctx().item().owner() {
self.link_parent::<S>(owner, value.ctx().item()).await?;
let item = value.ctx().item().clone();
store.apply(value).await?;
if let Some(owner) = item.owner() {
self.link_parent::<S>(owner, &item).await?;
}
store.apply(value).await
Ok(())
}

async fn update_spec<S>(&self, metadata: LocalMetadataItem, spec: S) -> Result<()>
Expand Down Expand Up @@ -290,7 +292,6 @@ cfg_if::cfg_if! {
#[derive(Debug, Clone)]
struct SpecPointer {
inner: Arc<dyn Any + Send + Sync>,
revision: u64,
store_revision: u64,
path: PathBuf,
}
Expand Down Expand Up @@ -344,14 +345,11 @@ cfg_if::cfg_if! {
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
.ctx_mut()
.item_mut()
.put_child(S::LABEL, child.clone());
})
.await?;
let mut parent_obj = parent_store.retrieve_item::<S::Owner>(parent).await?;
let mut children_without_parent = child.clone();
children_without_parent.parent = None;
parent_obj.ctx_mut().item_mut().put_child(S::LABEL, children_without_parent);
parent_store.apply(parent_obj).await?;
Ok(())
}

Expand All @@ -362,14 +360,11 @@ cfg_if::cfg_if! {
) -> Result<()> {
trace!(?parent, ?child, "link parent");
let parent_store = self.get_store::<S::Owner>()?;
parent_store
.mut_in_place::<S::Owner, _>(parent.uid(), |parent_obj| {
parent_obj
.ctx_mut()
.item_mut()
.remove_child(S::LABEL, child);
})
.await?;
let mut parent_obj = parent_store.retrieve_item::<S::Owner>(parent).await?;
let mut children_without_parent = child.clone();
children_without_parent.parent = None;
parent_obj.ctx_mut().item_mut().remove_child(S::LABEL, &children_without_parent);
parent_store.apply(parent_obj).await?;
Ok(())
}

Expand Down Expand Up @@ -530,21 +525,6 @@ cfg_if::cfg_if! {
self.path.join(format!("{name}.yaml"))
}

async fn mut_in_place<S: Spec, F>(&self, key: &str, func: F) -> Result<()>
where
F: Fn(&mut LocalStoreObject<S>),
{
if let Some(spec) = self.data.write().get_mut(key) {
let mut obj = spec.downcast::<S>()?;
func(&mut obj);
spec.set(obj);
spec.flush::<S>()?;
Ok(())
} else {
anyhow::bail!("'{key}' not found");
}
}

async fn send_update(&self, mut update: SpecUpdate) {
let store_revision = self
.version
Expand All @@ -559,14 +539,12 @@ cfg_if::cfg_if! {

impl SpecPointer {
fn new<S: Spec, P: AsRef<Path>>(path: P, obj: LocalStoreObject<S>) -> Self {
let revision = obj.ctx().item().revision;
let inner = Arc::new(obj);
let path = path.as_ref().to_path_buf();
let store_revision = Default::default();
Self {
inner,
path,
revision,
store_revision,
}
}
Expand Down Expand Up @@ -600,11 +578,6 @@ cfg_if::cfg_if! {
serde_yaml::to_writer(std::fs::File::create(&self.path)?, &storage)?;
Ok(())
}

fn set<S: Spec>(&mut self, obj: LocalStoreObject<S>) {
self.revision = obj.ctx().item().revision;
self.inner = Arc::new(obj);
}
}

impl SpecUpdate {
Expand Down Expand Up @@ -1327,12 +1300,12 @@ spec:
1
);

assert!(parent_meta
assert_eq!(parent_meta
.children()
.unwrap()
.get(TestSpec::LABEL)
.expect("test spec children")
.contains(child.ctx().item()),);
.first().unwrap().id, child.ctx().item().id);

meta_store
.delete_item::<TestSpec>(child.ctx().item().clone())
Expand Down
Loading