Skip to content

Commit

Permalink
fix: add test for optimistic locking and fix it
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Mar 15, 2024
1 parent 3910159 commit ef6cb08
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 28 deletions.
9 changes: 5 additions & 4 deletions common/auth/src/client/inject.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
use super::{error::Error, Credentials, TokenProvider};
use async_trait::async_trait;
use std::future::Future;
use tracing::instrument;

/// Allows injecting tokens.
#[async_trait]
pub trait TokenInjector: Sized + Send + Sync {
async fn inject_token(self, token_provider: &dyn TokenProvider) -> Result<Self, Error>;
fn inject_token(
self,
token_provider: &dyn TokenProvider,
) -> impl Future<Output = Result<Self, Error>> + Send;
}

/// Injects tokens into a request by setting the authorization header to a "bearer" token.
#[async_trait]
impl TokenInjector for reqwest::RequestBuilder {
#[instrument(level = "debug", skip(token_provider), err)]
async fn inject_token(self, token_provider: &dyn TokenProvider) -> Result<Self, Error> {
Expand Down
37 changes: 20 additions & 17 deletions modules/importer/src/endpoints.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::service::{Error, ImporterService};
use crate::model::Revisioned;
use actix_web::http::header;
use actix_web::http::header::{ETag, EntityTag, IfMatch};
use actix_web::{delete, get, post, put, web, HttpResponse, Responder};
use serde_json::Value;
use trustify_common::db::Database;
Expand Down Expand Up @@ -43,7 +44,7 @@ async fn read(
.await?
.map(|Revisioned { value, revision }| {
HttpResponse::Ok()
.append_header((header::ETAG, revision))
.append_header((header::ETAG, ETag(EntityTag::new_strong(revision))))
.json(value)
}))
}
Expand All @@ -52,32 +53,34 @@ async fn read(
async fn update(
service: web::Data<ImporterService>,
name: web::Path<String>,
if_match: Option<web::Header<header::ETag>>,
web::Header(if_match): web::Header<IfMatch>,
web::Json(configuration): web::Json<Value>,
) -> Result<impl Responder, Error> {
let revision = match &if_match {
IfMatch::Any => None,
IfMatch::Items(items) => items.first().map(|etag| etag.tag()),
};

service
.update(
name.into_inner(),
configuration,
if_match.map(|v| v.to_string()).as_deref(),
)
.update(name.into_inner(), configuration, revision)
.await?;

Ok(HttpResponse::NoContent().finish())
}

#[delete("/{name}")]
async fn delete(
service: web::Data<ImporterService>,
name: web::Path<String>,
if_match: Option<web::Header<header::ETag>>,
web::Header(if_match): web::Header<IfMatch>,
) -> Result<impl Responder, Error> {
Ok(
match service
.delete(&name, if_match.map(|v| v.to_string()).as_deref())
.await?
{
true => HttpResponse::NoContent().finish(),
false => HttpResponse::NoContent().finish(),
},
)
let revision = match &if_match {
IfMatch::Any => None,
IfMatch::Items(items) => items.first().map(|etag| etag.tag()),
};

Ok(match service.delete(&name, revision).await? {
true => HttpResponse::NoContent().finish(),
false => HttpResponse::NoContent().finish(),
})
}
19 changes: 15 additions & 4 deletions modules/importer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use sea_orm::{
ActiveModelTrait, ActiveValue::Set, ColumnTrait, EntityTrait, PaginatorTrait, QueryFilter,
TransactionTrait,
};
use sea_query::Expr;
use sea_query::{Alias, Expr};
use serde_json::Value;
use trustify_common::db::Database;
use trustify_common::error::ErrorInformation;
Expand All @@ -32,7 +32,7 @@ impl ResponseError for Error {
message: self.to_string(),
details: None,
}),
Error::NotFound(_) => HttpResponse::Conflict().json(ErrorInformation {
Error::NotFound(_) => HttpResponse::NotFound().json(ErrorInformation {
error: "NotFound".into(),
message: self.to_string(),
details: None,
Expand Down Expand Up @@ -101,10 +101,16 @@ impl ImporterService {
) -> Result<(), Error> {
let mut update = importer::Entity::update_many()
.col_expr(importer::Column::Configuration, Expr::value(configuration))
.col_expr(importer::Column::Revision, Expr::value(Uuid::new_v4()))
.filter(importer::Column::Name.eq(&name));

if let Some(revision) = expected_revision {
update = update.filter(importer::Column::Revision.eq(revision));
update = update.filter(
importer::Column::Revision
.into_expr()
.cast_as(Alias::new("text"))
.eq(revision),
);
}

let result = update.exec(&self.db).await?;
Expand All @@ -125,7 +131,12 @@ impl ImporterService {
let mut delete = importer::Entity::delete_many().filter(importer::Column::Name.eq(name));

if let Some(revision) = expected_revision {
delete = delete.filter(importer::Column::Revision.eq(revision));
delete = delete.filter(
importer::Column::Revision
.into_expr()
.cast_as(Alias::new("text"))
.eq(revision),
);
}

let result = delete.exec(&self.db).await?;
Expand Down
179 changes: 177 additions & 2 deletions modules/importer/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#![cfg(test)]

use super::model::ImportConfiguration;
use actix_web::http::StatusCode;
use actix_web::{test, App};
use actix_web::{
http::{header, StatusCode},
test, App,
};
use serde_json::json;
use trustify_common::db::Database;

Expand Down Expand Up @@ -88,3 +90,176 @@ async fn test_default() {
let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}

#[actix_web::test]
async fn test_oplock() {
env_logger::init();

let db = Database::for_test("test_oplock").await.unwrap();
let app =
test::init_service(App::new().configure(|svc| super::endpoints::configure(svc, db))).await;

// create one

let req = test::TestRequest::post()
.uri("/api/v1/importer/foo")
.set_json(json!({"foo":"bar"}))
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::CREATED);

// update it (no lock)

let req = test::TestRequest::put()
.uri("/api/v1/importer/foo")
.set_json(json!({"foo":"baz"}))
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NO_CONTENT);

// get it

let req = test::TestRequest::get()
.uri("/api/v1/importer/foo")
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);

let etag = resp.headers().get(header::ETAG);
assert!(etag.is_some());
let etag = etag.cloned().unwrap();

let result: ImportConfiguration = test::read_body_json(resp).await;
assert_eq!(
result,
ImportConfiguration {
name: "foo".into(),
configuration: json!({"foo":"baz"})
}
);

// update it (with lock)

let req = test::TestRequest::put()
.uri("/api/v1/importer/foo")
.set_json(json!({"foo":"buz"}))
.append_header((header::IF_MATCH, etag.clone()))
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NO_CONTENT);

// get it

let req = test::TestRequest::get()
.uri("/api/v1/importer/foo")
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);

let result: ImportConfiguration = test::read_body_json(resp).await;
assert_eq!(
result,
ImportConfiguration {
name: "foo".into(),
configuration: json!({"foo":"buz"})
}
);

// update it (with broken lock)

let req = test::TestRequest::put()
.uri("/api/v1/importer/foo")
.set_json(json!({"foo":"boz"}))
.append_header((header::IF_MATCH, etag.clone()))
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::PRECONDITION_FAILED);

// update it (with wrong name)

let req = test::TestRequest::put()
.uri("/api/v1/importer/foo2")
.set_json(json!({"foo":"boz"}))
.append_header((header::IF_MATCH, etag.clone()))
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);

// get it (must not change)

let req = test::TestRequest::get()
.uri("/api/v1/importer/foo")
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);

