Skip to content

Commit

Permalink
Merge pull request #291 from ff137/feat/order-by-scan
Browse files Browse the repository at this point in the history
✨ add `order_by` and `descending` options to scan and fetch_all queries
  • Loading branch information
andrewwhitehead authored Aug 20, 2024
2 parents 7813768 + 7eb7763 commit 75226af
Show file tree
Hide file tree
Showing 19 changed files with 303 additions and 37 deletions.
22 changes: 16 additions & 6 deletions askar-storage/src/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{fmt::Debug, sync::Arc};

use super::{Backend, BackendSession, ManageBackend};
use crate::{
backend::OrderBy,
entry::{Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
error::Error,
future::BoxFuture,
Expand Down Expand Up @@ -72,9 +73,12 @@ impl<B: Backend> Backend for WrapBackend<B> {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
self.0
.scan(profile, kind, category, tag_filter, offset, limit)
self.0.scan(
profile, kind, category, tag_filter, offset, limit, order_by, descending,
)
}

#[inline]
Expand Down Expand Up @@ -142,9 +146,12 @@ impl Backend for AnyBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
self.0
.scan(profile, kind, category, tag_filter, offset, limit)
self.0.scan(
profile, kind, category, tag_filter, offset, limit, order_by, descending,
)
}

#[inline]
Expand Down Expand Up @@ -207,10 +214,13 @@ impl BackendSession for AnyBackendSession {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
self.0
.fetch_all(kind, category, tag_filter, limit, for_update)
self.0.fetch_all(
kind, category, tag_filter, limit, order_by, descending, for_update,
)
}

/// Remove all matching records from the store
Expand Down
28 changes: 25 additions & 3 deletions askar-storage/src/backend/db_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use crate::{
},
};

use super::OrderBy;

/// cbindgen:ignore
pub const PAGE_SIZE: usize = 32;

Expand Down Expand Up @@ -453,6 +455,17 @@ pub trait QueryPrepare {
}
query
}

fn order_by_query<'q>(mut query: String, order_by: OrderBy, descending: bool) -> String {
query.push_str(" ORDER BY ");
match order_by {
OrderBy::Id => query.push_str("id"),
}
if descending {
query.push_str(" DESC");
}
query
}
}

pub fn replace_arg_placeholders<Q: QueryPrepare + ?Sized>(
Expand Down Expand Up @@ -625,6 +638,8 @@ pub fn extend_query<'q, Q: QueryPrepare>(
tag_filter: Option<(String, Vec<Vec<u8>>)>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> Result<String, Error>
where
i64: for<'e> Encode<'e, Q::DB> + Type<Q::DB>,
Expand All @@ -636,9 +651,16 @@ where
query.push_str(" AND "); // assumes WHERE already occurs
query.push_str(&filter_clause);
};
if offset.is_some() || limit.is_some() {
query = Q::limit_query(query, args, offset, limit);
};
// Only add ordering, and limit/offset, if the query starts with SELECT
if query.trim_start().to_uppercase().starts_with("SELECT") {
if let Some(order_by_value) = order_by {
query = Q::order_by_query(query, order_by_value, descending);
};

if offset.is_some() || limit.is_some() {
query = Q::limit_query(query, args, offset, limit);
};
}
Ok(query)
}

Expand Down
27 changes: 26 additions & 1 deletion askar-storage/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,18 @@ pub mod postgres;
/// Sqlite database support
pub mod sqlite;

/// Enum to support custom ordering in record queries
#[derive(Debug)]
pub enum OrderBy {
/// Order by ID field
Id,
}

