Skip to content

Commit

Permalink
feat: added local metadata store impl (#3610)
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Galibey committed Oct 23, 2023
1 parent 5554504 commit 09a0a17
Show file tree
Hide file tree
Showing 25 changed files with 1,254 additions and 161 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio-protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "fluvio-protocol"
edition = "2021"
version = "0.10.6"
version = "0.10.7"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio streaming protocol"
repository = "https://github.com/infinyon/fluvio"
Expand Down
12 changes: 10 additions & 2 deletions crates/fluvio-protocol/src/record/replica.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::convert::TryFrom;
use std::{convert::TryFrom, str::FromStr};
use std::fmt;

use fluvio_types::PartitionId;
Expand Down Expand Up @@ -53,7 +53,15 @@ impl TryFrom<String> for ReplicaKey {
type Error = PartitionError;

fn try_from(value: String) -> Result<Self, PartitionError> {
let (topic, partition) = decompose_partition_name(&value)?;
value.parse()
}
}

impl FromStr for ReplicaKey {
type Err = PartitionError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let (topic, partition) = decompose_partition_name(s)?;
Ok(ReplicaKey::new(topic, partition))
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-sc-schema/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-sc-schema"
version = "0.21.0"
version = "0.21.1"
edition = "2021"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio API for SC"
Expand Down
1 change: 1 addition & 0 deletions crates/fluvio-sc-schema/src/objects/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use super::DYN_OBJ;
#[cfg_attr(
feature = "use_serde",
derive(serde::Serialize, serde::Deserialize),
serde(bound(deserialize = "S: serde::de::DeserializeOwned")),
serde(rename_all = "camelCase")
)]
pub struct Metadata<S>
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-sc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ fluvio-types = { workspace = true, features = [
"events",
] }
fluvio-sc-schema = { workspace = true, features = ["use_serde", "json"] }
fluvio-stream-model = { workspace = true, features = ["k8"] }
fluvio-stream-model = { workspace = true, features = ["k8", "use_serde"] }
fluvio-controlplane = { workspace = true }
fluvio-controlplane-metadata = { workspace = true, features = ["k8","serde"] }
fluvio-stream-dispatcher = { workspace = true, features = ["k8"]}
fluvio-stream-dispatcher = { workspace = true, features = ["k8", "local"]}
k8-client = { workspace = true, features = ["memory_client"] }
fluvio-protocol = { workspace = true }
fluvio-socket = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions crates/fluvio-sc/src/k8/controllers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ mod k8_operator {
) {
let config = global_ctx.config();

let spu_service_ctx: StoreContext<SpuServiceSpec> = StoreContext::new();
let statefulset_ctx: StoreContext<StatefulsetSpec> = StoreContext::new();
let spg_service_ctx: StoreContext<SpgServiceSpec> = StoreContext::new();
let spu_service_ctx: StoreContext<SpuServiceSpec, K8MetaItem> = StoreContext::new();
let statefulset_ctx: StoreContext<StatefulsetSpec, K8MetaItem> = StoreContext::new();
let spg_service_ctx: StoreContext<SpgServiceSpec, K8MetaItem> = StoreContext::new();

let config_ctx: StoreContext<ScK8Config> = StoreContext::new();
let config_ctx: StoreContext<ScK8Config, K8MetaItem> = StoreContext::new();

info!("starting k8 cluster operators");

Expand Down
25 changes: 13 additions & 12 deletions crates/fluvio-sc/src/k8/controllers/spg_stateful.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::time::Duration;

use anyhow::Result;
use fluvio_stream_dispatcher::store::K8ChangeListener;
use fluvio_stream_model::store::k8::K8MetaItem;
use tracing::debug;
use tracing::error;
use tracing::trace;
Expand All @@ -24,22 +25,22 @@ use crate::k8::objects::spg_service::SpgServiceSpec;
/// Update Statefulset and Service from SPG
pub struct SpgStatefulSetController {
namespace: String,
groups: StoreContext<SpuGroupSpec>,
spus: StoreContext<SpuSpec>,
statefulsets: StoreContext<StatefulsetSpec>,
spg_services: StoreContext<SpgServiceSpec>,
configs: StoreContext<ScK8Config>,
groups: StoreContext<SpuGroupSpec, K8MetaItem>,
spus: StoreContext<SpuSpec, K8MetaItem>,
statefulsets: StoreContext<StatefulsetSpec, K8MetaItem>,
spg_services: StoreContext<SpgServiceSpec, K8MetaItem>,
configs: StoreContext<ScK8Config, K8MetaItem>,
tls: Option<TlsConfig>,
}

impl SpgStatefulSetController {
pub fn start(
namespace: String,
configs: StoreContext<ScK8Config>,
groups: StoreContext<SpuGroupSpec>,
statefulsets: StoreContext<StatefulsetSpec>,
spus: StoreContext<SpuSpec>,
spg_services: StoreContext<SpgServiceSpec>,
configs: StoreContext<ScK8Config, K8MetaItem>,
groups: StoreContext<SpuGroupSpec, K8MetaItem>,
statefulsets: StoreContext<StatefulsetSpec, K8MetaItem>,
spus: StoreContext<SpuSpec, K8MetaItem>,
spg_services: StoreContext<SpgServiceSpec, K8MetaItem>,
tls: Option<TlsConfig>,
) {
let controller = Self {
Expand Down Expand Up @@ -243,8 +244,8 @@ mod test {
let test_env = TestEnv::create().await;
let (global_ctx, config_ctx) = test_env.create_global_ctx().await;

let statefulset_ctx: StoreContext<StatefulsetSpec> = StoreContext::new();
let spg_service_ctx: StoreContext<SpgServiceSpec> = StoreContext::new();
let statefulset_ctx: StoreContext<StatefulsetSpec, K8MetaItem> = StoreContext::new();
let spg_service_ctx: StoreContext<SpgServiceSpec, K8MetaItem> = StoreContext::new();

// start statefullset dispatcher
MetadataDispatcher::<_, _, K8MetaItem>::start(
Expand Down
12 changes: 6 additions & 6 deletions crates/fluvio-sc/src/k8/controllers/spu_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ use crate::stores::spg::SpuGroupSpec;
/// This is only place where we make changes to SPU
/// For each SPU Group, we map to children SPUs so it's 1:M parent-child relationship
pub struct K8SpuController {
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
spus: StoreContext<SpuSpec>,
services: StoreContext<SpuServiceSpec, K8MetaItem>,
groups: StoreContext<SpuGroupSpec, K8MetaItem>,
spus: StoreContext<SpuSpec, K8MetaItem>,
}

impl fmt::Display for K8SpuController {
Expand All @@ -43,9 +43,9 @@ impl fmt::Debug for K8SpuController {

impl K8SpuController {
pub fn start(
spus: StoreContext<SpuSpec>,
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
spus: StoreContext<SpuSpec, K8MetaItem>,
services: StoreContext<SpuServiceSpec, K8MetaItem>,
groups: StoreContext<SpuGroupSpec, K8MetaItem>,
) {
let controller = Self {
services,
Expand Down
12 changes: 6 additions & 6 deletions crates/fluvio-sc/src/k8/controllers/spu_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ use crate::k8::objects::spu_service::SpuServiceSpec;

/// Sync individual SPU services from SPU Group
pub struct SpuServiceController {
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
configs: StoreContext<ScK8Config>,
services: StoreContext<SpuServiceSpec, K8MetaItem>,
groups: StoreContext<SpuGroupSpec, K8MetaItem>,
configs: StoreContext<ScK8Config, K8MetaItem>,
}

impl fmt::Display for SpuServiceController {
Expand All @@ -36,9 +36,9 @@ impl fmt::Debug for SpuServiceController {

impl SpuServiceController {
pub fn start(
configs: StoreContext<ScK8Config>,
services: StoreContext<SpuServiceSpec>,
groups: StoreContext<SpuGroupSpec>,
configs: StoreContext<ScK8Config, K8MetaItem>,
services: StoreContext<SpuServiceSpec, K8MetaItem>,
groups: StoreContext<SpuGroupSpec, K8MetaItem>,
) {
let controller = Self {
configs,
Expand Down
7 changes: 5 additions & 2 deletions crates/fluvio-sc/src/k8/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,15 @@ impl TestEnv {
&self.client
}

pub async fn create_global_ctx(&self) -> (K8SharedContext, StoreContext<ScK8Config>) {
pub async fn create_global_ctx(
&self,
) -> (K8SharedContext, StoreContext<ScK8Config, K8MetaItem>) {
let config_map = ScConfigMetadata::with_spec("fluvio", ScK8Config::default());
let config_store = LocalStore::new_shared();
config_store.sync_all(vec![config_map]).await;

let config_ctx: StoreContext<ScK8Config> = StoreContext::new_with_store(config_store);
let config_ctx: StoreContext<ScK8Config, K8MetaItem> =
StoreContext::new_with_store(config_store);
assert!(config_ctx.store().value("fluvio").await.is_some());

(Context::shared_metadata(ScConfig::default()), config_ctx)
Expand Down
4 changes: 2 additions & 2 deletions crates/fluvio-sc/src/k8/objects/spg_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl SpuGroupObj {
namespace: &str,
spu_k8_config: &ScK8Config,
tls: Option<&TlsConfig>,
) -> (String, WSAction<StatefulsetSpec>) {
) -> (String, WSAction<StatefulsetSpec, K8MetaItem>) {
let statefulset_name = format!("fluvio-spg-{}", self.key());
let k8_spec = k8_convert::generate_k8_stateful(
&self.spec,
Expand Down Expand Up @@ -154,7 +154,7 @@ impl SpuGroupObj {
)
}

pub fn as_service(&self) -> (String, WSAction<SpgServiceSpec>) {
pub fn as_service(&self) -> (String, WSAction<SpgServiceSpec, K8MetaItem>) {
let svc_name = self.svc_name.to_owned();
let k8_service = k8_convert::generate_service(self.spec(), self.key());

Expand Down
8 changes: 4 additions & 4 deletions crates/fluvio-sc/src/k8/objects/spu_k8_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::fmt;

use anyhow::Result;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

use fluvio_controlplane_metadata::core::MetadataContext;
Expand All @@ -21,7 +21,7 @@ use crate::dispatcher::core::{Spec, Status};
const CONFIG_MAP_NAME: &str = "spu-k8";

// this is same struct as in helm config
#[derive(Deserialize, Debug, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PodConfig {
#[serde(default)]
Expand All @@ -39,7 +39,7 @@ pub struct PodConfig {
pub extra_volumes: Vec<VolumeSpec>,
}

#[derive(Debug, Eq, PartialEq, Default, Clone)]
#[derive(Debug, Eq, PartialEq, Default, Clone, Serialize, Deserialize)]
pub struct ScK8Config {
pub image: String,
pub pod_security_context: Option<PodSecurityContext>,
Expand Down Expand Up @@ -156,7 +156,7 @@ impl From<ScK8Config> for ConfigMapSpec {
}
}

#[derive(Deserialize, Debug, Eq, PartialEq, Default, Clone)]
#[derive(Debug, Eq, PartialEq, Default, Clone, Serialize, Deserialize)]
pub struct FluvioConfigStatus();

impl Status for FluvioConfigStatus {}
Expand Down
10 changes: 7 additions & 3 deletions crates/fluvio-stream-dispatcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ name = "fluvio_stream_dispatcher"
path = "src/lib.rs"

[features]
k8 = ["fluvio-stream-model/k8", "k8-client"]
local = ["fluvio-stream-model/use_serde", "fluvio-stream-model/k8", "serde_yaml"]
k8 = ["fluvio-stream-model/k8", "k8-client", "serde_json"]

[dependencies]
anyhow = { workspace = true }
Expand All @@ -23,16 +24,19 @@ event-listener = { workspace = true }
futures-util = { workspace = true, features = ["alloc"] }
once_cell = { workspace = true }
serde = { workspace = true, features = ['derive'] }
serde_json = { workspace = true }
serde_json = { workspace = true, optional = true }
serde_yaml = { workspace = true, optional = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
tracing = { workspace = true }
tempfile = { workspace = true }

# Fluvio dependencies
fluvio-types = { workspace = true }
fluvio-stream-model = { workspace = true, features = [ "k8"] }
fluvio-stream-model = { workspace = true }
k8-client = { workspace = true, optional = true, features = ["memory_client"] }
fluvio-future = { workspace = true, features = ["task", "timer"] }

[dev-dependencies]
fluvio-future = { workspace = true, features = ["fixture"] }
fluvio-stream-model = { workspace = true, features = ["fixture"] }
3 changes: 1 addition & 2 deletions crates/fluvio-stream-dispatcher/src/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use std::fmt::Display;

use crate::core::*;
use crate::store::*;
use crate::store::k8::K8MetaItem;

#[derive(PartialEq, Clone)]
pub enum WSAction<S, MetaContext = K8MetaItem>
pub enum WSAction<S, MetaContext>
where
S: Spec + PartialEq,
MetaContext: MetadataItem,
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-stream-dispatcher/src/dispatcher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[cfg(feature = "k8")]
mod metadata;

#[cfg(feature = "k8")]
pub use metadata::*;

pub mod memory {
Expand Down
2 changes: 2 additions & 0 deletions crates/fluvio-stream-dispatcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ pub mod dispatcher;
pub mod actions;

mod error;
#[cfg(feature = "k8")]
pub mod metadata;

pub use error::StoreError;
Expand All @@ -11,4 +12,5 @@ pub mod core {
pub use fluvio_stream_model::core::*;
}

#[cfg(feature = "k8")]
pub use fluvio_stream_model::k8_types;
Loading

0 comments on commit 09a0a17

Please sign in to comment.