Skip to content

Commit

Permalink
refactor: OrgId, WorkspaceId, SchemaName cleanup and refactor (#379)
Browse files Browse the repository at this point in the history
  • Loading branch information
ayushjain17 authored Jan 23, 2025
1 parent 7eb8ee5 commit 48213f1
Show file tree
Hide file tree
Showing 26 changed files with 513 additions and 664 deletions.
1 change: 0 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ SUPERPOSITION_VERSION="v0.1.0"
HOSTNAME="<application_name>-<deployment_id>-<replicaset>-<pod>"
ACTIX_KEEP_ALIVE=120
MAX_DB_CONNECTION_POOL_SIZE=3
ENABLE_TENANT_AND_SCOPE=true
TENANTS=dev,test,superposition
TENANT_MIDDLEWARE_EXCLUSION_LIST="/health,/assets/favicon.ico,/pkg/frontend.js,/pkg,/pkg/frontend_bg.wasm,/pkg/tailwind.css,/pkg/style.css,/assets,/admin,/oidc/login,/admin/organisations,/organisations,/organisations/switch/{organisation_id},/"
SERVICE_PREFIX=""
Expand Down
6 changes: 3 additions & 3 deletions crates/context_aware_config/src/api/audit_log/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use actix_web::{
};
use chrono::{Duration, Utc};
use diesel::{ExpressionMethods, QueryDsl, RunQueryDsl};
use service_utils::service::types::{DbConnection, Tenant};
use service_utils::service::types::{DbConnection, SchemaName};
use superposition_types::{
database::{models::cac::EventLog, schema::event_log::dsl as event_log},
result as superposition, PaginatedResponse,
Expand All @@ -21,12 +21,12 @@ pub fn endpoints() -> Scope {
async fn get_audit_logs(
filters: Query<AuditQueryFilters>,
db_conn: DbConnection,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<Json<PaginatedResponse<EventLog>>> {
let DbConnection(mut conn) = db_conn;

let query_builder = |filters: &AuditQueryFilters| {
let mut builder = event_log::event_log.schema_name(&tenant).into_boxed();
let mut builder = event_log::event_log.schema_name(&schema_name).into_boxed();
if let Some(tables) = filters.table.clone() {
builder = builder.filter(event_log::table_name.eq_any(tables.0));
}
Expand Down
85 changes: 45 additions & 40 deletions crates/context_aware_config/src/api/config/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@ use itertools::Itertools;
use serde_json::{json, Map, Value};
#[cfg(feature = "high-performance-mode")]
use service_utils::service::types::AppState;
use service_utils::service::types::Tenant;
use service_utils::{
helpers::extract_dimensions,
service::types::{AppHeader, DbConnection},
service::types::{AppHeader, DbConnection, SchemaName},
};
#[cfg(feature = "high-performance-mode")]
use superposition_macros::response_error;
Expand Down Expand Up @@ -81,13 +80,13 @@ fn validate_version_in_params(
pub fn add_audit_id_to_header(
conn: &mut DBConnection,
resp_builder: &mut HttpResponseBuilder,
tenant: &Tenant,
schema_name: &SchemaName,
) {
if let Ok(uuid) = event_log::event_log
.select(event_log::id)
.filter(event_log::table_name.eq("contexts"))
.order_by(event_log::timestamp.desc())
.schema_name(tenant)
.schema_name(schema_name)
.first::<Uuid>(conn)
{
resp_builder.insert_header((AppHeader::XAuditId.to_string(), uuid.to_string()));
Expand Down Expand Up @@ -126,12 +125,12 @@ fn add_config_version_to_header(

fn get_max_created_at(
conn: &mut DBConnection,
tenant: &Tenant,
schema_name: &SchemaName,
) -> Result<NaiveDateTime, diesel::result::Error> {
event_log::event_log
.select(max(event_log::timestamp))
.filter(event_log::table_name.eq_any(vec!["contexts", "default_configs"]))
.schema_name(tenant)
.schema_name(schema_name)
.first::<Option<NaiveDateTime>>(conn)
.and_then(|res| res.ok_or(diesel::result::Error::NotFound))
}
Expand All @@ -156,14 +155,14 @@ fn is_not_modified(max_created_at: Option<NaiveDateTime>, req: &HttpRequest) ->
pub fn generate_config_from_version(
version: &mut Option<i64>,
conn: &mut DBConnection,
tenant: &Tenant,
schema_name: &SchemaName,
) -> superposition::Result<Config> {
if let Some(val) = version {
let val = val.clone();
let config = config_versions::config_versions
.select(config_versions::config)
.filter(config_versions::id.eq(val))
.schema_name(tenant)
.schema_name(schema_name)
.get_result::<Value>(conn)
.map_err(|err| {
log::error!("failed to fetch config with error: {}", err);
Expand All @@ -177,19 +176,19 @@ pub fn generate_config_from_version(
match config_versions::config_versions
.select((config_versions::id, config_versions::config))
.order(config_versions::created_at.desc())
.schema_name(tenant)
.schema_name(schema_name)
.first::<(i64, Value)>(conn)
{
Ok((latest_version, config)) => {
*version = Some(latest_version);
serde_json::from_value::<Config>(config).or_else(|err| {
log::error!("failed to decode config: {}", err);
generate_cac(conn, tenant)
generate_cac(conn, schema_name)
})
}
Err(err) => {
log::error!("failed to find latest config: {err}");
generate_cac(conn, tenant)
generate_cac(conn, schema_name)
}
}
}
Expand Down Expand Up @@ -410,15 +409,15 @@ fn construct_new_payload(

#[allow(clippy::too_many_arguments)]
async fn reduce_config_key(
user: User,
user: &User,
conn: &mut DBConnection,
mut og_contexts: Vec<Context>,
mut og_overrides: HashMap<String, Overrides>,
check_key: &str,
dimension_schema_map: &HashMap<String, DimensionData>,
default_config: Map<String, Value>,
is_approve: bool,
tenant: Tenant,
schema_name: &SchemaName,
) -> superposition::Result<Config> {
let default_config_val =
default_config
Expand Down Expand Up @@ -489,15 +488,21 @@ async fn reduce_config_key(

if *to_be_deleted {
if is_approve {
let _ = context::delete(cid.clone(), user.clone(), conn, &tenant);
let _ = context::delete(cid.clone(), user, conn, schema_name);
}
og_contexts.retain(|x| x.id != *cid);
} else {
if is_approve {
let _ = context::delete(cid.clone(), user.clone(), conn, &tenant);
let _ = context::delete(cid.clone(), user, conn, schema_name);
if let Ok(put_req) = construct_new_payload(request_payload) {
let _ =
context::put(put_req, conn, false, &user, &tenant, false);
let _ = context::put(
put_req,
conn,
false,
&user,
schema_name,
false,
);
}
}

Expand Down Expand Up @@ -541,7 +546,7 @@ async fn reduce_config(
req: HttpRequest,
user: User,
db_conn: DbConnection,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
let is_approve = req
Expand All @@ -550,28 +555,28 @@ async fn reduce_config(
.and_then(|value| value.to_str().ok().and_then(|s| s.parse::<bool>().ok()))
.unwrap_or(false);

let dimensions_vec = get_dimension_data(&mut conn, &tenant)?;
let dimensions_vec = get_dimension_data(&mut conn, &schema_name)?;
let dimensions_data_map = get_dimension_data_map(&dimensions_vec)?;
let mut config = generate_cac(&mut conn, &tenant)?;
let mut config = generate_cac(&mut conn, &schema_name)?;
let default_config = (config.default_configs).clone();
for (key, _) in default_config {
let contexts = config.contexts;
let overrides = config.overrides;
let default_config = config.default_configs;
config = reduce_config_key(
user.clone(),
&user,
&mut conn,
contexts.clone(),
overrides.clone(),
key.as_str(),
&dimensions_data_map,
default_config.clone(),
is_approve,
tenant.clone(),
&schema_name,
)
.await?;
if is_approve {
config = generate_cac(&mut conn, &tenant)?;
config = generate_cac(&mut conn, &schema_name)?;
}
}

Expand All @@ -581,16 +586,16 @@ async fn reduce_config(
#[cfg(feature = "high-performance-mode")]
#[get("/fast")]
async fn get_config_fast(
tenant: Tenant,
schema_name: SchemaName,
state: Data<AppState>,
) -> superposition::Result<HttpResponse> {
use fred::interfaces::MetricsInterface;

log::debug!("Started redis fetch");
let config_key = format!("{}::cac_config", *tenant);
let last_modified_at_key = format!("{}::cac_config::last_modified_at", *tenant);
let audit_id_key = format!("{}::cac_config::audit_id", *tenant);
let config_version_key = format!("{}::cac_config::config_version", *tenant);
let config_key = format!("{}::cac_config", *schema_name);
let last_modified_at_key = format!("{}::cac_config::last_modified_at", *schema_name);
let audit_id_key = format!("{}::cac_config::audit_id", *schema_name);
let config_version_key = format!("{}::cac_config::config_version", *schema_name);
let client = state.redis.next_connected();
let config = client.get::<String, String>(config_key).await;
let metrics = client.take_latency_metrics();
Expand Down Expand Up @@ -672,11 +677,11 @@ async fn get_config(
req: HttpRequest,
db_conn: DbConnection,
query_map: superposition_query::Query<QueryMap>,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;

let max_created_at = get_max_created_at(&mut conn, &tenant)
let max_created_at = get_max_created_at(&mut conn, &schema_name)
.map_err(|e| log::error!("failed to fetch max timestamp from event_log: {e}"))
.ok();

Expand All @@ -691,7 +696,7 @@ async fn get_config(
let mut query_params_map = query_map.into_inner();
let mut config_version = validate_version_in_params(&mut query_params_map)?;
let mut config =
generate_config_from_version(&mut config_version, &mut conn, &tenant)?;
generate_config_from_version(&mut config_version, &mut conn, &schema_name)?;

config = apply_prefix_filter_to_config(&mut query_params_map, config)?;

Expand All @@ -701,7 +706,7 @@ async fn get_config(

let mut response = HttpResponse::Ok();
add_last_modified_to_header(max_created_at, &mut response);
add_audit_id_to_header(&mut conn, &mut response, &tenant);
add_audit_id_to_header(&mut conn, &mut response, &schema_name);
add_config_version_to_header(&config_version, &mut response);
Ok(response.json(config))
}
Expand All @@ -711,12 +716,12 @@ async fn get_resolved_config(
req: HttpRequest,
db_conn: DbConnection,
query_map: superposition_query::Query<QueryMap>,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<HttpResponse> {
let DbConnection(mut conn) = db_conn;
let mut query_params_map = query_map.into_inner();

let max_created_at = get_max_created_at(&mut conn, &tenant)
let max_created_at = get_max_created_at(&mut conn, &schema_name)
.map_err(|e| log::error!("failed to fetch max timestamp from event_log : {e}"))
.ok();

Expand All @@ -728,7 +733,7 @@ async fn get_resolved_config(

let mut config_version = validate_version_in_params(&mut query_params_map)?;
let mut config =
generate_config_from_version(&mut config_version, &mut conn, &tenant)?;
generate_config_from_version(&mut config_version, &mut conn, &schema_name)?;

config = apply_prefix_filter_to_config(&mut query_params_map, config)?;

Expand Down Expand Up @@ -772,7 +777,7 @@ async fn get_resolved_config(
};
let mut resp = HttpResponse::Ok();
add_last_modified_to_header(max_created_at, &mut resp);
add_audit_id_to_header(&mut conn, &mut resp, &tenant);
add_audit_id_to_header(&mut conn, &mut resp, &schema_name);
add_config_version_to_header(&config_version, &mut resp);

Ok(resp.json(response))
Expand All @@ -782,13 +787,13 @@ async fn get_resolved_config(
async fn get_config_versions(
db_conn: DbConnection,
filters: Query<PaginationParams>,
tenant: Tenant,
schema_name: SchemaName,
) -> superposition::Result<Json<PaginatedResponse<ConfigVersion>>> {
let DbConnection(mut conn) = db_conn;

if let Some(true) = filters.all {
let config_versions: Vec<ConfigVersion> = config_versions::config_versions
.schema_name(&tenant)
.schema_name(&schema_name)
.get_results(&mut conn)?;
return Ok(Json(PaginatedResponse {
total_pages: 1,
Expand All @@ -799,12 +804,12 @@ async fn get_config_versions(

let n_version: i64 = config_versions::config_versions
.count()
.schema_name(&tenant)
.schema_name(&schema_name)
.get_result(&mut conn)?;

let limit = filters.count.unwrap_or(10);
let mut builder = config_versions::config_versions
.schema_name(&tenant)
.schema_name(&schema_name)
.into_boxed()
.order(config_versions::created_at.desc())
.limit(limit);
Expand Down
Loading

0 comments on commit 48213f1

Please sign in to comment.