From 99a33fa21681853b16e5c4fff1743ea38e2e1ed8 Mon Sep 17 00:00:00 2001 From: Felipe Cardozo Date: Wed, 3 Jul 2024 22:45:54 -0300 Subject: [PATCH] fix: delete new partitions --- .../src/controllers/scheduler/partition.rs | 6 --- .../src/controllers/topics/policy.rs | 12 ++--- crates/fluvio-sc/src/stores/topic/store.rs | 48 +++++++------------ .../fluvio_smoke_tests/add-partitions.bats | 16 +++++++ 4 files changed, 36 insertions(+), 46 deletions(-) diff --git a/crates/fluvio-sc/src/controllers/scheduler/partition.rs b/crates/fluvio-sc/src/controllers/scheduler/partition.rs index 8f4e56b80c0..87b5d75fed8 100644 --- a/crates/fluvio-sc/src/controllers/scheduler/partition.rs +++ b/crates/fluvio-sc/src/controllers/scheduler/partition.rs @@ -67,7 +67,6 @@ impl Deref for ReplicaPartitionMap { #[derive(Debug)] pub(crate) struct PartitionScheduler<'a, C: MetadataItem> { spus: &'a SpuLocalStore, - partitions: &'a PartitionLocalStore, scheduling_groups: ReplicaSchedulingGroups, } @@ -82,7 +81,6 @@ where let scheduling_groups = partitions.group_by_spu().await; Self { spus, - partitions, scheduling_groups, } } @@ -91,10 +89,6 @@ where self.spus } - pub(crate) fn partitions(&self) -> &'a PartitionLocalStore { - self.partitions - } - /// Generate replica map for a specific topic #[instrument(level = "debug")] pub async fn generate_replica_map_for_topic( diff --git a/crates/fluvio-sc/src/controllers/topics/policy.rs b/crates/fluvio-sc/src/controllers/topics/policy.rs index 5c1b84e8aab..4caca37a250 100644 --- a/crates/fluvio-sc/src/controllers/topics/policy.rs +++ b/crates/fluvio-sc/src/controllers/topics/policy.rs @@ -221,8 +221,7 @@ impl TopicNextState { let mut next_state = TopicNextState::same_next_state(topic); if next_state.resolution == TopicResolution::Provisioned { debug!("creating new partitions"); - next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + next_state.partitions = topic.partitions_from_replicas().await; } next_state } @@ -238,8 +237,7 @@ impl TopicNextState { update_replica_map_for_assigned_topic(partition_map, scheduler.spus()) .await; if next_state.resolution == TopicResolution::Provisioned { - next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + next_state.partitions = topic.partitions_from_replicas().await; } next_state } @@ -250,8 +248,7 @@ impl TopicNextState { ); let mut next_state = TopicNextState::same_next_state(topic); if next_state.resolution == TopicResolution::Provisioned { - next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + next_state.partitions = topic.partitions_from_replicas().await; } next_state } @@ -334,8 +331,7 @@ impl TopicNextState { let mut next_state = TopicNextState::same_next_state(topic); if next_state.resolution == TopicResolution::Provisioned { debug!("creating new partitions"); - next_state.partitions = - topic.create_new_partitions(scheduler.partitions()).await; + next_state.partitions = topic.partitions_from_replicas().await; } next_state } diff --git a/crates/fluvio-sc/src/stores/topic/store.rs b/crates/fluvio-sc/src/stores/topic/store.rs index fa1e0a683c7..3dc2502a60f 100644 --- a/crates/fluvio-sc/src/stores/topic/store.rs +++ b/crates/fluvio-sc/src/stores/topic/store.rs @@ -5,11 +5,9 @@ use fluvio_stream_model::{ store::{MetadataStoreObject, LocalStore}, core::MetadataItem, }; -use tracing::{debug, trace}; +use fluvio_types::PartitionId; use async_trait::async_trait; -use crate::stores::partition::PartitionLocalStore; - use super::*; pub type TopicMetadata = MetadataStoreObject; @@ -19,10 +17,7 @@ pub type DefaultTopicLocalStore = TopicLocalStore; #[async_trait] pub trait TopicMd { - async fn create_new_partitions( - &self, - partition_store: &PartitionLocalStore, - ) -> Vec>; + async fn partitions_from_replicas(&self) -> Vec>; } #[async_trait] @@ -30,31 +25,20 @@ impl TopicMd for TopicMetadata where C: MetadataItem + Send + Sync, { - /// create new partitions from my replica map if it doesn't exists - async fn create_new_partitions( - &self, - partition_store: &PartitionLocalStore, - ) -> Vec> { - let mut partitions = vec![]; - let replica_map = &self.status.replica_map; - trace!(?replica_map, "creating new partitions for topic"); - for (idx, replicas) in replica_map.iter() { - let mirror = self.status.mirror_map.get(idx); - - let replica_key = ReplicaKey::new(self.key(), *idx); - - let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror); - if !partition_store.contains_key(&replica_key).await { - debug!(?replica_key, ?partition_spec, "creating new partition"); - partitions.push( - MetadataStoreObject::with_spec(replica_key, partition_spec) - .with_context(self.ctx.create_child()), - ) - } else { - debug!(?replica_key, "partition already exists"); - } - } - partitions + /// get partitions from replica map + async fn partitions_from_replicas(&self) -> Vec> { + self.status + .replica_map + .iter() + .map(|(idx, replicas)| { + let mirror = self.status.mirror_map.get(idx); + let replica_key = ReplicaKey::new(self.key(), *idx as PartitionId); + let partition_spec = + PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror); + MetadataStoreObject::with_spec(replica_key, partition_spec) + .with_context(self.ctx.create_child()) + }) + .collect() } } diff --git a/tests/cli/fluvio_smoke_tests/add-partitions.bats b/tests/cli/fluvio_smoke_tests/add-partitions.bats index 200afd01952..af9a23b1e10 100644 --- a/tests/cli/fluvio_smoke_tests/add-partitions.bats +++ b/tests/cli/fluvio_smoke_tests/add-partitions.bats @@ -86,3 +86,19 @@ setup_file() { assert_failure } +@test "Delete topic" { + if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on fluvio cli stable version" + fi + if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then + skip "don't run on cluster stable version" + fi + debug_msg "Delete topic" + run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME" + assert_success + + sleep 1 + debug_msg "Check if the new partition received the message" + run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"' + assert [ ${#lines[@]} -eq 0 ] +}