diff --git a/Cargo.lock b/Cargo.lock index a934c02e..882439ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -819,6 +819,7 @@ dependencies = [ "openssl", "protobuf", "rand 0.8.5", + "redis", "regex", "reqwest 0.12.9", "sentry", @@ -1114,6 +1115,20 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "config" version = "0.14.1" @@ -3111,6 +3126,28 @@ dependencies = [ "rand_core 0.5.1", ] +[[package]] +name = "redis" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f89727cba9cec05cc579942321ff6dd09fe57a8b3217f52f952301efa010da5" +dependencies = [ + "arc-swap", + "bytes", + "combine", + "futures-util", + "itoa", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "ryu", + "sha1_smol", + "socket2", + "tokio", + "tokio-util", + "url", +] + [[package]] name = "redox_syscall" version = "0.2.16" @@ -3692,6 +3729,12 @@ dependencies = [ "digest", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.8" diff --git a/Cargo.toml b/Cargo.toml index c6f5c184..0ce080b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -103,7 +103,7 @@ autoconnect_settings = { path = "./autoconnect/autoconnect-settings" } autoconnect_web = { path = "./autoconnect/autoconnect-web" } autoconnect_ws = { path = "./autoconnect/autoconnect-ws" } autoconnect_ws_clientsm = { path = "./autoconnect/autoconnect-ws/autoconnect-ws-clientsm" } -autopush_common = { path = "./autopush-common", features = ["bigtable"] } +autopush_common = { path = "./autopush-common" 
} [profile.release] debug = 1 diff --git a/Dockerfile b/Dockerfile index 224ef7de..7b3f7ede 100644 --- a/Dockerfile +++ b/Dockerfile @@ -2,6 +2,7 @@ # RUST_VER FROM rust:1.83-bookworm AS builder ARG CRATE +ARG BUILD_ARGS ADD . /app WORKDIR /app @@ -16,7 +17,7 @@ RUN \ cargo --version && \ rustc --version && \ mkdir -m 755 bin && \ - cargo install --path $CRATE --locked --root /app + cargo install --path $CRATE $BUILD_ARGS --locked --root /app FROM debian:bookworm-slim diff --git a/autoconnect/Cargo.toml b/autoconnect/Cargo.toml index c0ba4ba2..b2c95301 100644 --- a/autoconnect/Cargo.toml +++ b/autoconnect/Cargo.toml @@ -56,4 +56,5 @@ docopt = "1.1" default = ["bigtable"] bigtable = ["autopush_common/bigtable", "autoconnect_settings/bigtable"] emulator = ["bigtable"] +redis = ["autopush_common/redis", "autoconnect_settings/redis"] log_vapid = [] diff --git a/autoconnect/autoconnect-settings/Cargo.toml b/autoconnect/autoconnect-settings/Cargo.toml index 32c04aeb..4e0bc8bd 100644 --- a/autoconnect/autoconnect-settings/Cargo.toml +++ b/autoconnect/autoconnect-settings/Cargo.toml @@ -25,3 +25,4 @@ autopush_common.workspace = true # specify the default via the calling crate, in order to simplify default chains. 
bigtable = ["autopush_common/bigtable"] emulator = ["bigtable"] +redis = ["autopush_common/redis"] diff --git a/autoconnect/autoconnect-settings/src/app_state.rs b/autoconnect/autoconnect-settings/src/app_state.rs index 8383c6ee..b62bef93 100644 --- a/autoconnect/autoconnect-settings/src/app_state.rs +++ b/autoconnect/autoconnect-settings/src/app_state.rs @@ -2,6 +2,8 @@ use std::{sync::Arc, time::Duration}; #[cfg(feature = "bigtable")] use autopush_common::db::bigtable::BigTableClientImpl; +#[cfg(feature = "redis")] +use autopush_common::db::redis::RedisClientImpl; use cadence::StatsdClient; use config::ConfigError; use fernet::{Fernet, MultiFernet}; @@ -78,6 +80,11 @@ impl AppState { client.spawn_sweeper(Duration::from_secs(30)); Box::new(client) } + #[cfg(feature = "redis")] + StorageType::Redis => Box::new( + RedisClientImpl::new(metrics.clone(), &db_settings) + .map_err(|e| ConfigError::Message(e.to_string()))?, + ), _ => panic!( "Invalid Storage type {:?}. Check {}__DB_DSN.", storage_type, diff --git a/autoconnect/autoconnect-settings/src/lib.rs b/autoconnect/autoconnect-settings/src/lib.rs index f022fe16..16b47614 100644 --- a/autoconnect/autoconnect-settings/src/lib.rs +++ b/autoconnect/autoconnect-settings/src/lib.rs @@ -11,7 +11,6 @@ use config::{Config, ConfigError, Environment, File}; use fernet::Fernet; use lazy_static::lazy_static; use serde::{Deserialize, Deserializer}; -use serde_json::json; use autopush_common::util::deserialize_u32_to_duration; @@ -218,6 +217,7 @@ impl Settings { Ok(()) } + #[cfg(feature = "bigtable")] pub fn test_settings() -> Self { let db_dsn = Some("grpc://localhost:8086".to_string()); // BigTable DB_SETTINGS. 
@@ -234,6 +234,17 @@ impl Settings { ..Default::default() } } + + #[cfg(all(feature = "redis", not(feature = "bigtable")))] + pub fn test_settings() -> Self { + let db_dsn = Some("redis://localhost".to_string()); + let db_settings = "".to_string(); + Self { + db_dsn, + db_settings, + ..Default::default() + } + } } fn deserialize_f64_to_duration<'de, D>(deserializer: D) -> Result diff --git a/autoendpoint/Cargo.toml b/autoendpoint/Cargo.toml index a541fc44..f58b9c86 100644 --- a/autoendpoint/Cargo.toml +++ b/autoendpoint/Cargo.toml @@ -75,6 +75,8 @@ bigtable = ["autopush_common/bigtable"] # enable emulator to call locally run data store. emulator = ["bigtable"] +redis = ["autopush_common/redis"] + # Enable "stub" router for local testing purposes. # The "stub" will return specified error strings or success # depending on which `app_id` client is called based on the registration diff --git a/autoendpoint/src/server.rs b/autoendpoint/src/server.rs index 86444951..a05f3fbe 100644 --- a/autoendpoint/src/server.rs +++ b/autoendpoint/src/server.rs @@ -13,6 +13,8 @@ use serde_json::json; #[cfg(feature = "bigtable")] use autopush_common::db::bigtable::BigTableClientImpl; +#[cfg(feature = "redis")] +use autopush_common::db::redis::RedisClientImpl; use autopush_common::{ db::{client::DbClient, spawn_pool_periodic_reporter, DbSettings, StorageType}, middleware::sentry::SentryWrapper, @@ -77,6 +79,8 @@ impl Server { client.spawn_sweeper(Duration::from_secs(30)); Box::new(client) } + #[cfg(feature = "redis")] + StorageType::Redis => Box::new(RedisClientImpl::new(metrics.clone(), &db_settings)?), _ => { debug!("No idea what {:?} is", &db_settings.dsn); return Err(ApiErrorKind::General( diff --git a/autopush-common/Cargo.toml b/autopush-common/Cargo.toml index 053681e2..6c9d473f 100644 --- a/autopush-common/Cargo.toml +++ b/autopush-common/Cargo.toml @@ -59,6 +59,7 @@ grpcio = { version = "=0.13.0", features = ["openssl"], optional = true } grpcio-sys = { version = "=0.13.0", 
optional = true } protobuf = { version = "=2.28.0", optional = true } # grpcio does not support protobuf 3+ form_urlencoded = { version = "1.2", optional = true } +redis = { version = "0.28.1", features = ["aio", "tokio-comp"]} [dev-dependencies] mockito = "0.31" @@ -80,3 +81,4 @@ bigtable = [ emulator = [ "bigtable", ] # used for testing big table, requires an external bigtable emulator running. +redis = [] diff --git a/autopush-common/build.rs b/autopush-common/build.rs index 06440274..a04ae546 100644 --- a/autopush-common/build.rs +++ b/autopush-common/build.rs @@ -1,5 +1,5 @@ pub fn main() { - if !cfg!(feature = "bigtable") { - panic!("No database defined! Please compile with `features=bigtable`"); + if !cfg!(feature = "bigtable") && !cfg!(feature = "redis") { + panic!("No database defined! Please compile with `features=bigtable` (or redis)"); } } diff --git a/autopush-common/src/db/error.rs b/autopush-common/src/db/error.rs index a30dc915..2bb82895 100644 --- a/autopush-common/src/db/error.rs +++ b/autopush-common/src/db/error.rs @@ -26,6 +26,10 @@ pub enum DbError { #[error("BigTable error: {0}")] BTError(#[from] BigTableError), + #[cfg(feature = "redis")] + #[error("Redis error {0}")] + RedisError(#[from] redis::RedisError), + #[error("Connection failure: {0}")] ConnectionError(String), diff --git a/autopush-common/src/db/mod.rs b/autopush-common/src/db/mod.rs index 70a2f025..c34aafff 100644 --- a/autopush-common/src/db/mod.rs +++ b/autopush-common/src/db/mod.rs @@ -24,6 +24,8 @@ pub mod bigtable; pub mod client; pub mod error; pub mod models; +#[cfg(feature = "redis")] +pub mod redis; pub mod reporter; pub mod routing; @@ -45,6 +47,8 @@ pub enum StorageType { INVALID, #[cfg(feature = "bigtable")] BigTable, + #[cfg(feature = "redis")] + Redis, } impl From<&str> for StorageType { @@ -52,6 +56,8 @@ impl From<&str> for StorageType { match name.to_lowercase().as_str() { #[cfg(feature = "bigtable")] "bigtable" => Self::BigTable, + #[cfg(feature = "redis")] + 
"redis" => Self::Redis, _ => Self::INVALID, } } @@ -65,6 +71,8 @@ impl StorageType { let mut result: Vec<&str> = Vec::new(); #[cfg(feature = "bigtable")] result.push("Bigtable"); + #[cfg(feature = "redis")] + result.push("Redis"); result } @@ -90,6 +98,11 @@ impl StorageType { } return Self::BigTable; } + #[cfg(feature = "redis")] + if dsn.starts_with("redis") { + trace!("Found redis"); + return Self::Redis; + } Self::INVALID } } diff --git a/autopush-common/src/db/redis/mod.rs b/autopush-common/src/db/redis/mod.rs new file mode 100644 index 00000000..2921590b --- /dev/null +++ b/autopush-common/src/db/redis/mod.rs @@ -0,0 +1,64 @@ +/// This uses redis as a storage and management +/// system for Autopush Notifications and Routing information. +/// +/// Keys for the data are +/// `autopush/user/{uaid}` String to store the user data +/// `autopush/co/{uaid}` u64 to store the last time the user has interacted with the server +/// `autopush/channels/{uaid}` List to store the list of the channels of the user +/// `autopush/msgs/{uaid}` SortedSet to store the list of the pending message ids for the user +/// `autopush/msgs_exp/{uaid}` SortedSet to store the list of the pending message ids, ordered by expiry date, this is because SortedSet elements can't have independant expiry date +/// `autopush/msg/{uaid}/{chidmessageid}`, with `{chidmessageid} == {chid}:{version}` String to store +/// the content of the messages +/// +mod redis_client; + +pub use redis_client::RedisClientImpl; + +use serde::Deserialize; +use std::time::Duration; + +use crate::db::error::DbError; +use crate::util::deserialize_opt_u32_to_duration; + +/// The settings for accessing the redis contents. +#[derive(Clone, Debug, Deserialize)] +pub struct RedisDbSettings { + #[serde(default)] + #[serde(deserialize_with = "deserialize_opt_u32_to_duration")] + pub timeout: Option, +} + +// Used by test, but we don't want available for release. 
+#[allow(clippy::derivable_impls)] +impl Default for RedisDbSettings { + fn default() -> Self { + Self { + timeout: Default::default(), + } + } +} + +impl TryFrom<&str> for RedisDbSettings { + type Error = DbError; + fn try_from(setting_string: &str) -> Result { + let me: Self = match serde_json::from_str(setting_string) { + Ok(me) => me, + Err(e) if e.is_eof() => Self::default(), + Err(e) => Err(DbError::General(format!( + "Could not parse DdbSettings: {:?}", + e + )))?, + }; + Ok(me) + } +} + +mod tests { + + #[test] + fn test_settings_parse() -> Result<(), crate::db::error::DbError> { + let settings = super::RedisDbSettings::try_from("{\"timeout\": 123}")?; + assert_eq!(settings.timeout, Some(std::time::Duration::from_secs(123))); + Ok(()) + } +} diff --git a/autopush-common/src/db/redis/redis_client/error.rs b/autopush-common/src/db/redis/redis_client/error.rs new file mode 100644 index 00000000..2513eca8 --- /dev/null +++ b/autopush-common/src/db/redis/redis_client/error.rs @@ -0,0 +1,7 @@ +use crate::db::error::DbError; + +impl From for DbError { + fn from(err: serde_json::Error) -> Self { + DbError::Serialization(err.to_string()) + } +} diff --git a/autopush-common/src/db/redis/redis_client/mod.rs b/autopush-common/src/db/redis/redis_client/mod.rs new file mode 100644 index 00000000..e247e36a --- /dev/null +++ b/autopush-common/src/db/redis/redis_client/mod.rs @@ -0,0 +1,850 @@ +use std::collections::HashSet; +use std::fmt; +use std::fmt::Display; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; + +use async_trait::async_trait; +use cadence::{CountedExt, StatsdClient}; +use redis::aio::MultiplexedConnection; +use redis::{AsyncCommands, SetExpiry, SetOptions}; +use uuid::Uuid; + +use crate::db::NotificationRecord; +use crate::db::{ + client::{DbClient, FetchMessageResponse}, + error::{DbError, DbResult}, + DbSettings, Notification, User, MAX_ROUTER_TTL, +}; +use crate::util::ms_since_epoch; + +mod error; + +use 
super::RedisDbSettings; + +/// Semi convenience wrapper to ensure that the UAID is formatted and displayed consistently. +struct Uaid<'a>(&'a Uuid); + +impl<'a> Display for Uaid<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0.as_hyphenated()) + } +} + +impl<'a> From> for String { + fn from(uaid: Uaid) -> String { + uaid.0.as_hyphenated().to_string() + } +} + +struct Chanid<'a>(&'a Uuid); + +impl<'a> Display for Chanid<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0.as_hyphenated()) + } +} + +impl<'a> From> for String { + fn from(uaid: Chanid) -> String { + uaid.0.as_hyphenated().to_string() + } +} + +#[derive(Clone)] +/// Wrapper for the Redis connection +pub struct RedisClientImpl { + /// Database connector string + pub client: redis::Client, + pub conn: Arc>>, + pub(crate) settings: RedisDbSettings, + /// Metrics client + metrics: Arc, + redis_opts: SetOptions, +} + +impl RedisClientImpl { + pub fn new(metrics: Arc, settings: &DbSettings) -> DbResult { + debug!("🐰 New redis client"); + let dsn = settings + .dsn + .clone() + .ok_or(DbError::General("Could not find DSN".to_owned()))?; + let client = redis::Client::open(dsn)?; + let db_settings = RedisDbSettings::try_from(settings.db_settings.as_ref())?; + info!("🐰 {:#?}", db_settings); + + Ok(Self { + client, + conn: Arc::new(Mutex::new(None)), + settings: db_settings, + metrics, + redis_opts: SetOptions::default().with_expiration(SetExpiry::EX(MAX_ROUTER_TTL)), + }) + } + + /// Return a [ConnectionLike], which implement redis [Commands] and can be + /// used in pipes. + /// + /// Pools also return a ConnectionLike, so we can add support for pools later. + async fn connection(&self) -> DbResult { + { + let conn = self + .conn + .lock() + .map_err(|e| DbError::General(e.to_string()))? 
+ .clone(); + + if let Some(co) = conn { + return Ok(co); + } + } + let config = if self.settings.timeout.is_some_and(|t| !t.is_zero()) { + redis::AsyncConnectionConfig::new() + .set_connection_timeout(self.settings.timeout.unwrap()) + } else { + redis::AsyncConnectionConfig::new() + }; + let co = self + .client + .get_multiplexed_async_connection_with_config(&config) + .await + .map_err(|e| DbError::ConnectionError(format!("Cannot connect to redis: {}", e)))?; + let mut conn = self + .conn + .lock() + .map_err(|e| DbError::General(e.to_string()))?; + *conn = Some(co.clone()); + Ok(co) + } + + fn user_key(&self, uaid: &Uaid) -> String { + format!("autopush/user/{}", uaid) + } + + /// This store the last connection record, but doesn't update User + fn last_co_key(&self, uaid: &Uaid) -> String { + format!("autopush/co/{}", uaid) + } + + fn channel_list_key(&self, uaid: &Uaid) -> String { + format!("autopush/channels/{}", uaid) + } + + fn message_list_key(&self, uaid: &Uaid) -> String { + format!("autopush/msgs/{}", uaid) + } + + fn message_exp_list_key(&self, uaid: &Uaid) -> String { + format!("autopush/msgs_exp/{}", uaid) + } + + fn message_key(&self, uaid: &Uaid, chidmessageid: &str) -> String { + format!("autopush/msg/{}/{}", uaid, chidmessageid) + } +} + +#[async_trait] +impl DbClient for RedisClientImpl { + /// add user to the database + async fn add_user(&self, user: &User) -> DbResult<()> { + trace!("🐰 Adding user"); + trace!("🐰 Logged at {}", &user.connected_at); + let mut con = self.connection().await?; + let uaid = Uaid(&user.uaid); + let user_key = self.user_key(&uaid); + let co_key = self.last_co_key(&uaid); + let _: () = redis::pipe() + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .set_options(user_key, serde_json::to_string(user)?, self.redis_opts) + .exec_async(&mut con) + .await?; + Ok(()) + } + + /// To update the TTL of the Redis entry we just have to SET again, with the new expiry + /// + /// NOTE: This function is called by mobile 
during the daily + /// [autoendpoint::routes::update_token_route] handling, and by desktop + /// [autoconnect-ws-sm::get_or_create_user]` which is called + /// during the `HELLO` handler. This should be enough to ensure that the ROUTER records + /// are properly refreshed for "lively" clients. + /// + /// NOTE: There is some, very small, potential risk that a desktop client that can + /// somehow remain connected the duration of MAX_ROUTER_TTL, may be dropped as not being + /// "lively". + async fn update_user(&self, user: &mut User) -> DbResult { + trace!("🐰 Updating user"); + let mut con = self.connection().await?; + let co_key = self.last_co_key(&Uaid(&user.uaid)); + let last_co: Option = con.get(&co_key).await?; + if last_co.is_some_and(|c| c < user.connected_at) { + trace!( + "🐰 Was connected at {}, now at {}", + last_co.unwrap(), + &user.connected_at + ); + self.add_user(&user).await?; + Ok(true) + } else { + Ok(false) + } + } + + async fn get_user(&self, uaid: &Uuid) -> DbResult> { + let mut con = self.connection().await?; + let user_key = self.user_key(&Uaid(uaid)); + let user: Option = con + .get::<&str, Option>(&user_key) + .await? 
+ .and_then(|s| serde_json::from_str(s.as_ref()).ok()); + if user.is_some() { + trace!("🐰 Found a record for {}", &uaid); + } + Ok(user) + } + + async fn remove_user(&self, uaid: &Uuid) -> DbResult<()> { + let uaid = Uaid(uaid); + let mut con = self.connection().await?; + let user_key = self.user_key(&uaid); + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); + redis::pipe() + .del(&user_key) + .del(&co_key) + .del(&chan_list_key) + .del(&msg_list_key) + .del(&exp_list_key) + .exec_async(&mut con) + .await?; + Ok(()) + } + + async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> { + let uaid = Uaid(uaid); + let mut con = self.connection().await?; + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + + let _: () = redis::pipe() + .rpush(chan_list_key, channel_id.as_hyphenated().to_string()) + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .exec_async(&mut con) + .await?; + Ok(()) + } + + /// Add channels in bulk (used mostly during migration) + async fn add_channels(&self, uaid: &Uuid, channels: HashSet) -> DbResult<()> { + let uaid = Uaid(uaid); + // channel_ids are stored as a set within a single redis key + let mut con = self.connection().await?; + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + redis::pipe() + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .rpush( + chan_list_key, + channels + .into_iter() + .map(|c| c.as_hyphenated().to_string()) + .collect::>(), + ) + .exec_async(&mut con) + .await?; + Ok(()) + } + + async fn get_channels(&self, uaid: &Uuid) -> DbResult> { + let uaid = Uaid(uaid); + let mut con = self.client.get_multiplexed_async_connection().await?; + //let mut con = self.connection().await?; + let chan_list_key = self.channel_list_key(&uaid); + let channels: HashSet = con + 
.lrange::<&str, HashSet>(&chan_list_key, 0, -1) + .await? + .into_iter() + .filter_map(|s| Uuid::from_str(&s).ok()) + .collect(); + trace!("🐰 Found {} channels for {}", channels.len(), &uaid); + Ok(channels) + } + + /// Delete the channel. Does not delete its associated pending messages. + async fn remove_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult { + let uaid = Uaid(uaid); + let channel_id = Chanid(channel_id); + let mut con = self.connection().await?; + let co_key = self.last_co_key(&uaid); + let chan_list_key = self.channel_list_key(&uaid); + // Remove {channel_id} from autopush/channel/{auid} + trace!("🐰 Removing channel {}", channel_id); + let (status,): (bool,) = redis::pipe() + .set_options(co_key, ms_since_epoch(), self.redis_opts) + .ignore() + .lrem(&chan_list_key, 1, channel_id.to_string()) + .query_async(&mut con) + .await?; + Ok(status) + } + + /// Remove the node_id + async fn remove_node_id( + &self, + uaid: &Uuid, + _node_id: &str, + _connected_at: u64, + _version: &Option, + ) -> DbResult { + if let Some(mut user) = self.get_user(&uaid).await? { + user.node_id = None; + self.update_user(&mut user).await?; + } + Ok(true) + } + + /// Write the notification to storage. 
+ /// + /// If the message contains a topic, we remove the old message + async fn save_message(&self, uaid: &Uuid, message: Notification) -> DbResult<()> { + let uaid = Uaid(uaid); + let mut con = self.connection().await?; + let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); + let msg_id = &message.chidmessageid(); + let msg_key = self.message_key(&uaid, &msg_id); + // message.ttl is already min(headers.ttl, MAX_NOTIFICATION_TTL) + // see autoendpoint/src/extractors/notification_headers.rs + let opts = SetOptions::default().with_expiration(SetExpiry::EX(message.ttl)); + + debug!("🐰 Saving message {} :: {:?}", &msg_key, &message); + trace!( + "🐰 timestamp: {:?}", + &message.timestamp.to_be_bytes().to_vec() + ); + + // Remember, `timestamp` is effectively the time to kill the message, not the + // current time. + let expiry = (SystemTime::now() + Duration::from_secs(message.ttl)) + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as u64; + trace!("🐰 Message Expiry {}", expiry); + + let mut pipe = redis::pipe(); + + // If this is a topic message: + // zadd(msg_list_key) and zadd(exp_list_key) will replace their old entry + // in the hashset if one already exists + // and set(msg_key, message) will override it too: nothing to do. 
+ let is_topic = message.topic.is_some(); + + // Store notification record in autopush/msg/{aud}/{chidmessageid} + // And store {chidmessageid} in autopush/msgs/{aud} + let msg_key = self.message_key(&uaid, &msg_id); + pipe.set_options( + msg_key, + serde_json::to_string(&NotificationRecord::from_notif(&uaid.0, message))?, + opts, + ) + // The function [fetch_timestamp_messages] takes a timestamp as input, + // here we use the timestamp of the record (in ms) + .zadd(&exp_list_key, &msg_id, expiry) + .zadd(&msg_list_key, &msg_id, ms_since_epoch()); + + let _: () = pipe.exec_async(&mut con).await?; + self.metrics + .incr_with_tags("notification.message.stored") + .with_tag("topic", &is_topic.to_string()) + .with_tag("database", &self.name()) + .send(); + Ok(()) + } + + /// Save a batch of messages to the database. + /// + /// Currently just iterating through the list and saving one at a time. There's a bulk way + /// to save messages, but there are other considerations (e.g. mutation limits) + async fn save_messages(&self, uaid: &Uuid, messages: Vec) -> DbResult<()> { + // plain, simple way of solving this: + for message in messages { + self.save_message(uaid, message).await?; + } + Ok(()) + } + + /// Delete expired messages + async fn increment_storage(&self, uaid: &Uuid, timestamp: u64) -> DbResult<()> { + let uaid = Uaid(uaid); + debug!("🐰🔥 Incrementing storage to {}", timestamp); + let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); + let mut con = self.connection().await?; + let exp_id_list: Vec = con.zrangebyscore(&exp_list_key, 0, timestamp).await?; + if exp_id_list.len() > 0 { + trace!("🐰🔥 Deleting {} expired msgs", exp_id_list.len()); + redis::pipe() + .del(&exp_id_list) + .zrem(&msg_list_key, &exp_id_list) + .zrem(&exp_list_key, &exp_id_list) + .exec_async(&mut con) + .await?; + } + Ok(()) + } + + /// Delete the notification from storage. 
+ async fn remove_message(&self, uaid: &Uuid, chidmessageid: &str) -> DbResult<()> { + let uaid = Uaid(uaid); + trace!( + "🐰 attemping to delete {:?} :: {:?}", + uaid.to_string(), + chidmessageid + ); + let msg_key = self.message_key(&uaid, &chidmessageid); + let msg_list_key = self.message_list_key(&uaid); + let exp_list_key = self.message_exp_list_key(&uaid); + debug!("🐰🔥 Deleting message {}", &msg_key); + let mut con = self.connection().await?; + // We remove the id from the exp list at the end, to be sure + // it can't be removed from the list before the message is removed + redis::pipe() + .del(&msg_key) + .zrem(&msg_list_key, &chidmessageid) + .zrem(&exp_list_key, &chidmessageid) + .exec_async(&mut con) + .await?; + self.metrics + .incr_with_tags("notification.message.deleted") + .with_tag("database", &self.name()) + .send(); + Ok(()) + } + + /// Topic messages are handled as other messages with redis, we return nothing. + async fn fetch_topic_messages( + &self, + _uaid: &Uuid, + _limit: usize, + ) -> DbResult { + Ok(FetchMessageResponse { + messages: vec![], + timestamp: None, + }) + } + + /// Return [`limit`] messages pending for a [`uaid`] that have a record timestamp + /// after [`timestamp`] (millisecs). + /// + /// If [`limit`] = 0, we fetch all messages after [`timestamp`]. + /// + /// This can return expired messages, following bigtables behavior + async fn fetch_timestamp_messages( + &self, + uaid: &Uuid, + timestamp: Option, + limit: usize, + ) -> DbResult { + let uaid = Uaid(uaid); + trace!("🐰 Fecthing {} messages since {:?}", limit, timestamp); + let mut con = self.connection().await?; + let msg_list_key = self.message_list_key(&uaid); + // ZRANGE Key (x +inf LIMIT 0 limit + let (messages_id, mut scores): (Vec, Vec) = con + .zrangebyscore_limit_withscores::<&str, &str, &str, Vec<(String, u64)>>( + &msg_list_key, + &format!("({}", timestamp.unwrap_or(0)), + "+inf", + 0, + limit as isize, + ) + .await? 
+ .into_iter() + .map(|(id, s): (String, u64)| (self.message_key(&uaid, &id), s)) + .unzip(); + if messages_id.len() == 0 { + trace!("🐰 No message found"); + return Ok(FetchMessageResponse { + messages: vec![], + timestamp: None, + }); + } + let messages: Vec = if messages_id.len() == 0 { + vec![] + } else { + con.mget::<&Vec, Vec>>(&messages_id) + .await? + .into_iter() + .filter_map(|opt: Option| { + if opt.is_none() { + // We return dummy expired event if we can't fetch the said event, + // it means the event has expired + Some(Notification { + timestamp: 1, + ..Default::default() + }) + } else { + opt.and_then(|m| serde_json::from_str(&m).ok()) + .and_then(|m: NotificationRecord| m.into_notif().ok()) + } + }) + .collect() + }; + let timestamp = scores.pop(); + trace!("🐰 Found {} messages until {:?}", messages.len(), timestamp); + Ok(FetchMessageResponse { + messages, + timestamp, + }) + } + + async fn health_check(&self) -> DbResult { + let mut con = self.connection().await?; + let _: () = con.ping().await?; + Ok(true) + } + + /// Returns true, because there's no table in Redis + async fn router_table_exists(&self) -> DbResult { + Ok(true) + } + + /// Returns true, because there's no table in Redis + async fn message_table_exists(&self) -> DbResult { + Ok(true) + } + + fn box_clone(&self) -> Box { + Box::new(self.clone()) + } + + fn name(&self) -> String { + "Redis".to_owned() + } + + fn pool_status(&self) -> Option { + None + } +} + +#[cfg(test)] +mod tests { + use crate::{logging::init_test_logging, util::ms_since_epoch}; + + use super::*; + const TEST_USER: &str = "DEADBEEF-0000-0000-0000-0123456789AB"; + const TEST_CHID: &str = "DECAFBAD-0000-0000-0000-0123456789AB"; + const TOPIC_CHID: &str = "DECAFBAD-1111-0000-0000-0123456789AB"; + + fn now() -> u64 { + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs() + } + + fn new_client() -> DbResult { + let env_dsn = "redis://localhost".into(); // We force localhost to force test 
environment + let settings = DbSettings { + dsn: Some(env_dsn), + db_settings: "".into(), + }; + let metrics = Arc::new(StatsdClient::builder("", cadence::NopMetricSink).build()); + RedisClientImpl::new(metrics, &settings) + } + + #[actix_rt::test] + async fn health_check() { + let client = new_client().unwrap(); + + let result = client.health_check().await; + assert!(result.is_ok()); + assert!(result.unwrap()); + } + + /// Test if [increment_storage] correctly wipe expired messages + #[actix_rt::test] + async fn wipe_expired() -> DbResult<()> { + init_test_logging(); + let client = new_client()?; + + let connected_at = ms_since_epoch(); + + let uaid = Uuid::parse_str(TEST_USER).unwrap(); + let chid = Uuid::parse_str(TEST_CHID).unwrap(); + + let node_id = "test_node".to_owned(); + + // purge the user record if it exists. + let _ = client.remove_user(&uaid).await; + + let test_user = User { + uaid, + router_type: "webpush".to_owned(), + connected_at, + router_data: None, + node_id: Some(node_id.clone()), + ..Default::default() + }; + + // purge the old user (if present) + // in case a prior test failed for whatever reason. + let _ = client.remove_user(&uaid).await; + + // can we add the user? + let timestamp = now(); + let fetch_timestamp = ms_since_epoch(); + client.add_user(&test_user).await?; + let test_notification = crate::db::Notification { + channel_id: chid, + version: "test".to_owned(), + ttl: 1, + timestamp, + data: Some("Encrypted".into()), + sortkey_timestamp: Some(timestamp), + ..Default::default() + }; + client.save_message(&uaid, test_notification).await?; + client + .increment_storage(&uaid, fetch_timestamp + 10000) + .await?; + let msgs = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_eq!(msgs.messages.len(), 0); + Ok(()) + } + + /// run a gauntlet of testing. These are a bit linear because they need + /// to run in sequence. 
+ #[actix_rt::test] + async fn run_gauntlet() -> DbResult<()> { + init_test_logging(); + let client = new_client()?; + + let connected_at = ms_since_epoch(); + + let uaid = Uuid::parse_str(TEST_USER).unwrap(); + let chid = Uuid::parse_str(TEST_CHID).unwrap(); + let topic_chid = Uuid::parse_str(TOPIC_CHID).unwrap(); + + let node_id = "test_node".to_owned(); + + // purge the user record if it exists. + let _ = client.remove_user(&uaid).await; + + let test_user = User { + uaid, + router_type: "webpush".to_owned(), + connected_at, + router_data: None, + node_id: Some(node_id.clone()), + ..Default::default() + }; + + // purge the old user (if present) + // in case a prior test failed for whatever reason. + let _ = client.remove_user(&uaid).await; + + // can we add the user? + client.add_user(&test_user).await?; + let fetched = client.get_user(&uaid).await?; + assert!(fetched.is_some()); + let fetched = fetched.unwrap(); + assert_eq!(fetched.router_type, "webpush".to_owned()); + + // Simulate a connected_at occurring before the following writes + let connected_at = ms_since_epoch(); + + // can we add channels? + client.add_channel(&uaid, &chid).await?; + let channels = client.get_channels(&uaid).await?; + assert!(channels.contains(&chid)); + + // can we add lots of channels? + let mut new_channels: HashSet = HashSet::new(); + new_channels.insert(chid); + for _ in 1..10 { + new_channels.insert(uuid::Uuid::new_v4()); + } + let chid_to_remove = uuid::Uuid::new_v4(); + new_channels.insert(chid_to_remove); + client.add_channels(&uaid, new_channels.clone()).await?; + let channels = client.get_channels(&uaid).await?; + assert_eq!(channels, new_channels); + + // can we remove a channel? 
+ assert!(client.remove_channel(&uaid, &chid_to_remove).await?); + assert!(!client.remove_channel(&uaid, &chid_to_remove).await?); + new_channels.remove(&chid_to_remove); + let channels = client.get_channels(&uaid).await?; + assert_eq!(channels, new_channels); + + // now ensure that we can update a user that's after the time we set + // prior. first ensure that we can't update a user that's before the + // time we set prior to the last write + let mut updated = User { + connected_at, + ..test_user.clone() + }; + let result = client.update_user(&mut updated).await; + assert!(result.is_ok()); + assert!(!result.unwrap()); + + // Make sure that the `connected_at` wasn't modified + let fetched2 = client.get_user(&fetched.uaid).await?.unwrap(); + assert_eq!(fetched.connected_at, fetched2.connected_at); + + // and make sure we can update a record with a later connected_at time. + let mut updated = User { + connected_at: fetched.connected_at + 300, + ..fetched2 + }; + let result = client.update_user(&mut updated).await; + assert!(result.is_ok()); + assert!(result.unwrap()); + assert_ne!( + fetched2.connected_at, + client.get_user(&uaid).await?.unwrap().connected_at + ); + + // can we increment the storage for the user? + client + .increment_storage( + &fetched.uaid, + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + ) + .await?; + + let test_data = "An_encrypted_pile_of_crap".to_owned(); + let timestamp = now(); + let sort_key = now(); + // Unlike Bigtable, [fetch_timestamp_messages] uses and return a + // timestamp in milliseconds + let fetch_timestamp = ms_since_epoch(); + // Can we store a message? 
+ let test_notification = crate::db::Notification { + channel_id: chid, + version: "test".to_owned(), + ttl: 300, + timestamp, + data: Some(test_data.clone()), + sortkey_timestamp: Some(sort_key), + ..Default::default() + }; + let res = client.save_message(&uaid, test_notification.clone()).await; + assert!(res.is_ok()); + + let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_ne!(fetched.messages.len(), 0); + let fm = fetched.messages.pop().unwrap(); + assert_eq!(fm.channel_id, test_notification.channel_id); + assert_eq!(fm.data, Some(test_data)); + + // Grab all 1 of the messages that were submitted within the past 10 seconds. + let fetched = client + .fetch_timestamp_messages(&uaid, Some(fetch_timestamp - 10), 999) + .await?; + assert_ne!(fetched.messages.len(), 0); + + // Try grabbing a message for 10 seconds from now. + let fetched = client + .fetch_timestamp_messages(&uaid, Some(fetch_timestamp + 10), 999) + .await?; + assert_eq!(fetched.messages.len(), 0); + + // can we clean up our toys? + assert!(client + .remove_message(&uaid, &test_notification.chidmessageid()) + .await + .is_ok()); + + assert!(client.remove_channel(&uaid, &chid).await.is_ok()); + + let msgs = client + .fetch_timestamp_messages(&uaid, None, 999) + .await? + .messages; + assert!(msgs.is_empty()); + + // Now, can we do all that with topic messages + // Unlike bigtable, we don't use [fetch_topic_messages]: it always returns None: + // they are handled as usual messages. 
+ client.add_channel(&uaid, &topic_chid).await?; + let test_data = "An_encrypted_pile_of_crap_with_a_topic".to_owned(); + let timestamp = now(); + let sort_key = now(); + + // We store 2 messages, with a single topic + let test_notification_0 = crate::db::Notification { + channel_id: topic_chid, + version: "version0".to_owned(), + ttl: 300, + topic: Some("topic".to_owned()), + timestamp, + data: Some(test_data.clone()), + sortkey_timestamp: Some(sort_key), + ..Default::default() + }; + assert!(client + .save_message(&uaid, test_notification_0.clone()) + .await + .is_ok()); + + let test_notification = crate::db::Notification { + timestamp: now(), + version: "version1".to_owned(), + sortkey_timestamp: Some(sort_key + 10), + ..test_notification_0 + }; + + assert!(client + .save_message(&uaid, test_notification.clone()) + .await + .is_ok()); + + let mut fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_eq!(fetched.messages.len(), 1); + let fm = fetched.messages.pop().unwrap(); + assert_eq!(fm.channel_id, test_notification.channel_id); + assert_eq!(fm.data, Some(test_data)); + + // Grab the message that was submitted. + let fetched = client.fetch_timestamp_messages(&uaid, None, 999).await?; + assert_ne!(fetched.messages.len(), 0); + + // can we clean up our toys? + assert!(client + .remove_message(&uaid, &test_notification.chidmessageid()) + .await + .is_ok()); + + assert!(client.remove_channel(&uaid, &topic_chid).await.is_ok()); + + let msgs = client + .fetch_timestamp_messages(&uaid, None, 999) + .await? + .messages; + assert!(msgs.is_empty()); + + let fetched = client.get_user(&uaid).await?.unwrap(); + assert!(client + .remove_node_id(&uaid, &node_id, connected_at, &fetched.version) + .await + .is_ok()); + // did we remove it? 
+ let fetched = client.get_user(&uaid).await?.unwrap(); + assert_eq!(fetched.node_id, None); + + assert!(client.remove_user(&uaid).await.is_ok()); + + assert!(client.get_user(&uaid).await?.is_none()); + + Ok(()) + } +} diff --git a/autopush-common/src/db/routing.rs b/autopush-common/src/db/routing.rs index 7999552a..ca40a87d 100644 --- a/autopush-common/src/db/routing.rs +++ b/autopush-common/src/db/routing.rs @@ -1,6 +1,7 @@ #[derive(Clone, Eq, PartialEq, Debug)] pub(crate) enum StorageType { BigTable, + Redis, None, } @@ -8,6 +9,8 @@ impl Default for StorageType { fn default() -> StorageType { if cfg!(feature = "bigtable") { StorageType::BigTable + } else if cfg!(feature = "redis") { + StorageType::Redis } else { StorageType::None } @@ -18,6 +21,7 @@ impl From<&str> for StorageType { fn from(str: &str) -> StorageType { match str.to_lowercase().as_str() { "bigtable" => StorageType::BigTable, + "redis" => StorageType::Redis, _ => { warn!("Using default StorageType for {str}"); StorageType::default() diff --git a/autopush-common/src/notification.rs b/autopush-common/src/notification.rs index ecbb7f42..298960a4 100644 --- a/autopush-common/src/notification.rs +++ b/autopush-common/src/notification.rs @@ -47,6 +47,7 @@ impl Notification { /// {chid}:{message_id} pub fn chidmessageid(&self) -> String { let chid = self.channel_id.as_hyphenated(); + if let Some(ref topic) = self.topic { format!("{TOPIC_NOTIFICATION_PREFIX}:{chid}:{topic}") } else if let Some(sortkey_timestamp) = self.sortkey_timestamp { diff --git a/redis-docker-compose.yml b/redis-docker-compose.yml new file mode 100644 index 00000000..6819ea32 --- /dev/null +++ b/redis-docker-compose.yml @@ -0,0 +1,51 @@ +services: + autoconnect: + build: + context: . 
+ args: + BUILD_ARGS: "--no-default-features --features redis" + CRATE: autoconnect + BINARY: autoconnect + environment: + - "AUTOCONNECT__DB_DSN=redis://redis" + - "AUTOCONNECT__CRYPTO_KEY=[tlLWgjoAT-vV4q0nR0uiU3ANhI5uQ10GH2fKCgWrxaU=]" # Replace with output of `./scripts/fernet_key.py` + - "AUTOCONNECT__ENDPOINT_SCHEME=http" # The ENDPOINT* vars are for the public facing autoendpoint url + - "AUTOCONNECT__ENDPOINT_HOSTNAME=localhost" + - "AUTOCONNECT__ENDPOINT_PORT=8000" + - "AUTOCONNECT__ROUTER_HOSTNAME=autoconnect" # This is used by autoendpoint to reach this autoconnect + - "RUST_BACKTRACE=1" + - "RUST_LOG=trace" + ports: + - "8080:8080" + - "8081:8081" + depends_on: + - redis + + + autoendpoint: + build: + context: . + args: + BUILD_ARGS: "--no-default-features --features redis" + CRATE: autoendpoint + BINARY: autoendpoint + environment: + - "AUTOEND__DB_DSN=redis://redis" + - 'AUTOEND__CRYPTO_KEYS=[tlLWgjoAT-vV4q0nR0uiU3ANhI5uQ10GH2fKCgWrxaU=]' # This is the same value as AUTOCONNECT__CRYPTO_KEY + - "RUST_BACKTRACE=1" + - "RUST_LOG=trace" + - "AUTOEND__HOST=0.0.0.0" # autoendpoint must listen on 0.0.0.0 with docker + - "AUTOEND__PORT=8000" # This is the port we listen on + - "AUTOEND__ENDPOINT_URL=http://localhost:8000" # This is the public facing url to reach autoendpoint + ports: + - "8000:8000" + depends_on: + - redis + - autoconnect + + redis: + image: redis:latest + restart: unless-stopped + command: redis-server + ports: + - "6379:6379"