let old_etag = etag;
let etag = resp.headers().get(header::ETAG);
assert!(etag.is_some());
let etag = etag.cloned().unwrap();
assert_ne!(old_etag, etag);

let result: ImportConfiguration = test::read_body_json(resp).await;
assert_eq!(
result,
ImportConfiguration {
name: "foo".into(),
configuration: json!({"foo":"buz"})
}
);

// delete it (wrong lock)

let req = test::TestRequest::delete()
.uri("/api/v1/importer/foo")
.append_header((header::IF_MATCH, old_etag.clone()))
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NO_CONTENT);

// get it (must still be there)

let req = test::TestRequest::get()
.uri("/api/v1/importer/foo")
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::OK);

let result: ImportConfiguration = test::read_body_json(resp).await;
assert_eq!(
result,
ImportConfiguration {
name: "foo".into(),
configuration: json!({"foo":"buz"})
}
);

// delete it (correct lock)

let req = test::TestRequest::delete()
.uri("/api/v1/importer/foo")
.append_header((header::IF_MATCH, etag.clone()))
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NO_CONTENT);

// get none

let req = test::TestRequest::get()
.uri("/api/v1/importer/foo")
.to_request();

let resp = test::call_service(&app, req).await;
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
}
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.75.0"
channel = "1.76.0"
components = [ "rustfmt", "clippy" ]

0 comments on commit ef6cb08

Please sign in to comment.