impl Default for OrderBy {
fn default() -> Self {
OrderBy::Id
}
}
/// Represents a generic backend implementation
pub trait Backend: Debug + Send + Sync {
/// The type of session managed by this backend
Expand Down Expand Up @@ -54,6 +66,8 @@ pub trait Backend: Debug + Send + Sync {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>>;

/// Create a new session against the store
Expand Down Expand Up @@ -122,6 +136,8 @@ pub trait BackendSession: Debug + Send {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>>;

Expand Down Expand Up @@ -185,7 +201,16 @@ pub async fn copy_profile<A: Backend, B: Backend>(
to_profile: &str,
) -> Result<(), Error> {
let scan = from_backend
.scan(Some(from_profile.into()), None, None, None, None, None)
.scan(
Some(from_profile.into()),
None,
None,
None,
None,
None,
None,
false,
)
.await?;
if let Err(e) = to_backend.create_profile(Some(to_profile.into())).await {
if e.kind() != ErrorKind::Duplicate {
Expand Down
26 changes: 23 additions & 3 deletions askar-storage/src/backend/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::{
Backend, BackendSession,
};
use crate::{
backend::OrderBy,
entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
error::Error,
future::{unblock, BoxFuture},
Expand Down Expand Up @@ -268,6 +269,8 @@ impl Backend for PostgresBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
Box::pin(async move {
let session = self.session(profile, false)?;
Expand All @@ -282,6 +285,8 @@ impl Backend for PostgresBackend {
tag_filter,
offset,
limit,
order_by,
descending,
false,
);
let stream = scan.then(move |enc_rows| {
Expand Down Expand Up @@ -347,8 +352,15 @@ impl BackendSession for DbSession<Postgres> {
})
.await?;
params.push(enc_category);
let query =
extend_query::<PostgresBackend>(COUNT_QUERY, &mut params, tag_filter, None, None)?;
let query = extend_query::<PostgresBackend>(
COUNT_QUERY,
&mut params,
tag_filter,
None,
None,
None,
false,
)?;
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
Expand Down Expand Up @@ -424,6 +436,8 @@ impl BackendSession for DbSession<Postgres> {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
let category = category.map(|c| c.to_string());
Expand All @@ -440,6 +454,8 @@ impl BackendSession for DbSession<Postgres> {
tag_filter,
None,
limit,
order_by,
descending,
for_update,
);
pin!(scan);
Expand Down Expand Up @@ -483,6 +499,8 @@ impl BackendSession for DbSession<Postgres> {
tag_filter,
None,
None,
None,
false,
)?;

let mut active = acquire_session(&mut *self).await?;
Expand Down Expand Up @@ -752,6 +770,8 @@ fn perform_scan(
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
for_update: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
try_stream! {
Expand All @@ -772,7 +792,7 @@ fn perform_scan(
}
}).await?;
params.push(enc_category);
let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit)?;
let mut query = extend_query::<PostgresBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;
if for_update {
query.push_str(" FOR NO KEY UPDATE");
}
Expand Down
26 changes: 23 additions & 3 deletions askar-storage/src/backend/sqlite/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use super::{
Backend, BackendSession,
};
use crate::{
backend::OrderBy,
entry::{EncEntryTag, Entry, EntryKind, EntryOperation, EntryTag, Scan, TagFilter},
error::Error,
future::{unblock, BoxFuture},
Expand Down Expand Up @@ -262,6 +263,8 @@ impl Backend for SqliteBackend {
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> BoxFuture<'_, Result<Scan<'static, Entry>, Error>> {
Box::pin(async move {
let session = self.session(profile, false)?;
Expand All @@ -276,6 +279,8 @@ impl Backend for SqliteBackend {
tag_filter,
offset,
limit,
order_by,
descending,
);
let stream = scan.then(move |enc_rows| {
let category = category.clone();
Expand Down Expand Up @@ -330,8 +335,15 @@ impl BackendSession for DbSession<Sqlite> {
})
.await?;
params.push(enc_category);
let query =
extend_query::<SqliteBackend>(COUNT_QUERY, &mut params, tag_filter, None, None)?;
let query = extend_query::<SqliteBackend>(
COUNT_QUERY,
&mut params,
tag_filter,
None,
None,
None,
false,
)?;
let mut active = acquire_session(&mut *self).await?;
let count = sqlx::query_scalar_with(query.as_str(), params)
.fetch_one(active.connection_mut())
Expand Down Expand Up @@ -398,6 +410,8 @@ impl BackendSession for DbSession<Sqlite> {
category: Option<&'q str>,
tag_filter: Option<TagFilter>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
_for_update: bool,
) -> BoxFuture<'q, Result<Vec<Entry>, Error>> {
let category = category.map(|c| c.to_string());
Expand All @@ -413,6 +427,8 @@ impl BackendSession for DbSession<Sqlite> {
tag_filter,
None,
limit,
order_by,
descending,
);
pin!(scan);
let mut enc_rows = vec![];
Expand Down Expand Up @@ -455,6 +471,8 @@ impl BackendSession for DbSession<Sqlite> {
tag_filter,
None,
None,
None,
false,
)?;

let mut active = acquire_session(&mut *self).await?;
Expand Down Expand Up @@ -703,6 +721,8 @@ fn perform_scan(
tag_filter: Option<TagFilter>,
offset: Option<i64>,
limit: Option<i64>,
order_by: Option<OrderBy>,
descending: bool,
) -> impl Stream<Item = Result<Vec<EncScanEntry>, Error>> + '_ {
try_stream! {
let mut params = QueryParams::new();
Expand All @@ -720,7 +740,7 @@ fn perform_scan(
}
}).await?;
params.push(enc_category);
let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit)?;
let query = extend_query::<SqliteBackend>(SCAN_QUERY, &mut params, tag_filter, offset, limit, order_by, descending)?;

let mut batch = Vec::with_capacity(PAGE_SIZE);

Expand Down
Loading

0 comments on commit 75226af

Please sign in to comment.