Skip to content

Commit

Permalink
fix: delete new partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
fraidev committed Jul 4, 2024
1 parent 04ddf2a commit 99a33fa
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 46 deletions.
6 changes: 0 additions & 6 deletions crates/fluvio-sc/src/controllers/scheduler/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ impl Deref for ReplicaPartitionMap {
#[derive(Debug)]
pub(crate) struct PartitionScheduler<'a, C: MetadataItem> {
spus: &'a SpuLocalStore<C>,
partitions: &'a PartitionLocalStore<C>,
scheduling_groups: ReplicaSchedulingGroups,
}

Expand All @@ -82,7 +81,6 @@ where
let scheduling_groups = partitions.group_by_spu().await;
Self {
spus,
partitions,
scheduling_groups,
}
}
Expand All @@ -91,10 +89,6 @@ where
self.spus
}

pub(crate) fn partitions(&self) -> &'a PartitionLocalStore<C> {
self.partitions
}

/// Generate replica map for a specific topic
#[instrument(level = "debug")]
pub async fn generate_replica_map_for_topic(
Expand Down
12 changes: 4 additions & 8 deletions crates/fluvio-sc/src/controllers/topics/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,7 @@ impl<C: MetadataItem> TopicNextState<C> {
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
next_state.partitions = topic.partitions_from_replicas().await;
}
next_state
}
Expand All @@ -238,8 +237,7 @@ impl<C: MetadataItem> TopicNextState<C> {
update_replica_map_for_assigned_topic(partition_map, scheduler.spus())
.await;
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
next_state.partitions = topic.partitions_from_replicas().await;
}
next_state
}
Expand All @@ -250,8 +248,7 @@ impl<C: MetadataItem> TopicNextState<C> {
);
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
next_state.partitions = topic.partitions_from_replicas().await;
}
next_state
}
Expand Down Expand Up @@ -334,8 +331,7 @@ impl<C: MetadataItem> TopicNextState<C> {
let mut next_state = TopicNextState::same_next_state(topic);
if next_state.resolution == TopicResolution::Provisioned {
debug!("creating new partitions");
next_state.partitions =
topic.create_new_partitions(scheduler.partitions()).await;
next_state.partitions = topic.partitions_from_replicas().await;
}
next_state
}
Expand Down
48 changes: 16 additions & 32 deletions crates/fluvio-sc/src/stores/topic/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@ use fluvio_stream_model::{
store::{MetadataStoreObject, LocalStore},
core::MetadataItem,
};
use tracing::{debug, trace};
use fluvio_types::PartitionId;
use async_trait::async_trait;

use crate::stores::partition::PartitionLocalStore;

use super::*;

pub type TopicMetadata<C> = MetadataStoreObject<TopicSpec, C>;
Expand All @@ -19,42 +17,28 @@ pub type DefaultTopicLocalStore = TopicLocalStore<u32>;

#[async_trait]
pub trait TopicMd<C: MetadataItem> {
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>>;
async fn partitions_from_replicas(&self) -> Vec<PartitionMetadata<C>>;
}

#[async_trait]
impl<C: MetadataItem> TopicMd<C> for TopicMetadata<C>
where
C: MetadataItem + Send + Sync,
{
/// create new partitions from my replica map if it doesn't exists
async fn create_new_partitions(
&self,
partition_store: &PartitionLocalStore<C>,
) -> Vec<PartitionMetadata<C>> {
let mut partitions = vec![];
let replica_map = &self.status.replica_map;
trace!(?replica_map, "creating new partitions for topic");
for (idx, replicas) in replica_map.iter() {
let mirror = self.status.mirror_map.get(idx);

let replica_key = ReplicaKey::new(self.key(), *idx);

let partition_spec = PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror);
if !partition_store.contains_key(&replica_key).await {
debug!(?replica_key, ?partition_spec, "creating new partition");
partitions.push(
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child()),
)
} else {
debug!(?replica_key, "partition already exists");
}
}
partitions
/// get partitions from replica map
async fn partitions_from_replicas(&self) -> Vec<PartitionMetadata<C>> {
self.status
.replica_map
.iter()
.map(|(idx, replicas)| {
let mirror = self.status.mirror_map.get(idx);
let replica_key = ReplicaKey::new(self.key(), *idx as PartitionId);
let partition_spec =
PartitionSpec::from_replicas(replicas.clone(), &self.spec, mirror);
MetadataStoreObject::with_spec(replica_key, partition_spec)
.with_context(self.ctx.create_child())
})
.collect()
}
}

Expand Down
16 changes: 16 additions & 0 deletions tests/cli/fluvio_smoke_tests/add-partitions.bats
Original file line number Diff line number Diff line change
Expand Up @@ -86,3 +86,19 @@ setup_file() {
assert_failure
}

@test "Delete topic" {
if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on fluvio cli stable version"
fi
if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
skip "don't run on cluster stable version"
fi
debug_msg "Delete topic"
run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
assert_success

sleep 1
debug_msg "Check if the new partition received the message"
run bash -c 'timeout 15s "$FLUVIO_BIN" partition list | grep "$TOPIC_NAME"'
assert [ ${#lines[@]} -eq 0 ]
}

0 comments on commit 99a33fa

Please sign in to comment.