diff --git a/crates/context_aware_config/migrations/2024-12-06-112348_webhooks/down.sql b/crates/context_aware_config/migrations/2024-12-06-112348_webhooks/down.sql new file mode 100644 index 00000000..d9a93fe9 --- /dev/null +++ b/crates/context_aware_config/migrations/2024-12-06-112348_webhooks/down.sql @@ -0,0 +1 @@ +-- This file should undo anything in `up.sql` diff --git a/crates/context_aware_config/migrations/2024-12-06-112348_webhooks/up.sql b/crates/context_aware_config/migrations/2024-12-06-112348_webhooks/up.sql new file mode 100644 index 00000000..578ee153 --- /dev/null +++ b/crates/context_aware_config/migrations/2024-12-06-112348_webhooks/up.sql @@ -0,0 +1,23 @@ +-- Your SQL goes here +-- Name: webhooks; Type: TABLE; Schema: public; Owner: - +-- +CREATE TABLE public.webhooks ( + name text PRIMARY KEY, + description text NOT NULL, + enabled boolean NOT NULL DEFAULT true, + url text NOT NULL, + method text NOT NULL DEFAULT 'POST', + version text NOT NULL, + custom_headers json, + events varchar(100)[] NOT NULL CHECK (array_position(events, NULL) IS NULL), + max_retries integer NOT NULL DEFAULT 0, + last_triggered_at timestamp, + created_by text NOT NULL, + created_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP, + last_modified_by text NOT NULL, + last_modified_at timestamp without time zone NOT NULL DEFAULT CURRENT_TIMESTAMP +); +-- +-- Name: webhooks webhooks_audit; Type: TRIGGER; Schema: public; Owner: - +-- +CREATE TRIGGER webhooks_audit AFTER INSERT OR DELETE OR UPDATE ON public.webhooks FOR EACH ROW EXECUTE FUNCTION public.event_logger(); \ No newline at end of file diff --git a/crates/context_aware_config/src/api.rs b/crates/context_aware_config/src/api.rs index 11c521bc..a1e9bd40 100644 --- a/crates/context_aware_config/src/api.rs +++ b/crates/context_aware_config/src/api.rs @@ -5,3 +5,4 @@ pub mod default_config; pub mod dimension; pub mod functions; pub mod type_templates; +pub mod webhooks; diff --git a/crates/context_aware_config/src/api/webhooks.rs b/crates/context_aware_config/src/api/webhooks.rs new file mode 100644 index 00000000..a34458e6 --- /dev/null +++ b/crates/context_aware_config/src/api/webhooks.rs @@ -0,0 +1,3 @@ +mod handlers; +pub mod types; +pub use handlers::endpoints; diff --git a/crates/context_aware_config/src/api/webhooks/handlers.rs b/crates/context_aware_config/src/api/webhooks/handlers.rs new file mode 100644 index 00000000..4dda7ff7 --- /dev/null +++ b/crates/context_aware_config/src/api/webhooks/handlers.rs @@ -0,0 +1,238 @@ +use actix_web::{ + delete, get, post, put, + web::{self, Json, Query}, + HttpResponse, Scope, +}; +use chrono::Utc; +use service_utils::service::types::DbConnection; +use superposition_macros::{bad_argument, db_error, not_found, unexpected_error}; +use superposition_types::{ + cac::schema, custom_query::PaginationParams, result as superposition, + PaginatedResponse, User, +}; + +use super::types::{CreateWebhookRequest, WebhookName}; +use diesel::{delete, ExpressionMethods, QueryDsl, RunQueryDsl}; +use diesel::{ + r2d2::{ConnectionManager, PooledConnection}, + PgConnection, +}; +use superposition_types::cac::models::Webhooks; +use superposition_types::cac::schema::webhooks::dsl::webhooks; + +pub fn endpoints() -> Scope { + Scope::new("") + .service(create) + .service(list_webhooks) + .service(get) + .service(update) + .service(delete_webhook) +} + +#[post("")] +async fn create( + request: web::Json, + db_conn: DbConnection, + user: User, +) -> superposition::Result> { + let DbConnection(mut conn) = db_conn; + let req = request.into_inner(); + let events: Vec = req + .events + .into_iter() + .map(|event| event.to_string()) + .collect(); + validate_events(&events, None, &mut conn)?; + let now = Utc::now().naive_utc(); + let webhook = Webhooks { + name: req.name, + description: req.description, + enabled: req.enabled, + url: req.url, + method: req.method, + version: req.version.unwrap_or("v1".to_owned()), + custom_headers: req.custom_headers, + events, + max_retries: 0, + last_triggered_at: None, + created_by: user.email.clone(), + created_at: now, + last_modified_by: user.email, + last_modified_at: now, + }; + + let insert: Result = diesel::insert_into(webhooks) + .values(&webhook) + .get_result(&mut conn); + + match insert { + Ok(res) => Ok(Json(res)), + Err(e) => match e { + diesel::result::Error::DatabaseError(kind, e) => { + log::error!("Function error: {:?}", e); + match kind { + diesel::result::DatabaseErrorKind::UniqueViolation => { + Err(bad_argument!("Webhook already exists.")) + } + _ => Err(unexpected_error!( + "Something went wrong, failed to create the webhook" + )), + } + } + _ => { + log::error!("Webhook creation failed with error: {e}"); + Err(unexpected_error!( + "An error occured please contact the admin." + )) + } + }, + } +} + +#[get("")] +async fn list_webhooks( + db_conn: DbConnection, + filters: Query, +) -> superposition::Result>> { + let DbConnection(mut conn) = db_conn; + + let (total_pages, total_items, data) = match filters.all { + Some(true) => { + let result: Vec = webhooks.get_results(&mut conn)?; + (1, result.len() as i64, result) + } + _ => { + let n_functions: i64 = webhooks.count().get_result(&mut conn)?; + let limit = filters.count.unwrap_or(10); + let mut builder = webhooks + .into_boxed() + .order(schema::webhooks::last_modified_at.desc()) + .limit(limit); + if let Some(page) = filters.page { + let offset = (page - 1) * limit; + builder = builder.offset(offset); + } + let result: Vec = builder.load(&mut conn)?; + let total_pages = (n_functions as f64 / limit as f64).ceil() as i64; + (total_pages, n_functions, result) + } + }; + + Ok(Json(PaginatedResponse { + total_pages, + total_items, + data, + })) +} + +#[get("/{webhook_name}")] +async fn get( + params: web::Path, + db_conn: DbConnection, +) -> superposition::Result> { + let DbConnection(mut conn) = db_conn; + let w_name: String = params.into_inner().into(); + let webhook = fetch_webhook(&w_name, &mut conn)?; + Ok(Json(webhook)) +} + +pub fn validate_events( + events: &Vec, + exclude_webhook: Option<&String>, + conn: &mut PooledConnection>, +) -> superposition::Result<()> { + let result: Vec = webhooks.get_results(conn)?; + for webhook in result { + if exclude_webhook.map_or(false, |val| &webhook.name == val) { + continue; + } + if let Some(duplicate_event) = + webhook.events.iter().find(|event| events.contains(event)) + { + return Err(bad_argument!("Duplicate event found: {}", duplicate_event)); + } + } + Ok(()) +} + +pub fn fetch_webhook( + w_name: &String, + conn: &mut PooledConnection>, +) -> superposition::Result { + Ok(webhooks + .filter(schema::webhooks::name.eq(w_name)) + .get_result::(conn)?) +} + +#[put("/{webhook_name}")] +async fn update( + params: web::Path, + db_conn: DbConnection, + user: User, + request: web::Json, +) -> superposition::Result> { + let DbConnection(mut conn) = db_conn; + let req = request.into_inner(); + let w_name: String = params.into_inner().into(); + let events: Vec = req + .events + .into_iter() + .map(|event| event.to_string()) + .collect(); + + validate_events(&events, Some(&w_name), &mut conn)?; + + let update = diesel::update(webhooks) + .filter(schema::webhooks::name.eq(w_name)) + .set(( + schema::webhooks::description.eq(req.description), + schema::webhooks::enabled.eq(req.enabled), + schema::webhooks::url.eq(req.url), + schema::webhooks::method.eq(req.method), + schema::webhooks::version.eq(req.version.unwrap_or("v1".to_owned())), + schema::webhooks::custom_headers.eq(req.custom_headers), + schema::webhooks::events.eq(events), + schema::webhooks::last_modified_by.eq(user.email), + schema::webhooks::last_modified_at.eq(Utc::now().naive_utc()), + )) + .get_result::(&mut conn) + .map_err(|err| { + log::error!("failed to insert custom type with error: {}", err); + db_error!(err) + })?; + + Ok(Json(update)) +} + +#[delete("/{webhook_name}")] +async fn delete_webhook( + params: web::Path, + db_conn: DbConnection, + user: User, +) -> superposition::Result { + let DbConnection(mut conn) = db_conn; + let w_name: String = params.into_inner().into(); + + diesel::update(webhooks) + .filter(schema::webhooks::name.eq(&w_name)) + .set(( + schema::webhooks::last_modified_at.eq(Utc::now().naive_utc()), + schema::webhooks::last_modified_by.eq(user.get_email()), + )) + .execute(&mut conn)?; + let deleted_row = + delete(webhooks.filter(schema::webhooks::name.eq(&w_name))).execute(&mut conn); + match deleted_row { + Ok(0) => Err(not_found!("Webhook {} doesn't exists", w_name)), + Ok(_) => { + log::info!("{w_name} Webhook deleted by {}", user.get_email()); + Ok(HttpResponse::NoContent().finish()) + } + Err(e) => { + log::error!("Webhook delete query failed with error: {e}"); + Err(unexpected_error!( + "Something went wrong, failed to delete the Webhook" + )) + } + } +} diff --git a/crates/context_aware_config/src/api/webhooks/helper.rs b/crates/context_aware_config/src/api/webhooks/helper.rs new file mode 100644 index 00000000..e69de29b diff --git a/crates/context_aware_config/src/api/webhooks/types.rs b/crates/context_aware_config/src/api/webhooks/types.rs new file mode 100644 index 00000000..734afd07 --- /dev/null +++ b/crates/context_aware_config/src/api/webhooks/types.rs @@ -0,0 +1,35 @@ +use derive_more::{AsRef, Deref, DerefMut, Into}; +use serde::Deserialize; +use serde_json::Value; +use superposition_types::{webhook::WebhookEvent, RegexEnum}; + +#[derive(Debug, Deserialize)] +pub struct CreateWebhookRequest { + pub name: String, + pub description: String, + pub enabled: bool, + pub url: String, + pub method: String, + pub version: Option, + pub custom_headers: Option, + pub events: Vec, +} + +#[derive(Debug, Deserialize, AsRef, Deref, DerefMut, Into)] +#[serde(try_from = "String")] +pub struct WebhookName(String); +impl WebhookName { + pub fn validate_data(name: String) -> Result { + let name = name.trim(); + RegexEnum::FunctionName + .match_regex(name) + .map(|_| Self(name.to_string())) + } +} + +impl TryFrom for WebhookName { + type Error = String; + fn try_from(value: String) -> Result { + Ok(Self::validate_data(value)?) + } +} diff --git a/crates/service_utils/src/helpers.rs b/crates/service_utils/src/helpers.rs index 85133bce..ac63667c 100644 --- a/crates/service_utils/src/helpers.rs +++ b/crates/service_utils/src/helpers.rs @@ -449,7 +449,7 @@ where }) .send() .await; - + println!("<<>> response: {:?}", response); match response { Ok(res) => { match res.status() { diff --git a/crates/superposition/src/main.rs b/crates/superposition/src/main.rs index 464b3efd..bd1508a8 100644 --- a/crates/superposition/src/main.rs +++ b/crates/superposition/src/main.rs @@ -197,6 +197,11 @@ async fn main() -> Result<()> { .wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::SUPERPOSITION)) .service(organisation::endpoints()), ) + .service( + scope("/webhook") + .wrap(AppExecutionScopeMiddlewareFactory::new(AppScope::CAC)) + .service(webhooks::endpoints()), + ) /***************************** UI Routes ******************************/ .route("/fxn/{tail:.*}", leptos_actix::handle_server_fns()) // serve JS/WASM/CSS from `pkg` diff --git a/crates/superposition_types/src/cac/schema.patch b/crates/superposition_types/src/cac/schema.patch new file mode 100644 index 00000000..50c7aa7e --- /dev/null +++ b/crates/superposition_types/src/cac/schema.patch @@ -0,0 +1,39 @@ +diff --git a/crates/context_aware_config/src/db/schema.rs b/crates/context_aware_config/src/db/schema.rs +index 15c2eee..25c8088 100644 +--- a/crates/context_aware_config/src/db/schema.rs ++++ b/crates/context_aware_config/src/db/schema.rs +@@ -2,13 +2,13 @@ + + diesel::table! { + config_versions (id) { + id -> Int8, + config -> Json, + config_hash -> Text, +- tags -> Nullable>>, ++ tags -> Nullable>, + created_at -> Timestamp, + } + } + +diesel::table! { + webhooks (name) { + name -> Text, + description -> Text, + enabled -> Bool, + url -> Text, + method -> Text, + version -> Text, + custom_headers -> Nullable, +- events -> Array>, ++ events -> Array, + max_retries -> Int4, + last_triggered_at -> Nullable, + created_by -> Text, + created_at -> Timestamp, + last_modified_by -> Text, + last_modified_at -> Timestamp, + } +} + +diesel::table! { + contexts (id) { diff --git a/crates/superposition_types/src/database/models/cac.rs b/crates/superposition_types/src/database/models/cac.rs index 73ed532d..50e96f71 100644 --- a/crates/superposition_types/src/database/models/cac.rs +++ b/crates/superposition_types/src/database/models/cac.rs @@ -10,7 +10,7 @@ use crate::{Cac, Condition, Contextual, Overridden, Overrides}; #[cfg(feature = "diesel_derives")] use super::super::schema::{ config_versions, contexts, default_configs, dimensions, event_log, functions, - type_templates, + type_templates, webhooks, }; #[derive(Clone, Serialize, Debug)] @@ -148,3 +148,28 @@ pub struct TypeTemplate { pub last_modified_at: NaiveDateTime, pub last_modified_by: String, } + +#[derive(Serialize, Clone, Debug)] +#[cfg_attr( + feature = "diesel_derives", + derive(Queryable, Selectable, Insertable, AsChangeset) +)] +#[cfg_attr(feature = "diesel_derives", diesel(check_for_backend(diesel::pg::Pg)))] +#[cfg_attr(feature = "diesel_derives", diesel(table_name = webhooks))] +#[cfg_attr(feature = "diesel_derives", diesel(primary_key(name)))] +pub struct Webhooks { + pub name: String, + pub description: String, + pub enabled: bool, + pub url: String, + pub method: String, + pub version: String, + pub custom_headers: Option, + pub events: Vec, + pub max_retries: i32, + pub last_triggered_at: Option, + pub created_by: String, + pub created_at: NaiveDateTime, + pub last_modified_by: String, + pub last_modified_at: NaiveDateTime, +} diff --git a/crates/superposition_types/src/database/schema.rs b/crates/superposition_types/src/database/schema.rs index fad44535..f5cfeef8 100644 --- a/crates/superposition_types/src/database/schema.rs +++ b/crates/superposition_types/src/database/schema.rs @@ -689,6 +689,25 @@ diesel::table! { } } +diesel::table! { + webhooks (name) { + name -> Text, + description -> Text, + enabled -> Bool, + url -> Text, + method -> Text, + version -> Text, + custom_headers -> Nullable, + events -> Array, + max_retries -> Int4, + last_triggered_at -> Nullable, + created_by -> Text, + created_at -> Timestamp, + last_modified_by -> Text, + last_modified_at -> Timestamp, + } +} + diesel::joinable!(default_configs -> functions (function_name)); diesel::joinable!(dimensions -> functions (function_name)); @@ -743,4 +762,5 @@ diesel::allow_tables_to_appear_in_same_query!( functions, organisations, type_templates, + webhooks, ); diff --git a/crates/superposition_types/src/webhook.rs b/crates/superposition_types/src/webhook.rs index f06dafb2..53ee0a6e 100644 --- a/crates/superposition_types/src/webhook.rs +++ b/crates/superposition_types/src/webhook.rs @@ -40,7 +40,7 @@ pub struct Webhook { pub authorization: Option, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] pub enum WebhookEvent { ExperimentCreated, ExperimentStarted, @@ -49,7 +49,19 @@ pub enum WebhookEvent { ExperimentConcluded, } -#[derive(Serialize, Deserialize)] +impl fmt::Display for WebhookEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WebhookEvent::ExperimentCreated => write!(f, "ExperimentCreated"), + WebhookEvent::ExperimentStarted => write!(f, "ExperimentStarted"), + WebhookEvent::ExperimentInprogress => write!(f, "ExperimentInprogress"), + WebhookEvent::ExperimentUpdated => write!(f, "ExperimentUpdated"), + WebhookEvent::ExperimentConcluded => write!(f, "ExperimentConcluded"), + } + } +} + +#[derive(Serialize, Deserialize, Debug)] pub struct WebhookEventInfo { pub webhook_event: WebhookEvent, pub time: String, @@ -57,7 +69,7 @@ pub struct WebhookEventInfo { pub config_version: Option, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Debug)] pub struct WebhookResponse { pub event_info: WebhookEventInfo, pub payload: T, diff --git a/setup b/setup new file mode 100755 index 00000000..3885846e --- /dev/null +++ b/setup @@ -0,0 +1,3 @@ +#!bin/sh +make setup && make tenant TENANT=mjos && make tenant TENANT=sdk_config +npm i \ No newline at end of file