From 3ec2e7fe0d570e5d387d576458a94c6727760f2e Mon Sep 17 00:00:00 2001 From: "ayush.jain@juspay.in" Date: Wed, 22 Jan 2025 23:44:21 +0530 Subject: [PATCH] refactor: OrgId, WorkspaceId, SchemaName cleanup and refactor --- .env.example | 1 - .../src/api/audit_log/handlers.rs | 6 +- .../src/api/config/handlers.rs | 85 ++++----- .../src/api/context/handlers.rs | 96 +++++----- .../src/api/context/helpers.rs | 50 +++--- .../src/api/context/operations.rs | 38 ++-- .../src/api/context/validations.rs | 7 +- .../src/api/default_config/handlers.rs | 72 ++++---- .../src/api/dimension/handlers.rs | 53 +++--- .../src/api/dimension/utils.rs | 21 ++- .../src/api/functions/handlers.rs | 40 ++--- .../src/api/functions/helpers.rs | 14 +- .../src/api/type_templates/handlers.rs | 30 ++-- crates/context_aware_config/src/helpers.rs | 27 ++- .../src/api/experiments/handlers.rs | 163 +++++++---------- .../src/api/experiments/helpers.rs | 35 ++-- crates/service_utils/src/helpers.rs | 11 +- .../service_utils/src/middlewares/tenant.rs | 162 +++++++++-------- crates/service_utils/src/service/types.rs | 170 ++++++------------ crates/superposition/src/app_state.rs | 3 - .../superposition/src/auth/authenticator.rs | 41 +---- crates/superposition/src/auth/oidc.rs | 8 +- crates/superposition_types/src/webhook.rs | 7 +- docs/setup.md | 1 - 24 files changed, 515 insertions(+), 626 deletions(-) diff --git a/.env.example b/.env.example index 200b1affc..d7188edab 100644 --- a/.env.example +++ b/.env.example @@ -18,7 +18,6 @@ SUPERPOSITION_VERSION="v0.1.0" HOSTNAME="---" ACTIX_KEEP_ALIVE=120 MAX_DB_CONNECTION_POOL_SIZE=3 -ENABLE_TENANT_AND_SCOPE=true TENANTS=dev,test,superposition TENANT_MIDDLEWARE_EXCLUSION_LIST="/health,/assets/favicon.ico,/pkg/frontend.js,/pkg,/pkg/frontend_bg.wasm,/pkg/tailwind.css,/pkg/style.css,/assets,/admin,/oidc/login,/admin/organisations,/organisations,/organisations/switch/{organisation_id},/" SERVICE_PREFIX="" diff --git a/crates/context_aware_config/src/api/audit_log/handlers.rs b/crates/context_aware_config/src/api/audit_log/handlers.rs index 180bac97e..565e68e97 100644 --- a/crates/context_aware_config/src/api/audit_log/handlers.rs +++ b/crates/context_aware_config/src/api/audit_log/handlers.rs @@ -5,7 +5,7 @@ use actix_web::{ }; use chrono::{Duration, Utc}; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; -use service_utils::service::types::{DbConnection, Tenant}; +use service_utils::service::types::{DbConnection, SchemaName}; use superposition_types::{ database::{models::cac::EventLog, schema::event_log::dsl as event_log}, result as superposition, PaginatedResponse, @@ -21,12 +21,12 @@ pub fn endpoints() -> Scope { async fn get_audit_logs( filters: Query, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { let DbConnection(mut conn) = db_conn; let query_builder = |filters: &AuditQueryFilters| { - let mut builder = event_log::event_log.schema_name(&tenant).into_boxed(); + let mut builder = event_log::event_log.schema_name(&schema_name).into_boxed(); if let Some(tables) = filters.table.clone() { builder = builder.filter(event_log::table_name.eq_any(tables.0)); } diff --git a/crates/context_aware_config/src/api/config/handlers.rs b/crates/context_aware_config/src/api/config/handlers.rs index 669420c6e..b9455dec3 100644 --- a/crates/context_aware_config/src/api/config/handlers.rs +++ b/crates/context_aware_config/src/api/config/handlers.rs @@ -21,10 +21,9 @@ use itertools::Itertools; use serde_json::{json, Map, Value}; #[cfg(feature = "high-performance-mode")] use service_utils::service::types::AppState; -use service_utils::service::types::Tenant; use service_utils::{ helpers::extract_dimensions, - service::types::{AppHeader, DbConnection}, + service::types::{AppHeader, DbConnection, SchemaName}, }; #[cfg(feature = "high-performance-mode")] use superposition_macros::response_error; @@ -81,13 +80,13 @@ fn validate_version_in_params( pub fn add_audit_id_to_header( conn: &mut DBConnection, resp_builder: &mut HttpResponseBuilder, - tenant: &Tenant, + schema_name: &SchemaName, ) { if let Ok(uuid) = event_log::event_log .select(event_log::id) .filter(event_log::table_name.eq("contexts")) .order_by(event_log::timestamp.desc()) - .schema_name(tenant) + .schema_name(schema_name) .first::(conn) { resp_builder.insert_header((AppHeader::XAuditId.to_string(), uuid.to_string())); @@ -126,12 +125,12 @@ fn add_config_version_to_header( fn get_max_created_at( conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> Result { event_log::event_log .select(max(event_log::timestamp)) .filter(event_log::table_name.eq_any(vec!["contexts", "default_configs"])) - .schema_name(tenant) + .schema_name(schema_name) .first::>(conn) .and_then(|res| res.ok_or(diesel::result::Error::NotFound)) } @@ -156,14 +155,14 @@ fn is_not_modified(max_created_at: Option, req: &HttpRequest) -> pub fn generate_config_from_version( version: &mut Option, conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { if let Some(val) = version { let val = val.clone(); let config = config_versions::config_versions .select(config_versions::config) .filter(config_versions::id.eq(val)) - .schema_name(tenant) + .schema_name(schema_name) .get_result::(conn) .map_err(|err| { log::error!("failed to fetch config with error: {}", err); @@ -177,19 +176,19 @@ pub fn generate_config_from_version( match config_versions::config_versions .select((config_versions::id, config_versions::config)) .order(config_versions::created_at.desc()) - .schema_name(tenant) + .schema_name(schema_name) .first::<(i64, Value)>(conn) { Ok((latest_version, config)) => { *version = Some(latest_version); serde_json::from_value::(config).or_else(|err| { log::error!("failed to decode config: {}", err); - generate_cac(conn, tenant) + generate_cac(conn, schema_name) }) } Err(err) => { log::error!("failed to find latest config: {err}"); - generate_cac(conn, tenant) + generate_cac(conn, schema_name) } } } @@ -410,7 +409,7 @@ fn construct_new_payload( #[allow(clippy::too_many_arguments)] async fn reduce_config_key( - user: User, + user: &User, conn: &mut DBConnection, mut og_contexts: Vec, mut og_overrides: HashMap, @@ -418,7 +417,7 @@ async fn reduce_config_key( dimension_schema_map: &HashMap, default_config: Map, is_approve: bool, - tenant: Tenant, + schema_name: &SchemaName, ) -> superposition::Result { let default_config_val = default_config @@ -489,15 +488,21 @@ async fn reduce_config_key( if *to_be_deleted { if is_approve { - let _ = context::delete(cid.clone(), user.clone(), conn, &tenant); + let _ = context::delete(cid.clone(), user, conn, schema_name); } og_contexts.retain(|x| x.id != *cid); } else { if is_approve { - let _ = context::delete(cid.clone(), user.clone(), conn, &tenant); + let _ = context::delete(cid.clone(), user, conn, schema_name); if let Ok(put_req) = construct_new_payload(request_payload) { - let _ = - context::put(put_req, conn, false, &user, &tenant, false); + let _ = context::put( + put_req, + conn, + false, + &user, + schema_name, + false, + ); } } @@ -541,7 +546,7 @@ async fn reduce_config( req: HttpRequest, user: User, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let is_approve = req @@ -550,16 +555,16 @@ async fn reduce_config( .and_then(|value| value.to_str().ok().and_then(|s| s.parse::().ok())) .unwrap_or(false); - let dimensions_vec = get_dimension_data(&mut conn, &tenant)?; + let dimensions_vec = get_dimension_data(&mut conn, &schema_name)?; let dimensions_data_map = get_dimension_data_map(&dimensions_vec)?; - let mut config = generate_cac(&mut conn, &tenant)?; + let mut config = generate_cac(&mut conn, &schema_name)?; let default_config = (config.default_configs).clone(); for (key, _) in default_config { let contexts = config.contexts; let overrides = config.overrides; let default_config = config.default_configs; config = reduce_config_key( - user.clone(), + &user, &mut conn, contexts.clone(), overrides.clone(), @@ -567,11 +572,11 @@ async fn reduce_config( &dimensions_data_map, default_config.clone(), is_approve, - tenant.clone(), + &schema_name, ) .await?; if is_approve { - config = generate_cac(&mut conn, &tenant)?; + config = generate_cac(&mut conn, &schema_name)?; } } @@ -581,16 +586,16 @@ async fn reduce_config( #[cfg(feature = "high-performance-mode")] #[get("/fast")] async fn get_config_fast( - tenant: Tenant, + schema_name: SchemaName, state: Data, ) -> superposition::Result { use fred::interfaces::MetricsInterface; log::debug!("Started redis fetch"); - let config_key = format!("{}::cac_config", *tenant); - let last_modified_at_key = format!("{}::cac_config::last_modified_at", *tenant); - let audit_id_key = format!("{}::cac_config::audit_id", *tenant); - let config_version_key = format!("{}::cac_config::config_version", *tenant); + let config_key = format!("{}::cac_config", *schema_name); + let last_modified_at_key = format!("{}::cac_config::last_modified_at", *schema_name); + let audit_id_key = format!("{}::cac_config::audit_id", *schema_name); + let config_version_key = format!("{}::cac_config::config_version", *schema_name); let client = state.redis.next_connected(); let config = client.get::(config_key).await; let metrics = client.take_latency_metrics(); @@ -672,11 +677,11 @@ async fn get_config( req: HttpRequest, db_conn: DbConnection, query_map: superposition_query::Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; - let max_created_at = get_max_created_at(&mut conn, &tenant) + let max_created_at = get_max_created_at(&mut conn, &schema_name) .map_err(|e| log::error!("failed to fetch max timestamp from event_log: {e}")) .ok(); @@ -691,7 +696,7 @@ async fn get_config( let mut query_params_map = query_map.into_inner(); let mut config_version = validate_version_in_params(&mut query_params_map)?; let mut config = - generate_config_from_version(&mut config_version, &mut conn, &tenant)?; + generate_config_from_version(&mut config_version, &mut conn, &schema_name)?; config = apply_prefix_filter_to_config(&mut query_params_map, config)?; @@ -701,7 +706,7 @@ async fn get_config( let mut response = HttpResponse::Ok(); add_last_modified_to_header(max_created_at, &mut response); - add_audit_id_to_header(&mut conn, &mut response, &tenant); + add_audit_id_to_header(&mut conn, &mut response, &schema_name); add_config_version_to_header(&config_version, &mut response); Ok(response.json(config)) } @@ -711,12 +716,12 @@ async fn get_resolved_config( req: HttpRequest, db_conn: DbConnection, query_map: superposition_query::Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let mut query_params_map = query_map.into_inner(); - let max_created_at = get_max_created_at(&mut conn, &tenant) + let max_created_at = get_max_created_at(&mut conn, &schema_name) .map_err(|e| log::error!("failed to fetch max timestamp from event_log : {e}")) .ok(); @@ -728,7 +733,7 @@ async fn get_resolved_config( let mut config_version = validate_version_in_params(&mut query_params_map)?; let mut config = - generate_config_from_version(&mut config_version, &mut conn, &tenant)?; + generate_config_from_version(&mut config_version, &mut conn, &schema_name)?; config = apply_prefix_filter_to_config(&mut query_params_map, config)?; @@ -772,7 +777,7 @@ async fn get_resolved_config( }; let mut resp = HttpResponse::Ok(); add_last_modified_to_header(max_created_at, &mut resp); - add_audit_id_to_header(&mut conn, &mut resp, &tenant); + add_audit_id_to_header(&mut conn, &mut resp, &schema_name); add_config_version_to_header(&config_version, &mut resp); Ok(resp.json(response)) @@ -782,13 +787,13 @@ async fn get_resolved_config( async fn get_config_versions( db_conn: DbConnection, filters: Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { let DbConnection(mut conn) = db_conn; if let Some(true) = filters.all { let config_versions: Vec = config_versions::config_versions - .schema_name(&tenant) + .schema_name(&schema_name) .get_results(&mut conn)?; return Ok(Json(PaginatedResponse { total_pages: 1, @@ -799,12 +804,12 @@ async fn get_config_versions( let n_version: i64 = config_versions::config_versions .count() - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(&mut conn)?; let limit = filters.count.unwrap_or(10); let mut builder = config_versions::config_versions - .schema_name(&tenant) + .schema_name(&schema_name) .into_boxed() .order(config_versions::created_at.desc()) .limit(limit); diff --git a/crates/context_aware_config/src/api/context/handlers.rs b/crates/context_aware_config/src/api/context/handlers.rs index d0f3d4053..1bba909c6 100644 --- a/crates/context_aware_config/src/api/context/handlers.rs +++ b/crates/context_aware_config/src/api/context/handlers.rs @@ -30,10 +30,9 @@ use chrono::Utc; use diesel::SelectableHelper; use diesel::{delete, Connection, ExpressionMethods, QueryDsl, RunQueryDsl}; use serde_json::{Map, Value}; -use service_utils::service::types::Tenant; use service_utils::{ helpers::parse_config_tags, - service::types::{AppHeader, AppState, CustomHeaders, DbConnection}, + service::types::{AppHeader, AppState, CustomHeaders, DbConnection, SchemaName}, }; use superposition_macros::{bad_argument, db_error, unexpected_error}; use superposition_types::{ @@ -65,7 +64,7 @@ async fn put_handler( req: Json, mut db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let tags = parse_config_tags(custom_headers.config_tags)?; @@ -78,7 +77,7 @@ async fn put_handler( req_mut.description = Some(ensure_description( Value::Object(req_mut.context.clone().into_inner().into()), transaction_conn, - &tenant, + &schema_name, )?); } let put_response = operations::put( @@ -86,7 +85,7 @@ async fn put_handler( transaction_conn, true, &user, - &tenant, + &schema_name, false, ) .map_err(|err: superposition::AppError| { @@ -102,7 +101,7 @@ async fn put_handler( description, change_reason, transaction_conn, - &tenant, + &schema_name, )?; Ok((put_response, version_id)) })?; @@ -116,7 +115,7 @@ async fn put_handler( cfg_if::cfg_if! { if #[cfg(feature = "high-performance-mode")] { let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; } } Ok(http_resp.json(put_response)) @@ -129,7 +128,7 @@ async fn update_override_handler( req: Json, mut db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let tags = parse_config_tags(custom_headers.config_tags)?; let (override_resp, version_id) = db_conn @@ -139,7 +138,7 @@ async fn update_override_handler( req_mut.description = Some(ensure_description( Value::Object(req_mut.context.clone().into_inner().into()), transaction_conn, - &tenant, + &schema_name, )?); } let override_resp = operations::put( @@ -147,7 +146,7 @@ async fn update_override_handler( transaction_conn, true, &user, - &tenant, + &schema_name, true, ) .map_err(|err: superposition::AppError| { @@ -160,7 +159,7 @@ async fn update_override_handler( req_mut.description.unwrap().clone(), req_mut.change_reason.clone(), transaction_conn, - &tenant, + &schema_name, )?; Ok((override_resp, version_id)) })?; @@ -173,7 +172,7 @@ async fn update_override_handler( cfg_if::cfg_if! { if #[cfg(feature = "high-performance-mode")] { let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; } } Ok(http_resp.json(override_resp)) @@ -187,7 +186,7 @@ async fn move_handler( req: Json, mut db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let tags = parse_config_tags(custom_headers.config_tags)?; let (move_response, version_id) = db_conn @@ -198,7 +197,7 @@ async fn move_handler( transaction_conn, true, &user, - &tenant, + &schema_name, ) .map_err(|err| { log::info!("move api failed with error: {:?}", err); @@ -210,7 +209,7 @@ async fn move_handler( move_response.description.clone(), move_response.change_reason.clone(), transaction_conn, - &tenant, + &schema_name, )?; Ok((move_response, version_id)) @@ -224,7 +223,7 @@ async fn move_handler( cfg_if::cfg_if! { if #[cfg(feature = "high-performance-mode")] { let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; } } Ok(http_resp.json(move_response)) @@ -234,7 +233,7 @@ async fn move_handler( async fn get_context_from_condition( db_conn: DbConnection, req: Json>, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result> { use superposition_types::database::schema::contexts::dsl::*; @@ -243,7 +242,7 @@ async fn get_context_from_condition( let ctx: Context = contexts .filter(id.eq(context_id)) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn)?; Ok(Json(ctx)) @@ -253,7 +252,7 @@ async fn get_context_from_condition( async fn get_context( path: Path, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result> { use superposition_types::database::schema::contexts::dsl::*; @@ -262,7 +261,7 @@ async fn get_context( let ctx: Context = contexts .filter(id.eq(ctx_id)) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn)?; Ok(Json(ctx)) @@ -273,7 +272,7 @@ async fn list_contexts( filter_params: superposition_query::Query, dimension_params: DimensionQuery, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { use superposition_types::database::schema::contexts::dsl::*; let DbConnection(mut conn) = db_conn; @@ -289,7 +288,7 @@ async fn list_contexts( } let dimension_params = dimension_params.into_inner(); - let builder = contexts.schema_name(&tenant).into_boxed(); + let builder = contexts.schema_name(&schema_name).into_boxed(); #[rustfmt::skip] let mut builder = match (filter_params.sort_on.unwrap_or_default(), filter_params.sort_by.unwrap_or(SortBy::Asc)) { @@ -335,7 +334,7 @@ async fn list_contexts( (data, total_items as i64) } else { - let mut total_count_builder = contexts.schema_name(&tenant).into_boxed(); + let mut total_count_builder = contexts.schema_name(&schema_name).into_boxed(); if let Some(created_bys) = filter_params.created_by { total_count_builder = total_count_builder.filter(created_by.eq_any(created_bys.0)) @@ -362,7 +361,7 @@ async fn delete_context_handler( path: Path, custom_headers: CustomHeaders, user: User, - tenant: Tenant, + schema_name: SchemaName, mut db_conn: DbConnection, ) -> superposition::Result { use superposition_types::database::schema::contexts::dsl::{ @@ -374,9 +373,9 @@ async fn delete_context_handler( db_conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { let context = contexts_table .filter(context_id.eq(ctx_id.clone())) - .schema_name(&tenant) + .schema_name(&schema_name) .first::(transaction_conn)?; - operations::delete(ctx_id.clone(), user.clone(), transaction_conn, &tenant)?; + operations::delete(ctx_id.clone(), &user, transaction_conn, &schema_name)?; let description = context.description; let change_reason = format!("Deleted context by {}", user.username); let version_id = add_config_version( @@ -385,14 +384,14 @@ async fn delete_context_handler( description, change_reason, transaction_conn, - &tenant, + &schema_name, )?; Ok(version_id) })?; cfg_if::cfg_if! { if #[cfg(feature = "high-performance-mode")] { let DbConnection(mut conn) = db_conn; - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; } } Ok(HttpResponse::NoContent() @@ -410,7 +409,7 @@ async fn bulk_operations( reqs: Json>, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { use contexts::dsl::contexts; let DbConnection(mut conn) = db_conn; @@ -429,7 +428,7 @@ async fn bulk_operations( transaction_conn, true, &user, - &tenant, + &schema_name, false, ) .map_err(|err| { @@ -448,7 +447,7 @@ async fn bulk_operations( ensure_description( ctx_condition_value.clone(), transaction_conn, - &tenant, + &schema_name, )? } else { put_req @@ -462,12 +461,12 @@ async fn bulk_operations( ContextAction::Delete(ctx_id) => { let context: Context = contexts .filter(id.eq(&ctx_id)) - .schema_name(&tenant) + .schema_name(&schema_name) .first::(transaction_conn)?; let deleted_row = delete(contexts) .filter(id.eq(&ctx_id)) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn); let description = context.description; @@ -505,7 +504,7 @@ async fn bulk_operations( transaction_conn, true, &user, - &tenant, + &schema_name, ) .map_err(|err| { log::error!( @@ -531,7 +530,7 @@ async fn bulk_operations( combined_description, combined_change_reasons, transaction_conn, - &tenant, + &schema_name, )?; Ok((response, version_id)) })?; @@ -543,7 +542,7 @@ async fn bulk_operations( // Commit the transaction #[cfg(feature = "high-performance-mode")] - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; Ok(http_resp.json(response)) } @@ -552,7 +551,7 @@ async fn weight_recompute( state: Data, custom_headers: CustomHeaders, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, user: User, ) -> superposition::Result { use superposition_types::database::schema::contexts::dsl::{ @@ -560,16 +559,15 @@ async fn weight_recompute( }; let DbConnection(mut conn) = db_conn; - let result: Vec = - contexts - .schema_name(&tenant) - .load(&mut conn) - .map_err(|err| { - log::error!("failed to fetch contexts with error: {}", err); - unexpected_error!("Something went wrong") - })?; + let result: Vec = contexts + .schema_name(&schema_name) + .load(&mut conn) + .map_err(|err| { + log::error!("failed to fetch contexts with error: {}", err); + unexpected_error!("Something went wrong") + })?; - let dimension_data = get_dimension_data(&mut conn, &tenant)?; + let dimension_data = get_dimension_data(&mut conn, &schema_name)?; let dimension_data_map = get_dimension_data_map(&dimension_data)?; let mut response: Vec = vec![]; let tags = parse_config_tags(custom_headers.config_tags)?; @@ -612,7 +610,7 @@ async fn weight_recompute( last_modified_at.eq(last_modified_time.clone()), last_modified_by.eq(user.get_email()) )) - .schema_name(&tenant) + .schema_name(&schema_name) .returning(Context::as_returning()) .execute(transaction_conn).map_err(|err| { log::error!( @@ -623,11 +621,11 @@ async fn weight_recompute( } let description = "Recomputed weight".to_string(); let change_reason = "Recomputed weight".to_string(); - let version_id = add_config_version(&state, tags, description, change_reason, transaction_conn, &tenant)?; + let version_id = add_config_version(&state, tags, description, change_reason, transaction_conn, &schema_name)?; Ok(version_id) })?; #[cfg(feature = "high-performance-mode")] - put_config_in_redis(config_version_id, state, tenant, &mut conn).await?; + put_config_in_redis(config_version_id, state, &schema_name, &mut conn).await?; let mut http_resp = HttpResponse::Ok(); http_resp.insert_header(( diff --git a/crates/context_aware_config/src/api/context/helpers.rs b/crates/context_aware_config/src/api/context/helpers.rs index db6ec595b..cd7f1bc9f 100644 --- a/crates/context_aware_config/src/api/context/helpers.rs +++ b/crates/context_aware_config/src/api/context/helpers.rs @@ -9,7 +9,7 @@ use cac_client::utils::json_to_sorted_string; use chrono::Utc; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; use serde_json::{json, Map, Value}; -use service_utils::{helpers::extract_dimensions, service::types::Tenant}; +use service_utils::{helpers::extract_dimensions, service::types::SchemaName}; use superposition_macros::{bad_argument, unexpected_error, validation_error}; use superposition_types::{ database::{ @@ -63,7 +63,7 @@ pub fn validate_condition_with_mandatory_dimensions( pub fn validate_condition_with_functions( conn: &mut DBConnection, context: &Condition, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result<()> { use dimensions::dsl; let context = extract_dimensions(context)?; @@ -71,7 +71,7 @@ pub fn validate_condition_with_functions( let keys_function_array: Vec<(String, Option)> = dsl::dimensions .filter(dsl::dimension.eq_any(dimensions_list)) .select((dsl::dimension, dsl::function_name)) - .schema_name(tenant) + .schema_name(schema_name) .load(conn)?; let new_keys_function_array: Vec<(String, String)> = keys_function_array .into_iter() @@ -79,7 +79,7 @@ pub fn validate_condition_with_functions( .collect(); let dimension_functions_map = - get_functions_map(conn, new_keys_function_array, tenant)?; + get_functions_map(conn, new_keys_function_array, schema_name)?; for (key, value) in context.iter() { if let Some(functions_map) = dimension_functions_map.get(key) { if let (function_name, Some(function_code)) = @@ -95,13 +95,13 @@ pub fn validate_condition_with_functions( pub fn validate_override_with_functions( conn: &mut DBConnection, override_: &Map, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result<()> { let default_config_keys: Vec = override_.keys().cloned().collect(); let keys_function_array: Vec<(String, Option)> = dsl::default_configs .filter(dsl::key.eq_any(default_config_keys)) .select((dsl::key, dsl::function_name)) - .schema_name(tenant) + .schema_name(schema_name) .load(conn)?; let new_keys_function_array: Vec<(String, String)> = keys_function_array .into_iter() @@ -109,7 +109,7 @@ pub fn validate_override_with_functions( .collect(); let default_config_functions_map = - get_functions_map(conn, new_keys_function_array, tenant)?; + get_functions_map(conn, new_keys_function_array, schema_name)?; for (key, value) in override_.iter() { if let Some(functions_map) = default_config_functions_map.get(key) { if let (function_name, Some(function_code)) = @@ -125,7 +125,7 @@ pub fn validate_override_with_functions( fn get_functions_map( conn: &mut DBConnection, keys_function_array: Vec<(String, String)>, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result> { let functions_map: HashMap> = get_published_functions_by_names( @@ -134,7 +134,7 @@ fn get_functions_map( .iter() .map(|(_, f_name)| f_name.clone()) .collect(), - tenant, + schema_name, )? .into_iter() .collect(); @@ -185,7 +185,7 @@ pub fn validate_value_with_function( pub fn ensure_description( context: Value, transaction_conn: &mut diesel::PgConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> Result { use superposition_types::database::schema::contexts::dsl::{ contexts as contexts_table, id as context_id, @@ -196,7 +196,7 @@ pub fn ensure_description( // Perform the database query let existing_context = contexts_table .filter(context_id.eq(context_id_value)) - .schema_name(tenant) + .schema_name(schema_name) .first::(transaction_conn); match existing_context { @@ -215,7 +215,7 @@ pub fn create_ctx_from_put_req( req: Json, conn: &mut DBConnection, user: &User, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { let ctx_condition = req.context.to_owned().into_inner(); let condition_val = Value::Object(ctx_condition.clone().into()); @@ -223,25 +223,25 @@ pub fn create_ctx_from_put_req( let ctx_override = Value::Object(r_override.clone().into()); let description = if req.description.is_none() { let ctx_condition_value = json!(ctx_condition); - ensure_description(ctx_condition_value, conn, &tenant)? + ensure_description(ctx_condition_value, conn, schema_name)? } else { req.description .clone() .ok_or_else(|| bad_argument!("Description should not be empty"))? }; - let workspace_settings = get_workspace(&tenant, conn)?; + let workspace_settings = get_workspace(schema_name, conn)?; let change_reason = req.change_reason.clone(); validate_condition_with_mandatory_dimensions( &ctx_condition, &workspace_settings.mandatory_dimensions.unwrap_or_default(), )?; - validate_override_with_default_configs(conn, &r_override, tenant)?; - validate_condition_with_functions(conn, &ctx_condition, tenant)?; - validate_override_with_functions(conn, &r_override, tenant)?; + validate_override_with_default_configs(conn, &r_override, schema_name)?; + validate_condition_with_functions(conn, &ctx_condition, schema_name)?; + validate_override_with_functions(conn, &r_override, schema_name)?; - let dimension_data = get_dimension_data(conn, tenant)?; + let dimension_data = get_dimension_data(conn, schema_name)?; let dimension_data_map = get_dimension_data_map(&dimension_data)?; validate_dimensions("context", &condition_val, &dimension_data_map)?; @@ -269,7 +269,7 @@ fn db_update_override( conn: &mut DBConnection, ctx: Context, user: &User, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { use contexts::dsl; let update_resp = diesel::update(dsl::contexts) @@ -283,7 +283,7 @@ fn db_update_override( dsl::change_reason.eq(ctx.change_reason), )) .returning(Context::as_returning()) - .schema_name(tenant) + .schema_name(schema_name) .get_result::(conn)?; Ok(update_resp.into()) } @@ -292,7 +292,7 @@ pub fn replace_override_of_existing_ctx( conn: &mut DBConnection, ctx: Context, user: &User, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { let new_override = ctx.override_; let new_override_id = hash(&Value::Object(new_override.clone().into())); @@ -301,20 +301,20 @@ pub fn replace_override_of_existing_ctx( override_id: new_override_id, ..ctx }; - db_update_override(conn, new_ctx, user, tenant) + db_update_override(conn, new_ctx, user, schema_name) } pub fn update_override_of_existing_ctx( conn: &mut DBConnection, ctx: Context, user: &User, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { use contexts::dsl; let mut new_override: Value = dsl::contexts .filter(dsl::id.eq(ctx.id.clone())) .select(dsl::override_) - .schema_name(&tenant) + .schema_name(schema_name) .first(conn)?; cac_client::merge( &mut new_override, @@ -336,5 +336,5 @@ pub fn update_override_of_existing_ctx( override_id: new_override_id, ..ctx }; - db_update_override(conn, new_ctx, user, tenant) + db_update_override(conn, new_ctx, user, schema_name) } diff --git a/crates/context_aware_config/src/api/context/operations.rs b/crates/context_aware_config/src/api/context/operations.rs index 30d868d98..08378ef99 100644 --- a/crates/context_aware_config/src/api/context/operations.rs +++ b/crates/context_aware_config/src/api/context/operations.rs @@ -6,7 +6,7 @@ use diesel::{ Connection, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl, SelectableHelper, }; use serde_json::Value; -use service_utils::service::types::Tenant; +use service_utils::service::types::SchemaName; use superposition_macros::{bad_argument, db_error, not_found, unexpected_error}; use superposition_types::{ database::{models::cac::Context, schema::contexts}, @@ -38,11 +38,11 @@ pub fn put( conn: &mut PooledConnection>, already_under_txn: bool, user: &User, - tenant: &Tenant, + schema_name: &SchemaName, replace: bool, ) -> result::Result { use contexts::dsl::contexts; - let new_ctx = create_ctx_from_put_req(req, conn, user, &tenant)?; + let new_ctx = create_ctx_from_put_req(req, conn, user, schema_name)?; if already_under_txn { diesel::sql_query("SAVEPOINT put_ctx_savepoint").execute(conn)?; @@ -50,7 +50,7 @@ pub fn put( let insert = diesel::insert_into(contexts) .values(&new_ctx) .returning(Context::as_returning()) - .schema_name(&tenant) + .schema_name(schema_name) .execute(conn); match insert { @@ -60,9 +60,9 @@ pub fn put( diesel::sql_query("ROLLBACK TO put_ctx_savepoint").execute(conn)?; } if replace { - replace_override_of_existing_ctx(conn, new_ctx, user, tenant) + replace_override_of_existing_ctx(conn, new_ctx, user, schema_name) } else { - update_override_of_existing_ctx(conn, new_ctx, user, tenant) + update_override_of_existing_ctx(conn, new_ctx, user, schema_name) } } Err(e) => { @@ -78,14 +78,14 @@ pub fn r#move( conn: &mut PooledConnection>, already_under_txn: bool, user: &User, - tenant: &Tenant, + schema_name: &SchemaName, ) -> result::Result { use contexts::dsl; let req = req.into_inner(); let ctx_condition = req.context.to_owned().into_inner(); let ctx_condition_value = Value::Object(ctx_condition.clone().into()); let description = if req.description.is_none() { - ensure_description(ctx_condition_value.clone(), conn, tenant)? + ensure_description(ctx_condition_value.clone(), conn, schema_name)? } else { req.description .ok_or_else(|| bad_argument!("Description should not be empty"))? @@ -94,13 +94,13 @@ pub fn r#move( let new_ctx_id = hash(&ctx_condition_value); - let dimension_data = get_dimension_data(conn, &tenant)?; + let dimension_data = get_dimension_data(conn, schema_name)?; let dimension_data_map = get_dimension_data_map(&dimension_data)?; validate_dimensions("context", &ctx_condition_value, &dimension_data_map)?; let weight = calculate_context_weight(&ctx_condition_value, &dimension_data_map) .map_err(|_| unexpected_error!("Something Went Wrong"))?; - let workspace_settings = get_workspace(&tenant, conn)?; + let workspace_settings = get_workspace(schema_name, conn)?; validate_condition_with_mandatory_dimensions( &req.context.into_inner(), @@ -121,7 +121,7 @@ pub fn r#move( dsl::last_modified_by.eq(user.get_email()), )) .returning(Context::as_returning()) - .schema_name(&tenant) + .schema_name(schema_name) .get_result::(conn); let contruct_new_ctx_with_old_overrides = |ctx: Context| Context { @@ -143,19 +143,19 @@ pub fn r#move( if already_under_txn { let deleted_ctxt = diesel::delete(dsl::contexts) .filter(dsl::id.eq(&old_ctx_id)) - .schema_name(&tenant) + .schema_name(schema_name) .get_result(db_conn)?; let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt); - update_override_of_existing_ctx(db_conn, ctx, user, tenant) + update_override_of_existing_ctx(db_conn, ctx, user, schema_name) } else { db_conn.transaction(|conn| { let deleted_ctxt = diesel::delete(dsl::contexts) .filter(dsl::id.eq(&old_ctx_id)) - .schema_name(&tenant) + .schema_name(schema_name) .get_result(conn)?; let ctx = contruct_new_ctx_with_old_overrides(deleted_ctxt); - update_override_of_existing_ctx(conn, ctx, user, tenant) + update_override_of_existing_ctx(conn, ctx, user, schema_name) }) } }; @@ -177,9 +177,9 @@ pub fn r#move( pub fn delete( ctx_id: String, - user: User, + user: &User, conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> result::Result<()> { use contexts::dsl; diesel::update(dsl::contexts) @@ -189,10 +189,10 @@ pub fn delete( dsl::last_modified_by.eq(user.get_email()), )) .returning(Context::as_returning()) - .schema_name(&tenant) + .schema_name(schema_name) .execute(conn)?; let deleted_row = diesel::delete(dsl::contexts.filter(dsl::id.eq(&ctx_id))) - .schema_name(&tenant) + .schema_name(schema_name) .execute(conn); match deleted_row { Ok(0) => Err(not_found!("Context Id `{}` doesn't exists", ctx_id)), diff --git a/crates/context_aware_config/src/api/context/validations.rs b/crates/context_aware_config/src/api/context/validations.rs index 10233c4e3..1261e2e4b 100644 --- a/crates/context_aware_config/src/api/context/validations.rs +++ b/crates/context_aware_config/src/api/context/validations.rs @@ -3,8 +3,7 @@ use std::collections::HashMap; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; use jsonschema::{Draft, JSONSchema, ValidationError}; use serde_json::{json, Map, Value}; -use service_utils::helpers::validation_err_to_str; -use service_utils::service::types::Tenant; +use service_utils::{helpers::validation_err_to_str, service::types::SchemaName}; use superposition_macros::{bad_argument, validation_error}; use superposition_types::{database::schema, result, DBConnection}; @@ -15,7 +14,7 @@ use super::types::DimensionCondition; pub fn validate_override_with_default_configs( conn: &mut DBConnection, override_: &Map, - tenant: &Tenant, + schema_name: &SchemaName, ) -> result::Result<()> { let keys_array: Vec<&String> = override_.keys().collect(); let res: Vec<(String, Value)> = schema::default_configs::dsl::default_configs @@ -24,7 +23,7 @@ pub fn validate_override_with_default_configs( schema::default_configs::dsl::key, schema::default_configs::dsl::schema, )) - .schema_name(tenant) + .schema_name(schema_name) .get_results::<(String, Value)>(conn)?; let map = Map::from_iter(res); diff --git a/crates/context_aware_config/src/api/default_config/handlers.rs b/crates/context_aware_config/src/api/default_config/handlers.rs index ca2546800..07b7788e8 100644 --- a/crates/context_aware_config/src/api/default_config/handlers.rs +++ b/crates/context_aware_config/src/api/default_config/handlers.rs @@ -11,7 +11,7 @@ use jsonschema::{Draft, JSONSchema, ValidationError}; use serde_json::Value; use service_utils::{ helpers::{parse_config_tags, validation_err_to_str}, - service::types::{AppHeader, AppState, CustomHeaders, DbConnection, Tenant}, + service::types::{AppHeader, AppState, CustomHeaders, DbConnection, SchemaName}, }; use superposition_macros::{ bad_argument, db_error, not_found, unexpected_error, validation_error, @@ -52,7 +52,7 @@ async fn create_default_config( custom_headers: CustomHeaders, request: web::Json, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, user: User, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; @@ -112,7 +112,7 @@ async fn create_default_config( default_config.function_name.as_ref(), &default_config.key, &default_config.value, - &tenant, + &schema_name, ) { log::info!("Validation failed: {:?}", e); return Err(e); @@ -123,7 +123,7 @@ async fn create_default_config( diesel::insert_into(dsl::default_configs) .values(&default_config) .returning(DefaultConfig::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn) .map_err(|e| { log::info!("DefaultConfig creation failed with error: {e}"); @@ -137,13 +137,13 @@ async fn create_default_config( description, change_reason, transaction_conn, - &tenant, + &schema_name, )?; Ok(version_id) })?; #[cfg(feature = "high-performance-mode")] - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; let mut http_resp = HttpResponse::Ok(); http_resp.insert_header(( @@ -161,7 +161,7 @@ async fn update_default_config( custom_headers: CustomHeaders, request: web::Json, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, user: User, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; @@ -170,7 +170,7 @@ async fn update_default_config( let tags = parse_config_tags(custom_headers.config_tags)?; let existing = - fetch_default_key(&key_str, &mut conn, &tenant).map_err(|e| match e { + fetch_default_key(&key_str, &mut conn, &schema_name).map_err(|e| match e { superposition::AppError::DbError(diesel::NotFound) => { bad_argument!( "No record found for {}. Use create endpoint instead.", @@ -235,7 +235,7 @@ async fn update_default_config( updated_config.function_name.as_ref(), &updated_config.key, &updated_config.value, - &tenant, + &schema_name, ) { log::info!("Validation failed: {:?}", e); return Err(e); @@ -249,7 +249,7 @@ async fn update_default_config( .do_update() .set(&updated_config) .returning(DefaultConfig::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn) .map_err(|e| { log::info!("Update failed: {e}"); @@ -262,14 +262,14 @@ async fn update_default_config( description, change_reason, transaction_conn, - &tenant, + &schema_name, )?; Ok(version_id) })?; #[cfg(feature = "high-performance-mode")] - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; let mut http_resp = HttpResponse::Ok(); http_resp.insert_header(( @@ -284,11 +284,12 @@ fn validate_and_get_function_code( function_name: Option<&String>, key: &str, value: &Value, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result<()> { if let Some(f_name) = function_name { - let function_code = get_published_function_code(conn, f_name.clone(), &tenant) - .map_err(|_| bad_argument!("Function {} doesn't exist.", f_name))?; + let function_code = + get_published_function_code(conn, f_name.clone(), schema_name) + .map_err(|_| bad_argument!("Function {} doesn't exist.", f_name))?; if let Some(f_code) = function_code { validate_value_with_function( f_name.as_str(), @@ -304,12 +305,12 @@ fn validate_and_get_function_code( fn fetch_default_key( key: &String, conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { let res = dsl::default_configs .filter(schema::default_configs::key.eq(key)) .select(models::DefaultConfig::as_select()) - .schema_name(tenant) + .schema_name(schema_name) .get_result(conn)?; Ok(res) } @@ -318,13 +319,13 @@ fn fetch_default_key( async fn get( db_conn: DbConnection, filters: Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { let DbConnection(mut conn) = db_conn; if let Some(true) = filters.all { let result: Vec = dsl::default_configs - .schema_name(&tenant) + .schema_name(&schema_name) .get_results(&mut conn)?; return Ok(Json(PaginatedResponse { total_pages: 1, @@ -335,13 +336,13 @@ async fn get( let n_default_configs: i64 = dsl::default_configs .count() - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(&mut conn)?; let limit = filters.count.unwrap_or(10); let mut builder = dsl::default_configs .order(dsl::created_at.desc()) .limit(limit) - .schema_name(&tenant) + .schema_name(&schema_name) .into_boxed(); if let Some(page) = filters.page { let offset = (page - 1) * limit; @@ -359,13 +360,16 @@ async fn get( pub fn get_key_usage_context_ids( key: &str, conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result> { let result: Vec = - contexts.schema_name(tenant).load(conn).map_err(|err| { - log::error!("failed to fetch contexts with error: {}", err); - db_error!(err) - })?; + contexts + .schema_name(schema_name) + .load(conn) + .map_err(|err| { + log::error!("failed to fetch contexts with error: {}", err); + db_error!(err) + })?; let mut context_ids = vec![]; for context in result.iter() { @@ -383,7 +387,7 @@ async fn delete( path: Path, custom_headers: CustomHeaders, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, user: User, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; @@ -392,9 +396,9 @@ async fn delete( let key: String = path.into_inner().into(); let mut version_id = 0; - fetch_default_key(&key, &mut conn, &tenant)?; + fetch_default_key(&key, &mut conn, &schema_name)?; - let context_ids = get_key_usage_context_ids(&key, &mut conn, &tenant) + let context_ids = get_key_usage_context_ids(&key, &mut conn, &schema_name) .map_err(|_| unexpected_error!("Something went wrong"))?; if context_ids.is_empty() { let resp = @@ -406,19 +410,19 @@ async fn delete( dsl::last_modified_by.eq(user.get_email()), )) .returning(DefaultConfig::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn)?; let default_config: DefaultConfig = dsl::default_configs .filter(dsl::key.eq(&key)) - .schema_name(&tenant) + .schema_name(&schema_name) .first::(transaction_conn)?; let description = default_config.description; let change_reason = format!("Context Deleted by {}", user.get_email()); let deleted_row = diesel::delete(dsl::default_configs.filter(dsl::key.eq(&key))) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn); match deleted_row { Ok(0) => { @@ -431,7 +435,7 @@ async fn delete( description, change_reason, transaction_conn, - &tenant, + &schema_name, )?; log::info!( "default config key: {key} deleted by {}", @@ -451,7 +455,7 @@ async fn delete( } }); #[cfg(feature = "high-performance-mode")] - put_config_in_redis(version_id, state, tenant, &mut conn).await?; + put_config_in_redis(version_id, state, &schema_name, &mut conn).await?; resp } else { Err(bad_argument!( diff --git a/crates/context_aware_config/src/api/dimension/handlers.rs b/crates/context_aware_config/src/api/dimension/handlers.rs index 53d0ffe54..44bd5634d 100644 --- a/crates/context_aware_config/src/api/dimension/handlers.rs +++ b/crates/context_aware_config/src/api/dimension/handlers.rs @@ -9,7 +9,7 @@ use chrono::Utc; use diesel::{ delete, Connection, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper, }; -use service_utils::service::types::{AppState, DbConnection, Tenant}; +use service_utils::service::types::{AppState, DbConnection, SchemaName}; use superposition_macros::{bad_argument, db_error, not_found, unexpected_error}; use superposition_types::{ custom_query::PaginationParams, @@ -45,7 +45,7 @@ async fn create( req: web::Json, user: User, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; @@ -54,7 +54,7 @@ async fn create( let num_rows = dimensions .count() - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn) .map_err(|err| { log::error!("failed to fetch number of dimension with error: {}", err); @@ -90,18 +90,18 @@ async fn create( dimensions::position.eq(dimensions::position + 1), )) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn)?; let insert_resp = diesel::insert_into(dimensions::table) .values(&dimension_data) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(transaction_conn); match insert_resp { Ok(inserted_dimension) => { let workspace_settings: Workspace = - get_workspace(&tenant, transaction_conn)?; + get_workspace(&schema_name, transaction_conn)?; let is_mandatory = workspace_settings .mandatory_dimensions .unwrap_or_default() @@ -137,7 +137,7 @@ async fn update( req: web::Json, user: User, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let name: String = path.clone().into(); use dimensions::dsl; @@ -145,12 +145,12 @@ async fn update( let mut dimension_row: Dimension = dsl::dimensions .filter(dimensions::dimension.eq(name.clone())) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn)?; let num_rows = dimensions .count() - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn) .map_err(|err| { log::error!("failed to fetch number of dimension with error: {}", err); @@ -196,7 +196,7 @@ async fn update( dimensions::position.eq((num_rows + 100) as i32), )) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(transaction_conn)?; if previous_position < new_position { @@ -209,7 +209,7 @@ async fn update( dimensions::position.eq(dimensions::position - 1), )) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn)? } else { diesel::update(dsl::dimensions) @@ -221,7 +221,7 @@ async fn update( dimensions::position.eq(dimensions::position + 1), )) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn)? }; } @@ -238,12 +238,12 @@ async fn update( dimensions::change_reason.eq(dimension_row.change_reason), )) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(transaction_conn) .map_err(|err| db_error!(err)) })?; - let workspace_settings = get_workspace(&tenant, &mut conn)?; + let workspace_settings = get_workspace(&schema_name, &mut conn)?; let is_mandatory = workspace_settings .mandatory_dimensions .unwrap_or_default() @@ -256,24 +256,25 @@ async fn update( async fn get( db_conn: DbConnection, filters: Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { let DbConnection(mut conn) = db_conn; let (total_pages, total_items, result) = match filters.all { Some(true) => { - let result: Vec = - dimensions.schema_name(&tenant).get_results(&mut conn)?; + let result: Vec = dimensions + .schema_name(&schema_name) + .get_results(&mut conn)?; (1, result.len() as i64, result) } _ => { let n_dimensions: i64 = dimensions .count() - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(&mut conn)?; let limit = filters.count.unwrap_or(10); let mut builder = dimensions - .schema_name(&tenant) + .schema_name(&schema_name) .order(created_at.desc()) .limit(limit) .into_boxed(); @@ -287,7 +288,7 @@ async fn get( } }; - let workspace_settings = get_workspace(&tenant, &mut conn)?; + let workspace_settings = get_workspace(&schema_name, &mut conn)?; let mandatory_dimensions = workspace_settings.mandatory_dimensions.unwrap_or_default(); @@ -312,16 +313,16 @@ async fn delete_dimension( path: Path, user: User, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let name: String = path.into_inner().into(); let DbConnection(mut conn) = db_conn; let dimension_data: Dimension = dimensions::dsl::dimensions .filter(dimensions::dimension.eq(&name)) .select(Dimension::as_select()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(&mut conn)?; - let context_ids = get_dimension_usage_context_ids(&name, &mut conn, &tenant) + let context_ids = get_dimension_usage_context_ids(&name, &mut conn, &schema_name) .map_err(|_| unexpected_error!("Something went wrong"))?; if context_ids.is_empty() { conn.transaction::<_, superposition::AppError, _>(|transaction_conn| { @@ -333,16 +334,16 @@ async fn delete_dimension( dsl::last_modified_by.eq(user.get_email()), )) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn)?; diesel::update(dimensions::dsl::dimensions) .filter(dimensions::position.gt(dimension_data.position)) .set(dimensions::position.eq(dimensions::position - 1)) .returning(Dimension::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn)?; let deleted_row = delete(dsl::dimensions.filter(dsl::dimension.eq(&name))) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(transaction_conn); match deleted_row { Ok(0) => Err(not_found!("Dimension `{}` doesn't exists", name)), diff --git a/crates/context_aware_config/src/api/dimension/utils.rs b/crates/context_aware_config/src/api/dimension/utils.rs index 6e2838a71..1cb2beb52 100644 --- a/crates/context_aware_config/src/api/dimension/utils.rs +++ b/crates/context_aware_config/src/api/dimension/utils.rs @@ -1,7 +1,7 @@ use crate::helpers::DimensionData; use diesel::{query_dsl::methods::SchemaNameDsl, RunQueryDsl}; use jsonschema::{Draft, JSONSchema}; -use service_utils::{helpers::extract_dimensions, service::types::Tenant}; +use service_utils::{helpers::extract_dimensions, service::types::SchemaName}; use std::collections::HashMap; use superposition_macros::{bad_argument, db_error, unexpected_error}; use superposition_types::{ @@ -16,9 +16,11 @@ use super::types::{DimensionName, Position}; pub fn get_dimension_data( conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result> { - Ok(dimensions.schema_name(tenant).load::(conn)?) + Ok(dimensions + .schema_name(schema_name) + .load::(conn)?) } pub fn get_dimension_data_map( @@ -48,13 +50,16 @@ pub fn get_dimension_data_map( pub fn get_dimension_usage_context_ids( key: &str, conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result> { let result: Vec = - contexts.schema_name(tenant).load(conn).map_err(|err| { - log::error!("failed to fetch contexts with error: {}", err); - db_error!(err) - })?; + contexts + .schema_name(schema_name) + .load(conn) + .map_err(|err| { + log::error!("failed to fetch contexts with error: {}", err); + db_error!(err) + })?; let mut context_ids = vec![]; for context in result.iter() { diff --git a/crates/context_aware_config/src/api/functions/handlers.rs b/crates/context_aware_config/src/api/functions/handlers.rs index 93b38f8c6..742254ae2 100644 --- a/crates/context_aware_config/src/api/functions/handlers.rs +++ b/crates/context_aware_config/src/api/functions/handlers.rs @@ -9,7 +9,7 @@ use base64::prelude::*; use chrono::Utc; use diesel::{delete, ExpressionMethods, QueryDsl, RunQueryDsl, SelectableHelper}; use serde_json::json; -use service_utils::service::types::{DbConnection, Tenant}; +use service_utils::service::types::{DbConnection, SchemaName}; use superposition_macros::{bad_argument, not_found, unexpected_error}; use superposition_types::{ custom_query::PaginationParams, @@ -48,7 +48,7 @@ async fn create( request: web::Json, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result> { let DbConnection(mut conn) = db_conn; let req = request.into_inner(); @@ -74,7 +74,7 @@ async fn create( let insert: Result = diesel::insert_into(functions) .values(&function) .returning(Function::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(&mut conn); match insert { @@ -110,13 +110,13 @@ async fn update( request: web::Json, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result> { let DbConnection(mut conn) = db_conn; let req = request.into_inner(); let f_name: String = params.into_inner().into(); - let result = match fetch_function(&f_name, &mut conn, &tenant) { + let result = match fetch_function(&f_name, &mut conn, &schema_name) { Ok(val) => val, Err(superposition::AppError::DbError(diesel::result::Error::NotFound)) => { log::error!("Function not found."); @@ -158,7 +158,7 @@ async fn update( .filter(schema::functions::function_name.eq(f_name)) .set(new_function) .returning(Function::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn)?; decode_function(&mut updated_function)?; @@ -169,11 +169,11 @@ async fn update( async fn get( params: web::Path, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result> { let DbConnection(mut conn) = db_conn; let f_name: String = params.into_inner().into(); - let mut function = fetch_function(&f_name, &mut conn, &tenant)?; + let mut function = fetch_function(&f_name, &mut conn, &schema_name)?; decode_function(&mut function)?; Ok(Json(function)) @@ -183,26 +183,26 @@ async fn get( async fn list_functions( db_conn: DbConnection, filters: Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { let DbConnection(mut conn) = db_conn; let (total_pages, total_items, mut data) = match filters.all { Some(true) => { let result: Vec = - functions.schema_name(&tenant).get_results(&mut conn)?; + functions.schema_name(&schema_name).get_results(&mut conn)?; (1, result.len() as i64, result) } _ => { let n_functions: i64 = functions .count() - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(&mut conn)?; let limit = filters.count.unwrap_or(10); let mut builder = functions .order(last_modified_at.desc()) .limit(limit) - .schema_name(&tenant) + .schema_name(&schema_name) .into_boxed(); if let Some(page) = filters.page { let offset = (page - 1) * limit; @@ -229,7 +229,7 @@ async fn delete_function( params: web::Path, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let f_name: String = params.into_inner().into(); @@ -241,10 +241,10 @@ async fn delete_function( dsl::last_modified_by.eq(user.get_email()), )) .returning(Function::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(&mut conn)?; let deleted_row = delete(functions.filter(function_name.eq(&f_name))) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(&mut conn); match deleted_row { Ok(0) => Err(not_found!("Function {} doesn't exists", f_name)), @@ -266,13 +266,13 @@ async fn test( params: Path, request: web::Json, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let path_params = params.into_inner(); let fun_name: &String = &path_params.function_name.into(); let req = request.into_inner(); - let mut function = match fetch_function(fun_name, &mut conn, &tenant) { + let mut function = match fetch_function(fun_name, &mut conn, &schema_name) { Ok(val) => val, Err(superposition::AppError::DbError(diesel::result::Error::NotFound)) => { log::error!("Function not found."); @@ -313,12 +313,12 @@ async fn publish( params: web::Path, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result> { let DbConnection(mut conn) = db_conn; let fun_name: String = params.into_inner().into(); - let function = match fetch_function(&fun_name, &mut conn, &tenant) { + let function = match fetch_function(&fun_name, &mut conn, &schema_name) { Ok(val) => val, Err(superposition::AppError::DbError(diesel::result::Error::NotFound)) => { log::error!("Function {} not found.", fun_name); @@ -342,7 +342,7 @@ async fn publish( dsl::published_at.eq(Some(Utc::now().naive_utc())), )) .returning(Function::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn)?; Ok(Json(updated_function)) diff --git a/crates/context_aware_config/src/api/functions/helpers.rs b/crates/context_aware_config/src/api/functions/helpers.rs index 53d84a43b..ec5689cc5 100644 --- a/crates/context_aware_config/src/api/functions/helpers.rs +++ b/crates/context_aware_config/src/api/functions/helpers.rs @@ -2,7 +2,7 @@ extern crate base64; use base64::prelude::*; use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl}; -use service_utils::service::types::Tenant; +use service_utils::service::types::SchemaName; use std::str; use superposition_macros::unexpected_error; use superposition_types::{ @@ -16,11 +16,11 @@ use superposition_types::{ pub fn fetch_function( f_name: &String, conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { Ok(functions .filter(schema::functions::function_name.eq(f_name)) - .schema_name(tenant) + .schema_name(schema_name) .get_result::(conn)?) } @@ -52,12 +52,12 @@ pub fn decode_base64_to_string(code: &String) -> superposition::Result { pub fn get_published_function_code( conn: &mut DBConnection, f_name: String, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result> { let function = functions .filter(schema::functions::function_name.eq(f_name)) .select(schema::functions::published_code) - .schema_name(tenant) + .schema_name(schema_name) .first(conn)?; Ok(function) } @@ -65,7 +65,7 @@ pub fn get_published_function_code( pub fn get_published_functions_by_names( conn: &mut DBConnection, function_names: Vec, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result)>> { let function: Vec<(String, Option)> = functions .filter(schema::functions::function_name.eq_any(function_names)) @@ -73,7 +73,7 @@ pub fn get_published_functions_by_names( schema::functions::function_name, schema::functions::published_code, )) - .schema_name(tenant) + .schema_name(schema_name) .load(conn)?; Ok(function) } diff --git a/crates/context_aware_config/src/api/type_templates/handlers.rs b/crates/context_aware_config/src/api/type_templates/handlers.rs index f175c516f..26c73b8a2 100644 --- a/crates/context_aware_config/src/api/type_templates/handlers.rs +++ b/crates/context_aware_config/src/api/type_templates/handlers.rs @@ -5,7 +5,7 @@ use diesel::{ ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl, SelectableHelper, }; use jsonschema::JSONSchema; -use service_utils::service::types::{DbConnection, Tenant}; +use service_utils::service::types::{DbConnection, SchemaName}; use superposition_macros::{bad_argument, db_error}; use superposition_types::{ custom_query::PaginationParams, @@ -33,7 +33,7 @@ async fn create_type( request: Json, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let _ = JSONSchema::compile(&request.type_schema).map_err(|err| { @@ -58,7 +58,7 @@ async fn create_type( type_templates::change_reason.eq(request.change_reason.clone()), )) .returning(TypeTemplate::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn) .map_err(|err| { log::error!("failed to insert custom type with error: {}", err); @@ -73,7 +73,7 @@ async fn update_type( path: Path, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let request = request.into_inner(); @@ -94,7 +94,7 @@ async fn update_type( let final_description = if description.is_none() { let existing_template = type_templates::table .filter(type_templates::type_name.eq(&type_name)) - .schema_name(&tenant) + .schema_name(&schema_name) .first::(&mut conn) .optional() .map_err(|err| { @@ -120,12 +120,12 @@ async fn update_type( .set(( type_templates::type_schema.eq(request.type_schema), type_templates::last_modified_at.eq(timestamp), - type_templates::last_modified_by.eq(user.email), + type_templates::last_modified_by.eq(user.email.clone()), type_templates::description.eq(final_description), type_templates::change_reason.eq(change_reason), )) .returning(TypeTemplate::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn) .map_err(|err| { log::error!("failed to insert custom type with error: {}", err); @@ -139,7 +139,7 @@ async fn delete_type( path: Path, db_conn: DbConnection, user: User, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let type_name: String = path.into_inner().into(); @@ -147,14 +147,14 @@ async fn delete_type( .filter(dsl::type_name.eq(type_name.clone())) .set(( dsl::last_modified_at.eq(Utc::now().naive_utc()), - dsl::last_modified_by.eq(user.email), + dsl::last_modified_by.eq(user.email.clone()), )) .returning(TypeTemplate::as_returning()) - .schema_name(&tenant) + .schema_name(&schema_name) .execute(&mut conn)?; let deleted_type = diesel::delete(dsl::type_templates.filter(dsl::type_name.eq(type_name))) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(&mut conn)?; Ok(HttpResponse::Ok().json(deleted_type)) } @@ -163,13 +163,13 @@ async fn delete_type( async fn list_types( db_conn: DbConnection, filters: Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { let DbConnection(mut conn) = db_conn; if let Some(true) = filters.all { let result: Vec = type_templates::dsl::type_templates - .schema_name(&tenant) + .schema_name(&schema_name) .get_results(&mut conn)?; return Ok(Json(PaginatedResponse { total_pages: 1, @@ -180,11 +180,11 @@ async fn list_types( let n_types: i64 = type_templates::dsl::type_templates .count() - .schema_name(&tenant) + .schema_name(&schema_name) .get_result(&mut conn)?; let limit = filters.count.unwrap_or(10); let mut builder = type_templates::dsl::type_templates - .schema_name(&tenant) + .schema_name(&schema_name) .order(type_templates::dsl::created_at.desc()) .limit(limit) .into_boxed(); diff --git a/crates/context_aware_config/src/helpers.rs b/crates/context_aware_config/src/helpers.rs index 32cd3c427..6e0fe05c8 100644 --- a/crates/context_aware_config/src/helpers.rs +++ b/crates/context_aware_config/src/helpers.rs @@ -14,10 +14,9 @@ use jsonlogic; use jsonschema::{Draft, JSONSchema, ValidationError}; use num_bigint::BigUint; use serde_json::{json, Map, Value}; -use service_utils::service::types::Tenant; use service_utils::{ helpers::{generate_snowflake_id, validation_err_to_str}, - service::types::AppState, + service::types::{AppState, SchemaName}, }; use superposition_macros::{bad_argument, db_error, unexpected_error, validation_error}; #[cfg(feature = "high-performance-mode")] @@ -214,12 +213,12 @@ pub fn calculate_context_weight( } pub fn generate_cac( conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { let contexts_vec: Vec<(String, Condition, String, Overrides)> = ctxt::contexts .select((ctxt::id, ctxt::value, ctxt::override_id, ctxt::override_)) .order_by((ctxt::weight.asc(), ctxt::created_at.asc())) - .schema_name(tenant) + .schema_name(schema_name) .load::<(String, Condition, String, Overrides)>(conn) .map_err(|err| { log::error!("failed to fetch contexts with error: {}", err); @@ -269,7 +268,7 @@ pub fn generate_cac( let default_config_vec = def_conf::default_configs .select((def_conf::key, def_conf::value)) - .schema_name(&tenant) + .schema_name(schema_name) .load::<(String, Value)>(conn) .map_err(|err| { log::error!("failed to fetch default_configs with error: {}", err); @@ -297,11 +296,11 @@ pub fn add_config_version( description: String, change_reason: String, db_conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { use config_versions::dsl::config_versions; let version_id = generate_snowflake_id(state)?; - let config = generate_cac(db_conn, tenant)?; + let config = generate_cac(db_conn, schema_name)?; let json_config = json!(config); let config_hash = blake3::hash(json_config.to_string().as_bytes()).to_string(); let config_version = ConfigVersion { @@ -316,7 +315,7 @@ pub fn add_config_version( diesel::insert_into(config_versions) .values(&config_version) .returning(ConfigVersion::as_returning()) - .schema_name(tenant) + .schema_name(schema_name) .execute(db_conn)?; Ok(version_id) } @@ -335,18 +334,18 @@ pub fn get_workspace( pub async fn put_config_in_redis( version_id: i64, state: Data, - tenant: Tenant, + schema_name: &SchemaName, db_conn: &mut DBConnection, ) -> superposition::Result<()> { - let raw_config = generate_cac(db_conn, &tenant)?; + let raw_config = generate_cac(db_conn, schema_name)?; let parsed_config = serde_json::to_string(&json!(raw_config)).map_err(|e| { log::error!("failed to convert cac config to string: {}", e); unexpected_error!("could not convert cac config to string") })?; - let config_key = format!("{}::cac_config", *tenant); - let last_modified_at_key = format!("{}::cac_config::last_modified_at", *tenant); - let audit_id_key = format!("{}::cac_config::audit_id", *tenant); - let config_version_key = format!("{}::cac_config::config_version", *tenant); + let config_key = format!("{}::cac_config", *schema_name); + let last_modified_at_key = format!("{}::cac_config::last_modified_at", *schema_name); + let audit_id_key = format!("{}::cac_config::audit_id", *schema_name); + let config_version_key = format!("{}::cac_config::config_version", *schema_name); let last_modified = DateTime::to_rfc2822(&Utc::now()); let _ = state .redis diff --git a/crates/experimentation_platform/src/api/experiments/handlers.rs b/crates/experimentation_platform/src/api/experiments/handlers.rs index 0f2fe9165..ba652fac0 100644 --- a/crates/experimentation_platform/src/api/experiments/handlers.rs +++ b/crates/experimentation_platform/src/api/experiments/handlers.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{HashMap, HashSet}, - ops::Deref, -}; +use std::collections::{HashMap, HashSet}; use actix_http::header::{self}; use actix_web::{ @@ -9,7 +6,6 @@ use actix_web::{ web::{self, Data, Json, Query}, HttpRequest, HttpResponse, HttpResponseBuilder, Scope, }; -use anyhow::anyhow; use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use diesel::{ dsl::sql, @@ -20,14 +16,13 @@ use diesel::{ }; use reqwest::{Method, Response, StatusCode}; use serde_json::{json, Map, Value}; -use service_utils::service::types::{ - AppHeader, AppState, CustomHeaders, DbConnection, Tenant, -}; use service_utils::{ helpers::{ construct_request_headers, execute_webhook_call, generate_snowflake_id, request, }, - service::types::OrganisationId, + service::types::{ + AppHeader, AppState, CustomHeaders, DbConnection, SchemaName, WorkspaceRequest, + }, }; use superposition_macros::{bad_argument, response_error, unexpected_error}; use superposition_types::{ @@ -138,10 +133,9 @@ async fn create( custom_headers: CustomHeaders, req: web::Json, db_conn: DbConnection, - tenant: Tenant, + workspace_request: WorkspaceRequest, user: User, tenant_config: TenantConfig, - org_id: OrganisationId, ) -> superposition::Result { use superposition_types::database::schema::experiments::dsl::experiments; let mut variants = req.variants.to_vec(); @@ -196,7 +190,7 @@ async fn create( &unique_override_keys, None, flags, - &tenant, + &workspace_request.schema_name, &mut conn, )?; if !valid { @@ -244,25 +238,18 @@ async fn create( err ) })?; - let org_id = org_id.clone().deref().to_owned(); - log::info!("Organisation ID {org_id}"); + let extra_headers = vec![ ("x-user", Some(user_str)), ("x-config-tags", custom_headers.config_tags), - ("x-org-id", Some(org_id)), ] .into_iter() .filter_map(|(key, val)| val.map(|v| (key, v))) .collect::>(); let headers_map = construct_header_map( - tenant - .get_tenant_name() - .map_err(|err| { - log::error!("{err}"); - unexpected_error!("failed to decode tenant") - })? - .as_str(), + &workspace_request.workspace_id, + &workspace_request.organisation_id, extra_headers, )?; @@ -318,19 +305,19 @@ async fn create( let mut inserted_experiments = diesel::insert_into(experiments) .values(&new_experiment) .returning(Experiment::as_returning()) - .schema_name(&tenant) + .schema_name(&workspace_request.schema_name) .get_results(&mut conn)?; let inserted_experiment: Experiment = inserted_experiments.remove(0); let response = ExperimentCreateResponse::from(inserted_experiment.clone()); if let WebhookConfig::Enabled(experiments_webhook_config) = - tenant_config.experiments_webhook_config + &tenant_config.experiments_webhook_config { execute_webhook_call( - &experiments_webhook_config, + experiments_webhook_config, &ExperimentResponse::from(inserted_experiment), &config_version_id, - &tenant, + &workspace_request, WebhookEvent::ExperimentCreated, &state.app_env, &state.http_client, @@ -350,10 +337,9 @@ async fn conclude_handler( custom_headers: CustomHeaders, req: web::Json, db_conn: DbConnection, - tenant: Tenant, + workspace_request: WorkspaceRequest, tenant_config: TenantConfig, user: User, - org_id: OrganisationId, ) -> superposition::Result { let DbConnection(conn) = db_conn; let (response, config_version_id) = conclude( @@ -362,22 +348,21 @@ async fn conclude_handler( custom_headers.config_tags, req.into_inner(), conn, - tenant.clone(), - user, - org_id, + &workspace_request, + &user, ) .await?; let experiment_response = ExperimentResponse::from(response); if let WebhookConfig::Enabled(experiments_webhook_config) = - tenant_config.experiments_webhook_config + &tenant_config.experiments_webhook_config { execute_webhook_call( - &experiments_webhook_config, + experiments_webhook_config, &experiment_response, &config_version_id, - &tenant, + &workspace_request, WebhookEvent::ExperimentConcluded, &state.app_env, &state.http_client, @@ -397,9 +382,8 @@ pub async fn conclude( config_tags: Option, req: ConcludeExperimentRequest, mut conn: PooledConnection>, - tenant: Tenant, - user: User, - org_id: OrganisationId, + workspace_request: &WorkspaceRequest, + user: &User, ) -> superposition::Result<(Experiment, Option)> { use superposition_types::database::schema::experiments::dsl; @@ -408,7 +392,7 @@ pub async fn conclude( let experiment: Experiment = dsl::experiments .find(experiment_id) - .schema_name(&tenant) + .schema_name(&workspace_request.schema_name) .get_result::(&mut conn)?; let description = match req.description.clone() { @@ -425,10 +409,6 @@ pub async fn conclude( let experiment_context: Map = experiment.context.into(); let mut operations: Vec = vec![]; - let tenant_name = tenant.get_tenant_name().map_err(|err| { - log::error!("{err}"); - unexpected_error!("failed to decode tenant") - })?; let mut is_valid_winner_variant = false; for variant in experiment.variants.into_inner() { @@ -462,24 +442,20 @@ pub async fn conclude( let url = format!("{}/default-config/{}", state.cac_host, key); let headers = construct_request_headers(&[ - ("x-tenant", tenant_name.as_str()), + ("x-tenant", &workspace_request.workspace_id), ( "Authorization", &format!("Internal {}", state.superposition_token), ), ("x-user", user_str.as_str()), - ("x-org-id", org_id.as_str()), + ("x-org-id", &workspace_request.organisation_id), ]) - .map_err(|err| { - superposition::AppError::UnexpectedError(anyhow!(err)) - })?; + .map_err(|err| unexpected_error!(err))?; let _ = request::<_, Value>(url, Method::PUT, Some(create_req), headers) .await - .map_err(|err| { - superposition::AppError::UnexpectedError(anyhow!(err)) - })?; + .map_err(|err| unexpected_error!(err))?; } operations.push(ContextAction::DELETE(context_id)); } @@ -507,16 +483,16 @@ pub async fn conclude( err ) })?; - let extra_headers = vec![ - ("x-user", Some(user_str)), - ("x-config-tags", config_tags), - ("x-org-id", Some(org_id.to_string())), - ] - .into_iter() - .filter_map(|(key, val)| val.map(|v| (key, v))) - .collect::>(); + let extra_headers = vec![("x-user", Some(user_str)), ("x-config-tags", config_tags)] + .into_iter() + .filter_map(|(key, val)| val.map(|v| (key, v))) + .collect::>(); - let headers_map = construct_header_map(&tenant_name, extra_headers)?; + let headers_map = construct_header_map( + &workspace_request.workspace_id, + &workspace_request.organisation_id, + extra_headers, + )?; let response = http_client .put(&url) @@ -541,7 +517,7 @@ pub async fn conclude( dsl::chosen_variant.eq(Some(winner_variant_id)), )) .returning(Experiment::as_returning()) - .schema_name(&tenant) + .schema_name(&workspace_request.schema_name) .get_result::(&mut conn)?; Ok((updated_experiment, config_version_id)) @@ -551,14 +527,14 @@ pub async fn conclude( async fn get_applicable_variants( db_conn: DbConnection, query_data: Query, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let query_data = query_data.into_inner(); let experiments = experiments::experiments .filter(experiments::status.ne(ExperimentStatusType::CONCLUDED)) - .schema_name(&tenant) + .schema_name(&schema_name) .load::(&mut conn)?; let experiments = experiments.into_iter().filter(|exp| { @@ -594,13 +570,13 @@ async fn list_experiments( pagination_params: Query, filters: Query, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; if let Some(true) = pagination_params.all { let result = experiments::experiments - .schema_name(&tenant) + .schema_name(&schema_name) .get_results::(&mut conn)?; return Ok( HttpResponse::Ok().json(PaginatedResponse:: { @@ -614,7 +590,7 @@ async fn list_experiments( let max_event_timestamp: Option = event_log::event_log .filter(event_log::table_name.eq("experiments")) .select(diesel::dsl::max(event_log::timestamp)) - .schema_name(&tenant) + .schema_name(&schema_name) .first(&mut conn)?; let last_modified = req @@ -632,7 +608,9 @@ async fn list_experiments( }; let query_builder = |filters: &ExperimentListFilters| { - let mut builder = experiments::experiments.schema_name(&tenant).into_boxed(); + let mut builder = experiments::experiments + .schema_name(&schema_name) + .into_boxed(); if let Some(ref states) = filters.status { builder = builder.filter(experiments::status.eq_any(states.0.clone())); } @@ -702,22 +680,22 @@ async fn list_experiments( async fn get_experiment_handler( params: web::Path, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result> { let DbConnection(mut conn) = db_conn; - let response = get_experiment(params.into_inner(), &mut conn, &tenant)?; + let response = get_experiment(params.into_inner(), &mut conn, &schema_name)?; Ok(Json(ExperimentResponse::from(response))) } pub fn get_experiment( experiment_id: i64, conn: &mut DBConnection, - tenant: &Tenant, + schema_name: &SchemaName, ) -> superposition::Result { use superposition_types::database::schema::experiments::dsl::*; let result: Experiment = experiments .find(experiment_id) - .schema_name(&tenant) + .schema_name(&schema_name) .get_result::(conn)?; Ok(result) @@ -730,9 +708,8 @@ async fn ramp( req: web::Json, db_conn: DbConnection, user: User, - tenant: Tenant, + workspace_request: WorkspaceRequest, tenant_config: TenantConfig, - org_id: OrganisationId, ) -> superposition::Result> { let DbConnection(mut conn) = db_conn; let exp_id = params.into_inner(); @@ -740,7 +717,7 @@ async fn ramp( let experiment: Experiment = experiments::experiments .find(exp_id) - .schema_name(&tenant) + .schema_name(&workspace_request.schema_name) .get_result::(&mut conn)?; let old_traffic_percentage = experiment.traffic_percentage as u8; @@ -771,10 +748,10 @@ async fn ramp( experiments::change_reason.eq(change_reason), )) .returning(Experiment::as_returning()) - .schema_name(&tenant) + .schema_name(&workspace_request.schema_name) .get_result(&mut conn)?; - let (_, config_version_id) = fetch_cac_config(&tenant, &data, &org_id).await?; + let (_, config_version_id) = fetch_cac_config(&data, &workspace_request).await?; let experiment_response = ExperimentResponse::from(updated_experiment); let webhook_event = if new_traffic_percentage == 0 @@ -785,13 +762,13 @@ async fn ramp( WebhookEvent::ExperimentInprogress }; if let WebhookConfig::Enabled(experiments_webhook_config) = - tenant_config.experiments_webhook_config + &tenant_config.experiments_webhook_config { execute_webhook_call( - &experiments_webhook_config, + experiments_webhook_config, &experiment_response, &config_version_id, - &tenant, + &workspace_request, webhook_event, &data.app_env, &data.http_client, @@ -810,10 +787,9 @@ async fn update_overrides( custom_headers: CustomHeaders, db_conn: DbConnection, req: web::Json, - tenant: Tenant, + workspace_request: WorkspaceRequest, tenant_config: TenantConfig, user: User, - org_id: OrganisationId, ) -> superposition::Result { let DbConnection(mut conn) = db_conn; let experiment_id = params.into_inner(); @@ -834,7 +810,7 @@ async fn update_overrides( // fetch the current variants of the experiment let experiment = experiments::experiments .find(experiment_id) - .schema_name(&tenant) + .schema_name(&workspace_request.schema_name) .first::(&mut conn)?; if experiment.status != ExperimentStatusType::CREATED { @@ -918,7 +894,7 @@ async fn update_overrides( &override_keys, Some(experiment_id), flags, - &tenant, + &workspace_request.schema_name, &mut conn, )?; if !valid { @@ -976,17 +952,16 @@ async fn update_overrides( let extra_headers = vec![ ("x-user", Some(user_str)), ("x-config-tags", custom_headers.config_tags), - ("x-org-id", Some(org_id.to_string())), ] .into_iter() .filter_map(|(key, val)| val.map(|v| (key, v))) .collect::>(); - let tenant_name = tenant.get_tenant_name().map_err(|err| { - log::error!("{err}"); - unexpected_error!("failed to decode tenant") - })?; - let headers_map = construct_header_map(&tenant_name, extra_headers)?; + let headers_map = construct_header_map( + &workspace_request.workspace_id, + &workspace_request.organisation_id, + extra_headers, + )?; let response = http_client .put(&url) @@ -1032,19 +1007,19 @@ async fn update_overrides( experiments::last_modified_by.eq(user.get_email()), )) .returning(Experiment::as_returning()) - .schema_name(&tenant) + .schema_name(&workspace_request.schema_name) .get_result::(&mut conn)?; let experiment_response = ExperimentResponse::from(updated_experiment); if let WebhookConfig::Enabled(experiments_webhook_config) = - tenant_config.experiments_webhook_config + &tenant_config.experiments_webhook_config { execute_webhook_call( - &experiments_webhook_config, + experiments_webhook_config, &experiment_response, &config_version_id, - &tenant, + &workspace_request, WebhookEvent::ExperimentUpdated, &state.app_env, &state.http_client, @@ -1062,12 +1037,12 @@ async fn update_overrides( async fn get_audit_logs( filters: Query, db_conn: DbConnection, - tenant: Tenant, + schema_name: SchemaName, ) -> superposition::Result>> { let DbConnection(mut conn) = db_conn; let query_builder = |filters: &AuditQueryFilters| { - let mut builder = event_log::event_log.schema_name(&tenant).into_boxed(); + let mut builder = event_log::event_log.schema_name(&schema_name).into_boxed(); if let Some(tables) = filters.table.clone() { builder = builder.filter(event_log::table_name.eq_any(tables.0)); } diff --git a/crates/experimentation_platform/src/api/experiments/helpers.rs b/crates/experimentation_platform/src/api/experiments/helpers.rs index dbbffd62b..f89af86e4 100644 --- a/crates/experimentation_platform/src/api/experiments/helpers.rs +++ b/crates/experimentation_platform/src/api/experiments/helpers.rs @@ -5,7 +5,8 @@ use diesel::{BoolExpressionMethods, ExpressionMethods, QueryDsl, RunQueryDsl}; use serde_json::{Map, Value}; use service_utils::helpers::extract_dimensions; use service_utils::service::types::{ - AppState, ExperimentationFlags, OrganisationId, Tenant, + AppState, ExperimentationFlags, OrganisationId, SchemaName, WorkspaceId, + WorkspaceRequest, }; use std::collections::HashSet; use std::str::FromStr; @@ -185,7 +186,7 @@ pub fn validate_experiment( override_keys: &[String], experiment_id: Option, flags: &ExperimentationFlags, - tenant: &Tenant, + schema_name: &SchemaName, conn: &mut PgConnection, ) -> superposition::Result<(bool, String)> { use superposition_types::database::schema::experiments::dsl as experiments_dsl; @@ -199,7 +200,7 @@ pub fn validate_experiment( .or(experiments_dsl::status.eq(ExperimentStatusType::INPROGRESS)), ), ) - .schema_name(tenant) + .schema_name(schema_name) .load(conn)?; is_valid_experiment(context, override_keys, flags, &active_experiments) @@ -277,15 +278,23 @@ pub fn decide_variant( } pub fn construct_header_map( - tenant: &str, + workspace_id: &WorkspaceId, + organisation_id: &OrganisationId, other_headers: Vec<(&str, String)>, ) -> superposition::Result { let mut headers = HeaderMap::new(); - let tenant_val = HeaderValue::from_str(tenant).map_err(|err| { + let workspace_val = HeaderValue::from_str(workspace_id).map_err(|err| { log::error!("failed to set header: {}", err); unexpected_error!("Something went wrong") })?; - headers.insert(HeaderName::from_static("x-tenant"), tenant_val); + headers.insert(HeaderName::from_static("x-tenant"), workspace_val); + + let org_val = HeaderValue::from_str(organisation_id).map_err(|err| { + log::error!("failed to set header: {}", err); + unexpected_error!("Something went wrong") + })?; + headers.insert(HeaderName::from_static("x-org-id"), org_val); + for (header, value) in other_headers { let header_name = HeaderName::from_str(header).map_err(|err| { log::error!("failed to set header: {}", err); @@ -304,18 +313,16 @@ pub fn construct_header_map( } pub async fn fetch_cac_config( - tenant: &Tenant, state: &Data, - org_id: &OrganisationId, + workspace_request: &WorkspaceRequest, ) -> superposition::Result<(Config, Option)> { let http_client = reqwest::Client::new(); let url = state.cac_host.clone() + "/config"; - let tenant_name = tenant.get_tenant_name().map_err(|err| { - log::error!("{err}"); - unexpected_error!("failed to decode tenant") - })?; - let headers_map = - construct_header_map(&tenant_name, vec![("x-org-id", org_id.to_string())])?; + let headers_map = construct_header_map( + &workspace_request.workspace_id, + &workspace_request.organisation_id, + vec![], + )?; let response = http_client .get(&url) diff --git a/crates/service_utils/src/helpers.rs b/crates/service_utils/src/helpers.rs index 2adfb33ff..ee45776ee 100644 --- a/crates/service_utils/src/helpers.rs +++ b/crates/service_utils/src/helpers.rs @@ -1,6 +1,6 @@ use crate::{ aws::kms, - service::types::{AppEnv, AppState, Tenant}, + service::types::{AppEnv, AppState, WorkspaceRequest}, }; use actix_web::{error::ErrorInternalServerError, web::Data, Error}; use anyhow::anyhow; @@ -382,7 +382,7 @@ pub async fn execute_webhook_call( webhook_config: &Webhook, payload: &T, config_version_opt: &Option, - tenant: &Tenant, + workspace_request: &WorkspaceRequest, event: WebhookEvent, app_env: &AppEnv, http_client: &reqwest::Client, @@ -404,7 +404,9 @@ where None } } - HeadersEnum::TenantId => Some((key.to_string(), tenant.to_string())), + HeadersEnum::WorkspaceId => { + Some((key.to_string(), workspace_request.workspace_id.to_string())) + } }) .collect::>(); @@ -454,7 +456,8 @@ where event_info: WebhookEventInfo { webhook_event: event, time: Utc::now().naive_utc().to_string(), - tenant_id: tenant.to_string(), + workspace_id: workspace_request.workspace_id.to_string(), + organisation_id: workspace_request.organisation_id.to_string(), config_version: config_version_opt.clone(), }, payload, diff --git a/crates/service_utils/src/middlewares/tenant.rs b/crates/service_utils/src/middlewares/tenant.rs index e53106643..f17d543f2 100644 --- a/crates/service_utils/src/middlewares/tenant.rs +++ b/crates/service_utils/src/middlewares/tenant.rs @@ -1,6 +1,8 @@ use std::future::{ready, Ready}; -use crate::service::types::{AppState, OrganisationId, Tenant}; +use crate::service::types::{ + AppState, OrganisationId, SchemaName, WorkspaceId, WorkspaceRequest, +}; use actix_web::{ dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}, error::{self}, @@ -89,7 +91,7 @@ fn extract_org_workspace_from_query_params( query_str .split('&') .find(|segment| segment.contains(matching_pattern.as_str())) - .and_then(|tenant_query_param| tenant_query_param.split('=').nth(1)) + .and_then(|query_param| query_param.split('=').nth(1)) } impl Service for OrgWorkspaceMiddleware @@ -138,89 +140,36 @@ where || pkg_regex.is_match(&request_path) || assets_regex.is_match(&request_path); - if !is_excluded && app_state.enable_tenant_and_scope { - debug!( - "Workspace FROM HEADER ==> {:?}", - extract_org_workspace_from_header( - req.headers(), - String::from("x-tenant") - ) - ); - debug!( - "Workspace FROM URL ==> {:?}", - extract_org_workspace_from_url( - req.path(), - req.match_pattern(), - String::from("{tenant}") - ) - ); - debug!( - "Workspace FROM QUERY ==> {:?}", - extract_org_workspace_from_query_params( - req.query_string(), - String::from("tenant=") - ) - ); - - let workspace = extract_org_workspace_from_header( - req.headers(), - String::from("x-tenant"), - ) - .or_else(|| { - extract_org_workspace_from_url( - req.path(), - req.match_pattern(), - String::from("{tenant}"), - ) - }) - .or_else(|| { - extract_org_workspace_from_query_params( - req.query_string(), - String::from("tenant="), - ) - }); - - let org = extract_org_workspace_from_header( - req.headers(), - String::from("x-org-id"), - ) - .or_else(|| { - extract_org_workspace_from_url( - req.path(), - req.match_pattern(), - String::from("{org_id}"), - ) - }) - .or_else(|| { - extract_org_workspace_from_query_params( - req.query_string(), - String::from("org="), - ) - }); - - let workspace_id = match (enable_workspace_id, workspace) { - (true, None) => return Err(error::ErrorBadRequest("The parameter workspace id is required, and must be passed through headers/url params/query params. Consult the documentation to know which to use for this endpoint")), - (true, Some(workspace_id)) => workspace_id, - (false, _) => "public", + if !is_excluded { + let workspace_id = match (enable_workspace_id, req.get_workspace_id()) { + (true, None) => return Err(error::ErrorBadRequest("The parameter workspace id is required, and must be passed through headers/url params/query params.")), + (true, Some(WorkspaceId(workspace_id))) => workspace_id, + (false, _) => String::from("public"), }; - // TODO: validate the tenant, get correct TenantConfig - let (validated_tenant, tenant_config) = match (enable_org_id, org) { - (true, None) => return Err(error::ErrorBadRequest("The parameter org id is required, and must be passed through headers/url params/query params. Consult the documentation to know which to use for this endpoint")), - (true, Some(org_id)) => { - let tenant = format!("{org_id}_{workspace_id}"); - (Tenant(tenant), TenantConfig::default()) + let org = req.get_organisation_id(); + + // TODO: validate the workspace, get correct TenantConfig + let (schema_name, tenant_config) = match (enable_org_id, &org) { + (true, None) => return Err(error::ErrorBadRequest("The parameter org id is required, and must be passed through headers/url params/query params.")), + (true, Some(OrganisationId(org_id))) => { + let schema = format!("{org_id}_{workspace_id}"); + (SchemaName(schema), TenantConfig::default()) }, - (false, _) => (Tenant("public".into()), TenantConfig::default()), + (false, _) => (SchemaName("public".into()), TenantConfig::default()), }; - let organisation = org - .map(String::from) - .map(OrganisationId) - .unwrap_or_default(); + let organisation = org.unwrap_or_default(); + let workspace = WorkspaceId(workspace_id.to_string()); - req.extensions_mut().insert(validated_tenant); - req.extensions_mut().insert(organisation); + req.extensions_mut().insert(schema_name.clone()); + req.extensions_mut().insert(workspace.clone()); + req.extensions_mut().insert(organisation.clone()); + req.extensions_mut().insert(WorkspaceRequest { + organisation_id: organisation, + workspace_id: workspace, + schema_name, + }); req.extensions_mut().insert(tenant_config); } @@ -230,3 +179,58 @@ where }) } } + +pub trait ServiceRequestExt { + fn get_organisation_id(&self) -> Option; + fn get_workspace_id(&self) -> Option; +} + +impl ServiceRequestExt for ServiceRequest { + fn get_organisation_id(&self) -> Option { + let org_from_header = + extract_org_workspace_from_header(self.headers(), String::from("x-org-id")); + let org_from_url = extract_org_workspace_from_url( + self.path(), + self.match_pattern(), + String::from("{org_id}"), + ); + let org_from_query_params = extract_org_workspace_from_query_params( + self.query_string(), + String::from("org="), + ); + + debug!("Org FROM HEADER ==> {:?}", org_from_header); + debug!("Org FROM URL ==> {:?}", org_from_url); + debug!("Org FROM QUERY ==> {:?}", org_from_query_params); + + org_from_header + .or_else(|| org_from_url) + .or_else(|| org_from_query_params) + .map(String::from) + .map(OrganisationId) + } + + fn get_workspace_id(&self) -> Option { + let workspace_from_header = + extract_org_workspace_from_header(self.headers(), String::from("x-tenant")); + let workspace_from_url = extract_org_workspace_from_url( + self.path(), + self.match_pattern(), + String::from("{tenant}"), + ); + let workspace_from_query_params = extract_org_workspace_from_query_params( + self.query_string(), + String::from("tenant="), + ); + + debug!("Workspace FROM HEADER ==> {:?}", workspace_from_header); + debug!("Workspace FROM URL ==> {:?}", workspace_from_url); + debug!("Workspace FROM QUERY ==> {:?}", workspace_from_query_params); + + workspace_from_header + .or_else(|| workspace_from_url) + .or_else(|| workspace_from_query_params) + .map(String::from) + .map(WorkspaceId) + } +} diff --git a/crates/service_utils/src/service/types.rs b/crates/service_utils/src/service/types.rs index e3d024d86..6e50949f6 100644 --- a/crates/service_utils/src/service/types.rs +++ b/crates/service_utils/src/service/types.rs @@ -1,4 +1,3 @@ -use std::ops::Deref; use std::sync::Mutex; use std::{ collections::HashSet, @@ -13,7 +12,6 @@ use derive_more::{Deref, DerefMut}; use diesel::r2d2::{ConnectionManager, PooledConnection}; use diesel::PgConnection; use jsonschema::JSONSchema; -use serde_json::json; use snowflake::SnowflakeIdGenerator; use crate::db::PgSchemaConnectionPool; @@ -48,7 +46,6 @@ pub struct AppState { pub meta_schema: JSONSchema, pub experimentation_flags: ExperimentationFlags, pub snowflake_generator: Arc>, - pub enable_tenant_and_scope: bool, pub tenant_middleware_exclusion_list: HashSet, pub service_prefix: String, pub superposition_token: String, @@ -78,7 +75,11 @@ pub enum AppScope { EXPERIMENTATION, SUPERPOSITION, } -impl FromRequest for AppScope { + +#[derive(Deref, DerefMut, Clone, Debug)] +pub struct WorkspaceId(pub String); + +impl FromRequest for WorkspaceId { type Error = Error; type Future = Ready>; @@ -86,55 +87,25 @@ impl FromRequest for AppScope { req: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload, ) -> Self::Future { - let scope = req.extensions().get::().cloned(); - let result = match scope { - Some(v) => Ok(v), - None => Err(error::ErrorInternalServerError("app scope not set")), - }; + let result = req.extensions().get::().cloned().ok_or_else(|| { + log::error!("Workspace Id not found"); + actix_web::error::ErrorInternalServerError("Workspace Id not found") + }); + ready(result) } } #[derive(Deref, DerefMut, Clone, Debug)] -pub struct AppExecutionNamespace(pub String); -impl AppExecutionNamespace { - pub fn from_request_sync(req: &actix_web::HttpRequest) -> Result { - let app_state = match req.app_data::>() { - Some(val) => val, - None => { - log::error!("get_app_execution_namespace: AppState not set"); - return Err(error::ErrorInternalServerError("")); - } - }; - - let tenant = req.extensions().get::().cloned(); - let scope = req.extensions().get::().cloned(); +pub struct OrganisationId(pub String); - match ( - app_state.enable_tenant_and_scope, - app_state.app_env, - tenant, - scope, - ) { - (false, _, _, _) => Ok(AppExecutionNamespace("cac_v1".to_string())), - (true, _, Some(t), Some(_)) => Ok(AppExecutionNamespace(t.0)), - (true, _, None, _) => { - log::error!( - "get_app_execution_namespace: Tenant not set in request extensions" - ); - Err(error::ErrorInternalServerError("")) - } - (true, _, _, None) => { - log::error!( - "get_app_execution_namespace: AppScope not set in request extensions" - ); - Err(error::ErrorInternalServerError("")) - } - } +impl Default for OrganisationId { + fn default() -> Self { + Self(String::from("superposition")) } } -impl FromRequest for AppExecutionNamespace { +impl FromRequest for OrganisationId { type Error = Error; type Future = Ready>; @@ -142,24 +113,43 @@ impl FromRequest for AppExecutionNamespace { req: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload, ) -> Self::Future { - ready(AppExecutionNamespace::from_request_sync(req)) + let result = req.extensions().get::().cloned().ok_or_else(|| { + log::error!("Organisation Id not found"); + actix_web::error::ErrorInternalServerError("Organisation Id not found") + }); + + ready(result) } } #[derive(Deref, DerefMut, Clone, Debug)] -pub struct Tenant(pub String); +pub struct SchemaName(pub String); + +impl FromRequest for SchemaName { + type Error = Error; + type Future = Ready>; + + fn from_request( + req: &actix_web::HttpRequest, + _: &mut actix_web::dev::Payload, + ) -> Self::Future { + let result = req.extensions().get::().cloned().ok_or_else(|| { + log::error!("Schema name not found, please check that the organisation id and workspace id are being properly sent"); + actix_web::error::ErrorInternalServerError("Schema name not found, please check that the organisation id and workspace id are being properly sent") + }); -impl Tenant { - pub fn get_tenant_name(&self) -> Result { - self.deref() - .split('_') - .into_iter() - .last() - .map(str::to_string) - .ok_or(String::from("failed to decode tenant")) + ready(result) } } -impl FromRequest for Tenant { + +#[derive(Clone)] +pub struct WorkspaceRequest { + pub workspace_id: WorkspaceId, + pub organisation_id: OrganisationId, + pub schema_name: SchemaName, +} + +impl FromRequest for WorkspaceRequest { type Error = Error; type Future = Ready>; @@ -167,28 +157,11 @@ impl FromRequest for Tenant { req: &actix_web::HttpRequest, _: &mut actix_web::dev::Payload, ) -> Self::Future { - let tenant = req.extensions().get::().cloned(); - let result = match tenant { - Some(v) => Ok(v), - None => { - let app_state = match req.app_data::>() { - Some(val) => val, - None => { - log::error!("app state not set"); - return ready(Err(error::ErrorInternalServerError(json!({ - "message": "an unknown error occurred with the app. Please contact an admin" - })))); - } - }; - if app_state.enable_tenant_and_scope { - Err(error::ErrorInternalServerError(json!({ - "message": "tenant was not set. Please ensure you are passing in the x-tenant header" - }))) - } else { - Ok(Tenant("public".into())) - } - } - }; + let result = req.extensions().get::().cloned().ok_or_else(|| { + log::error!("Workspace request data not found, please check that the organisation id and workspace id are being properly sent"); + actix_web::error::ErrorInternalServerError("Workspace request data not found, please check that the organisation id and workspace id are being properly sent") + }); + ready(result) } } @@ -245,46 +218,3 @@ impl FromRequest for CustomHeaders { ready(Ok(val)) } } - -#[derive(Deref, DerefMut, Clone, Debug)] -pub struct OrganisationId(pub String); - -impl Default for OrganisationId { - fn default() -> Self { - Self(String::from("superposition")) - } -} - -impl FromRequest for OrganisationId { - type Error = Error; - type Future = Ready>; - - fn from_request( - req: &actix_web::HttpRequest, - _: &mut actix_web::dev::Payload, - ) -> Self::Future { - let organisation = req.extensions().get::().cloned(); - let result = match organisation { - Some(v) => Ok(v), - None => { - let app_state = match req.app_data::>() { - Some(val) => val, - None => { - log::error!("app state not set"); - return ready(Err(error::ErrorInternalServerError(json!({ - "message": "an unknown error occurred with the app. Please contact an admin" - })))); - } - }; - if app_state.enable_tenant_and_scope { - Err(error::ErrorInternalServerError(json!({ - "message": "x-org-id was not set. Please ensure you are passing in the x-tenant header" - }))) - } else { - Ok(OrganisationId::default()) - } - } - }; - ready(result) - } -} diff --git a/crates/superposition/src/app_state.rs b/crates/superposition/src/app_state.rs index f998ef9f2..eb2361989 100644 --- a/crates/superposition/src/app_state.rs +++ b/crates/superposition/src/app_state.rs @@ -30,8 +30,6 @@ pub async fn get( let cac_host = get_from_env_unsafe::("CAC_HOST").expect("CAC host is not set") + base; let max_pool_size = get_from_env_or_default("MAX_DB_CONNECTION_POOL_SIZE", 2); - let enable_tenant_and_scope = get_from_env_unsafe("ENABLE_TENANT_AND_SCOPE") - .expect("ENABLE_TENANT_AND_SCOPE is not set"); let snowflake_generator = Arc::new(Mutex::new(SnowflakeIdGenerator::new(1, 1))); @@ -92,7 +90,6 @@ pub async fn get( snowflake_generator, meta_schema: get_meta_schema(), app_env, - enable_tenant_and_scope, tenant_middleware_exclusion_list: get_from_env_unsafe::( "TENANT_MIDDLEWARE_EXCLUSION_LIST", ) diff --git a/crates/superposition/src/auth/authenticator.rs b/crates/superposition/src/auth/authenticator.rs index 018d9270d..811adc5d1 100644 --- a/crates/superposition/src/auth/authenticator.rs +++ b/crates/superposition/src/auth/authenticator.rs @@ -1,14 +1,8 @@ use std::fmt::Display; -use actix_web::{ - dev::ServiceRequest, - http::header::{HeaderMap, HeaderValue}, - web::Path, - HttpRequest, HttpResponse, Scope, -}; +use actix_web::{dev::ServiceRequest, web::Path, HttpRequest, HttpResponse, Scope}; use futures_util::future::LocalBoxFuture; use serde::Deserialize; -use service_utils::service::types::OrganisationId; use superposition_types::User; #[derive(Deserialize)] @@ -16,30 +10,6 @@ pub(super) struct SwitchOrgParams { pub(super) organisation_id: String, } -fn extract_org_from_header(headers: &HeaderMap) -> Option<&str> { - headers - .get("x-org-id") - .and_then(|header_value: &HeaderValue| header_value.to_str().ok()) -} - -fn extract_org_from_url(path: &str, match_pattern: Option) -> Option<&str> { - match_pattern.and_then(move |pattern| { - let pattern_segments = pattern.split('/'); - let path_segments = path.split('/').collect::>(); - - std::iter::zip(path_segments, pattern_segments) - .find(|(_, pattern_seg)| *pattern_seg == "{org_id}") - .map(|(path_seg, _)| path_seg) - }) -} - -fn extract_org_from_query_params(query_str: &str) -> Option<&str> { - query_str - .split('&') - .find(|segment| segment.contains("org=")) - .and_then(|tenant_query_param| tenant_query_param.split('=').nth(1)) -} - #[derive(Debug)] pub enum Login { None, @@ -74,13 +44,4 @@ pub trait Authenticator: Sync + Send { req: &HttpRequest, path: &Path, ) -> LocalBoxFuture<'static, actix_web::Result>; - - fn get_org_id(&self, request: &ServiceRequest) -> OrganisationId { - extract_org_from_header(request.headers()) - .or_else(|| extract_org_from_url(request.path(), request.match_pattern())) - .or_else(|| extract_org_from_query_params(request.query_string())) - .map(String::from) - .map(OrganisationId) - .unwrap_or_default() - } } diff --git a/crates/superposition/src/auth/oidc.rs b/crates/superposition/src/auth/oidc.rs index 9049ea472..8a1b68df6 100644 --- a/crates/superposition/src/auth/oidc.rs +++ b/crates/superposition/src/auth/oidc.rs @@ -17,7 +17,9 @@ use openidconnect::{ AuthenticationFlow, ClientId, ClientSecret, CsrfToken, IssuerUrl, Nonce, RedirectUrl, ResourceOwnerPassword, ResourceOwnerUsername, Scope, TokenResponse, TokenUrl, }; -use service_utils::helpers::get_from_env_unsafe; +use service_utils::{ + helpers::get_from_env_unsafe, middlewares::tenant::ServiceRequestExt, +}; use superposition_types::User; use types::{ GlobalUserClaims, GlobalUserTokenResponse, LoginParams, OrgUserClaims, @@ -240,9 +242,9 @@ impl OIDCAuthenticator { } fn get_org_user(&self, request: &ServiceRequest) -> Result { - let org_id = self.get_org_id(request); + let org_id = request.get_organisation_id().unwrap_or_default(); let token = request.cookie(&Login::Org.to_string()).and_then(|c| { - self.decode_org_token(&org_id.0, c.value()) + self.decode_org_token(&org_id, c.value()) .map_err(|e| log::error!("Error in decoding org_user : {e}")) .ok() }); diff --git a/crates/superposition_types/src/webhook.rs b/crates/superposition_types/src/webhook.rs index f06dafb26..209231143 100644 --- a/crates/superposition_types/src/webhook.rs +++ b/crates/superposition_types/src/webhook.rs @@ -7,14 +7,14 @@ use std::{ #[derive(Clone, Debug, Serialize, Deserialize)] pub enum HeadersEnum { ConfigVersion, - TenantId, + WorkspaceId, } impl fmt::Display for HeadersEnum { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::ConfigVersion => write!(f, "x-config-version"), - Self::TenantId => write!(f, "x-tenant"), + Self::WorkspaceId => write!(f, "x-tenant"), } } } @@ -53,7 +53,8 @@ pub enum WebhookEvent { pub struct WebhookEventInfo { pub webhook_event: WebhookEvent, pub time: String, - pub tenant_id: String, + pub workspace_id: String, + pub organisation_id: String, pub config_version: Option, } diff --git a/docs/setup.md b/docs/setup.md index 350c2ec2d..b78992ede 100644 --- a/docs/setup.md +++ b/docs/setup.md @@ -125,7 +125,6 @@ The following targets are available ### Environment Variables | Variable | Description | Default Value | |---|---|---| -| `ENABLE_TENANT_AND_SCOPE` | Enables multi-tenancy | `true` | | `TENANTS` | List of Tenants | `dev,test` | | `DOCKER_DNS` | DNS server to use within the container | `localhost` |