diff --git a/bindings/matrix-sdk-ffi/src/error.rs b/bindings/matrix-sdk-ffi/src/error.rs index e644d7974d..9d793b0702 100644 --- a/bindings/matrix-sdk-ffi/src/error.rs +++ b/bindings/matrix-sdk-ffi/src/error.rs @@ -183,6 +183,12 @@ pub enum QueueWedgeError { /// session before sending. CrossVerificationRequired, + /// Some media content to be sent has disappeared from the cache. + MissingMediaContent, + + /// Some mime type couldn't be parsed. + InvalidMimeType { mime_type: String }, + /// Other errors. GenericApiError { msg: String }, } @@ -201,10 +207,17 @@ impl Display for QueueWedgeError { QueueWedgeError::CrossVerificationRequired => { f.write_str("Own verification is required") } + QueueWedgeError::MissingMediaContent => { + f.write_str("Media to be sent disappeared from local storage") + } + QueueWedgeError::InvalidMimeType { mime_type } => { + write!(f, "Invalid mime type '{mime_type}' for media upload") + } QueueWedgeError::GenericApiError { msg } => f.write_str(msg), } } } + impl From for QueueWedgeError { fn from(value: SdkQueueWedgeError) -> Self { match value { @@ -223,6 +236,10 @@ impl From for QueueWedgeError { users: users.iter().map(ruma::OwnedUserId::to_string).collect(), }, SdkQueueWedgeError::CrossVerificationRequired => Self::CrossVerificationRequired, + SdkQueueWedgeError::MissingMediaContent => Self::MissingMediaContent, + SdkQueueWedgeError::InvalidMimeType { mime_type } => { + Self::InvalidMimeType { mime_type } + } SdkQueueWedgeError::GenericApiError { msg } => Self::GenericApiError { msg }, } } diff --git a/crates/matrix-sdk-base/src/media.rs b/crates/matrix-sdk-base/src/media.rs index 6d02683a5f..15950be168 100644 --- a/crates/matrix-sdk-base/src/media.rs +++ b/crates/matrix-sdk-base/src/media.rs @@ -14,6 +14,7 @@ use ruma::{ }, MxcUri, UInt, }; +use serde::{Deserialize, Serialize}; const UNIQUE_SEPARATOR: &str = "_"; @@ -25,7 +26,7 @@ pub trait UniqueKey { } /// The requested format of a media file. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum MediaFormat { /// The file that was uploaded. File, @@ -44,7 +45,7 @@ impl UniqueKey for MediaFormat { } /// The requested size of a media thumbnail. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MediaThumbnailSize { /// The desired resizing method. pub method: Method, @@ -65,7 +66,7 @@ impl UniqueKey for MediaThumbnailSize { } /// The desired settings of a media thumbnail. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MediaThumbnailSettings { /// The desired size of the thumbnail. pub size: MediaThumbnailSize, @@ -110,7 +111,7 @@ impl UniqueKey for MediaSource { } /// A request for media data. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct MediaRequest { /// The source of the media file. pub source: MediaSource, diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 131f919c2c..b8121bc197 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -1406,7 +1406,9 @@ impl StateStoreIntegrationTests for DynStateStore { assert_eq!(dependents.len(), 1); assert_eq!(dependents[0].parent_transaction_id, txn0); assert_eq!(dependents[0].own_transaction_id, child_txn); - assert_eq!(dependents[0].parent_key.as_ref(), Some(&SentRequestKey::Event(event_id))); + assert_matches!(dependents[0].parent_key.as_ref(), Some(&SentRequestKey::Event(ref eid)) => { + assert_eq!(*eid, event_id); + }); assert_matches!(dependents[0].kind, DependentQueuedRequestKind::RedactEvent); // Now remove it. diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index cd4b7c67cd..b219c664bb 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -75,8 +75,9 @@ pub use self::integration_tests::StateStoreIntegrationTests; pub use self::{ memory_store::MemoryStore, send_queue::{ - ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, - QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, + FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind, + SentRequestKey, SerializableEventContent, }, traits::{ ComposerDraft, ComposerDraftType, DynStateStore, IntoStateStore, ServerCapabilities, diff --git a/crates/matrix-sdk-base/src/store/send_queue.rs b/crates/matrix-sdk-base/src/store/send_queue.rs index 7efe0f1363..e5b62242c1 100644 --- a/crates/matrix-sdk-base/src/store/send_queue.rs +++ b/crates/matrix-sdk-base/src/store/send_queue.rs @@ -18,12 +18,17 @@ use std::{collections::BTreeMap, fmt, ops::Deref}; use as_variant::as_variant; use ruma::{ - events::{AnyMessageLikeEventContent, EventContent as _, RawExt as _}, + events::{ + room::{message::RoomMessageEventContent, MediaSource}, + AnyMessageLikeEventContent, EventContent as _, RawExt as _, + }, serde::Raw, - OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, + OwnedDeviceId, OwnedEventId, OwnedTransactionId, OwnedUserId, TransactionId, UInt, }; use serde::{Deserialize, Serialize}; +use crate::media::MediaRequest; + /// A thin wrapper to serialize a `AnyMessageLikeEventContent`. #[derive(Clone, Serialize, Deserialize)] pub struct SerializableEventContent { @@ -76,6 +81,28 @@ pub enum QueuedRequestKind { /// The content of the message-like event we'd like to send. content: SerializableEventContent, }, + + /// Content to upload on the media server. + /// + /// The bytes must be stored in the media cache, and are identified by the + /// cache key. + Upload { + /// Content type of the media to be uploaded. + /// + /// Stored as a `String` because `Mime` which we'd really want to use + /// here, is not serializable. Oh well. + content_type: String, + + /// The cache key used to retrieve the media's bytes in the event cache + /// store. + cache_key: MediaRequest, + + /// An optional media source for a thumbnail already uploadd. + thumbnail_source: Option, + + /// To which media event transaction does this upload relate? + related_to: OwnedTransactionId, + }, } impl From for QueuedRequestKind { @@ -143,6 +170,18 @@ pub enum QueueWedgeError { #[error("Own verification is required")] CrossVerificationRequired, + /// Media content was cached in the media store, but has disappeared before + /// we could upload it. + #[error("Media content disappeared")] + MissingMediaContent, + + /// We tried to upload some media content with an unknown mime type. + #[error("Invalid mime type '{mime_type}' for media")] + InvalidMimeType { + /// The observed mime type that's expected to be invalid. + mime_type: String, + }, + /// Other errors. #[error("Other unrecoverable error: {msg}")] GenericApiError { @@ -169,6 +208,43 @@ pub enum DependentQueuedRequestKind { /// Key used for the reaction. key: String, }, + + /// Upload a file that had a thumbnail. + UploadFileWithThumbnail { + /// Content type for the file itself (not the thumbnail). + content_type: String, + + /// Media request necessary to retrieve the file itself (not the + /// thumbnail). + cache_key: MediaRequest, + + /// To which media transaction id does this upload relate to? + related_to: OwnedTransactionId, + }, + + /// Finish an upload by updating references to the media cache and sending + /// the final media event with the remote MXC URIs. + FinishUpload { + /// Local echo for the event (containing the local MXC URIs). + local_echo: RoomMessageEventContent, + + /// Transaction id for the file upload. + file_upload: OwnedTransactionId, + + /// Information about the thumbnail, if present. + thumbnail_info: Option, + }, +} + +/// Detailed record about a thumbnail used when finishing a media upload. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct FinishUploadThumbnailInfo { + /// Transaction id for the thumbnail upload. + pub txn: OwnedTransactionId, + /// Thumbnail's width. + pub width: UInt, + /// Thumbnail's height. + pub height: UInt, } /// A transaction id identifying a [`DependentQueuedRequest`] rather than its @@ -210,14 +286,34 @@ impl From for OwnedTransactionId { } } +impl From for ChildTransactionId { + fn from(val: OwnedTransactionId) -> Self { + Self(val) + } +} + /// A unique key (identifier) indicating that a transaction has been /// successfully sent to the server. /// /// The owning child transactions can now be resolved. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum SentRequestKey { /// The parent transaction returned an event when it succeeded. Event(OwnedEventId), + + /// The parent transaction returned an uploaded resource URL. + Media { + /// File that was uploaded by this request. + /// + /// If the request related to a thumbnail upload, this contains the + /// thumbnail media source. + file: MediaSource, + + /// Optional thumbnail previously uploaded, when uploading a file. + /// + /// When uploading a thumbnail, this is set to `None`. + thumbnail: Option, + }, } impl SentRequestKey { diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index cd1c39b14d..3dd614fa33 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -1359,6 +1359,11 @@ impl TimelineController

{ self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id }) .await; } + + RoomSendQueueUpdate::UploadedMedia { related_to } => { + // TODO(bnjbvr): Do something else? + info!(txn_id = %related_to, "some media for a media event has been uploaded"); + } } } } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 0d0e13b1bd..cc9a294d58 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -2045,7 +2045,7 @@ impl Room { /// Creates the inner [`MessageType`] for an already-uploaded media file /// provided by its source. #[allow(clippy::too_many_arguments)] - fn make_attachment_type( + pub(crate) fn make_attachment_type( &self, content_type: &Mime, filename: &str, @@ -2131,7 +2131,7 @@ impl Room { /// Creates the [`RoomMessageEventContent`] based on the message type and /// mentions. - fn make_attachment_event( + pub(crate) fn make_attachment_event( msg_type: MessageType, mentions: Option, ) -> RoomMessageEventContent { diff --git a/crates/matrix-sdk/src/send_queue.rs b/crates/matrix-sdk/src/send_queue.rs index 01c50eddd7..f326760038 100644 --- a/crates/matrix-sdk/src/send_queue.rs +++ b/crates/matrix-sdk/src/send_queue.rs @@ -45,27 +45,38 @@ use std::{ collections::{BTreeMap, BTreeSet, HashMap}, + io::Cursor, + str::FromStr as _, sync::{ atomic::{AtomicBool, Ordering}, Arc, RwLock as SyncRwLock, }, }; +use as_variant::as_variant; use matrix_sdk_base::{ + event_cache_store::EventCacheStoreError, + media::{MediaFormat, MediaRequest, MediaThumbnailSettings, MediaThumbnailSize}, store::{ - ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, QueueWedgeError, - QueuedRequest, QueuedRequestKind, SentRequestKey, SerializableEventContent, + ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, + FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind, + SentRequestKey, SerializableEventContent, }, RoomState, StoreError, }; use matrix_sdk_common::executor::{spawn, JoinHandle}; +use mime::Mime; use ruma::{ + assign, events::{ - reaction::ReactionEventContent, relation::Annotation, AnyMessageLikeEventContent, - EventContent as _, + reaction::ReactionEventContent, + relation::Annotation, + room::{message::MessageType, MediaSource, ThumbnailInfo}, + AnyMessageLikeEventContent, EventContent as _, }, + media::Method, serde::Raw, - OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, + uint, OwnedEventId, OwnedMxcUri, OwnedRoomId, OwnedTransactionId, TransactionId, }; use tokio::sync::{broadcast, Notify, RwLock}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -73,6 +84,7 @@ use tracing::{debug, error, info, instrument, trace, warn}; #[cfg(feature = "e2e-encryption")] use crate::crypto::{OlmError, SessionRecipientCollectionError}; use crate::{ + attachment::AttachmentConfig, client::WeakClient, config::RequestConfig, error::RetryKind, @@ -390,6 +402,225 @@ impl RoomSendQueue { .await } + /// Queues an attachment to be sent to the room, using the send queue. + /// + /// This returns quickly (without sending or uploading anything), and will + /// push the event to be sent into a queue, handled in the background. + /// + /// Callers are expected to consume [`RoomSendQueueUpdate`] via calling + /// the [`Self::subscribe()`] method to get updates about the sending of + /// that event. + /// + /// By default, if sending failed on the first attempt, it will be retried a + /// few times. If sending failed after those retries, the entire + /// client's sending queue will be disabled, and it will need to be + /// manually re-enabled by the caller (e.g. after network is back, or when + /// something has been done about the faulty requests). + pub async fn send_attachment( + &self, + filename: &str, + content_type: Mime, + data: Vec, + mut config: AttachmentConfig, + ) -> Result { + let Some(room) = self.inner.room.get() else { + return Err(RoomSendQueueError::RoomDisappeared); + }; + if room.state() != RoomState::Joined { + return Err(RoomSendQueueError::RoomNotJoined); + } + + let client = room.client(); + let store = client.store(); + + // Push the dependent requests first, to make sure we're not sending the parent + // (depended upon) while dependencies aren't known yet. + + let upload_file_txn = TransactionId::new(); + let send_event_txn = ChildTransactionId::new(); + + // Cache medias. + + // Prepare and cache the file. + let file_source = MediaSource::Plain(OwnedMxcUri::from(format!( + "mxc://send-queue.local/{upload_file_txn}" + ))); + + let file_media_request = + MediaRequest { source: file_source.clone(), format: MediaFormat::File }; + room.client() + .event_cache_store() + .add_media_content(&file_media_request, data.clone()) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + let (event_content, thumbnail_txn) = if let Some(thumbnail) = config.thumbnail.take() { + let info = thumbnail.info.as_ref(); + let height = info.and_then(|info| info.height).unwrap_or_else(|| uint!(0)); + let width = info.and_then(|info| info.width).unwrap_or_else(|| uint!(0)); + + let thumbnail_upload_txn = TransactionId::new(); + let thumbnail_source = MediaSource::Plain(OwnedMxcUri::from(format!( + "mxc://send-queue.local/{thumbnail_upload_txn}" + ))); + + let media_request = MediaRequest { + source: thumbnail_source.clone(), + format: MediaFormat::Thumbnail(MediaThumbnailSettings { + size: MediaThumbnailSize { method: Method::Scale, width, height }, + animated: false, + }), + }; + + room.client() + .event_cache_store() + .add_media_content(&media_request, thumbnail.data.clone()) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + let thumbnail_info = + Box::new(assign!(thumbnail.info.map(ThumbnailInfo::from).unwrap_or_default(), { + mimetype: Some(thumbnail.content_type.as_ref().to_owned()) + })); + + // Save the event sending request as a dependent request on the file upload. + let content = Room::make_attachment_event( + room.make_attachment_type( + &content_type, + filename, + file_source, + config.caption, + config.formatted_caption, + config.info, + Some((thumbnail_source, thumbnail_info)), + ), + config.mentions, + ); + + store + .save_dependent_queued_request( + room.room_id(), + &upload_file_txn, + send_event_txn.clone().into(), + DependentQueuedRequestKind::FinishUpload { + local_echo: content.clone(), + file_upload: upload_file_txn.clone(), + thumbnail_info: Some(FinishUploadThumbnailInfo { + txn: thumbnail_upload_txn.clone(), + height, + width, + }), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + // Save the file upload request as a dependent request of the thumbnail upload. + store + .save_dependent_queued_request( + room.room_id(), + &thumbnail_upload_txn, + upload_file_txn.clone().into(), + DependentQueuedRequestKind::UploadFileWithThumbnail { + content_type: content_type.to_string(), + cache_key: file_media_request, + related_to: send_event_txn.clone().into(), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + // Save the thumbnail upload request. + store + .save_send_queue_request( + room.room_id(), + thumbnail_upload_txn.clone(), + QueuedRequestKind::Upload { + content_type: thumbnail.content_type.to_string(), + cache_key: media_request, + thumbnail_source: None, + related_to: send_event_txn.clone().into(), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + (content, Some(thumbnail_upload_txn)) + } else { + // No thumbnail: only save the file upload request and send the event as a + // dependency. + let content = Room::make_attachment_event( + room.make_attachment_type( + &content_type, + filename, + file_source, + config.caption, + config.formatted_caption, + config.info, + None, + ), + config.mentions, + ); + + store + .save_dependent_queued_request( + room.room_id(), + &upload_file_txn, + send_event_txn.clone().into(), + DependentQueuedRequestKind::FinishUpload { + local_echo: content.clone(), + file_upload: upload_file_txn.clone(), + thumbnail_info: None, + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + store + .save_send_queue_request( + room.room_id(), + upload_file_txn.clone(), + QueuedRequestKind::Upload { + content_type: content_type.to_string(), + cache_key: file_media_request, + thumbnail_source: None, + related_to: send_event_txn.clone().into(), + }, + ) + .await + .map_err(|err| RoomSendQueueError::StorageError(err.into()))?; + + // No thumbnail attachment. + (content, None) + }; + + let send_event_txn = OwnedTransactionId::from(send_event_txn); + trace!(event_txn = %send_event_txn, file_txn = %upload_file_txn, thumbnail_txn = ?thumbnail_txn, "manager sends a media to the background task"); + + self.inner.notifier.notify_one(); + + let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + transaction_id: send_event_txn.clone(), + content: LocalEchoContent::Event { + serialized_event: SerializableEventContent::new(&event_content.into()) + .map_err(RoomSendQueueStorageError::JsonSerialization)?, + // TODO: this should be a `SendAttachmentHandle`! + send_handle: SendHandle { + room: self.clone(), + transaction_id: send_event_txn.clone(), + }, + send_error: None, + }, + })); + + Ok(SendAttachmentHandle { + _room: self.clone(), + _transaction_id: send_event_txn, + _file_upload: upload_file_txn, + _thumbnail_transaction_id: thumbnail_txn, + }) + } + /// Returns the current local requests as well as a receiver to listen to /// the send queue updates, as defined in [`RoomSendQueueUpdate`]. pub async fn subscribe( @@ -454,7 +685,10 @@ impl RoomSendQueue { } }; - trace!(txn_id = %queued_request.transaction_id, "received a request to send!"); + let txn_id = queued_request.transaction_id.clone(); + trace!(txn_id = %txn_id, "received a request to send!"); + + let related_txn_id = as_variant!(&queued_request.kind, QueuedRequestKind::Upload { related_to, .. } => related_to.clone()); let Some(room) = room.get() else { if is_dropping.load(Ordering::SeqCst) { @@ -464,18 +698,21 @@ impl RoomSendQueue { continue; }; - match Self::handle_request(&room, &queued_request).await { - Ok(parent_key) => match queue - .mark_as_sent(&queued_request.transaction_id, parent_key.clone()) - .await - { + match Self::handle_request(&room, queued_request).await { + Ok(parent_key) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await { Ok(()) => match parent_key { SentRequestKey::Event(event_id) => { let _ = updates.send(RoomSendQueueUpdate::SentEvent { - transaction_id: queued_request.transaction_id, + transaction_id: txn_id, event_id, }); } + + SentRequestKey::Media { .. } => { + let _ = updates.send(RoomSendQueueUpdate::UploadedMedia { + related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), + }); + } }, Err(err) => { @@ -504,11 +741,11 @@ impl RoomSendQueue { }; if is_recoverable { - warn!(txn_id = %queued_request.transaction_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue"); + warn!(txn_id = %txn_id, error = ?err, "Recoverable error when sending request: {err}, disabling send queue"); // In this case, we intentionally keep the request in the queue, but mark it // as not being sent anymore. - queue.mark_as_not_being_sent(&queued_request.transaction_id).await; + queue.mark_as_not_being_sent(&txn_id).await; // Let observers know about a failure *after* we've marked the item as not // being sent anymore. Otherwise, there's a possible race where a caller @@ -520,16 +757,11 @@ impl RoomSendQueue { // disconnected, maybe the server had a hiccup). locally_enabled.store(false, Ordering::SeqCst); } else { - warn!(txn_id = %queued_request.transaction_id, error = ?err, "Unrecoverable error when sending request: {err}"); + warn!(txn_id = %txn_id, error = ?err, "Unrecoverable error when sending request: {err}"); // Mark the request as wedged, so it's not picked at any future point. - - if let Err(storage_error) = queue - .mark_as_wedged( - &queued_request.transaction_id, - QueueWedgeError::from(&err), - ) - .await + if let Err(storage_error) = + queue.mark_as_wedged(&txn_id, QueueWedgeError::from(&err)).await { warn!("unable to mark request as wedged: {storage_error}"); } @@ -544,7 +776,7 @@ impl RoomSendQueue { }); let _ = updates.send(RoomSendQueueUpdate::SendError { - transaction_id: queued_request.transaction_id, + transaction_id: related_txn_id.unwrap_or(txn_id), error, is_recoverable, }); @@ -558,9 +790,9 @@ impl RoomSendQueue { /// Handles a single request and returns the [`SentRequestKey`] on success. async fn handle_request( room: &Room, - request: &QueuedRequest, + request: QueuedRequest, ) -> Result { - match &request.kind { + match request.kind { QueuedRequestKind::Event { content } => { let (event, event_type) = content.raw(); @@ -573,6 +805,45 @@ impl RoomSendQueue { trace!(txn_id = %request.transaction_id, event_id = %res.event_id, "event successfully sent"); Ok(SentRequestKey::Event(res.event_id)) } + + QueuedRequestKind::Upload { + content_type, + cache_key, + thumbnail_source, + related_to: relates_to, + } => { + let mime = Mime::from_str(&content_type).map_err(|_| { + crate::Error::SendQueueWedgeError(QueueWedgeError::InvalidMimeType { + mime_type: content_type.clone(), + }) + })?; + + let Some(data) = + room.client().event_cache_store().get_media_content(&cache_key).await? + else { + return Err(crate::Error::SendQueueWedgeError( + QueueWedgeError::MissingMediaContent, + )); + }; + + let media_source = if room.is_encrypted().await? { + let mut cursor = Cursor::new(data); + let encrypted_file = + room.client().upload_encrypted_file(&mime, &mut cursor).await?; + MediaSource::Encrypted(Box::new(encrypted_file)) + } else { + let res = room.client().media().upload(&mime, data).await?; + MediaSource::Plain(res.content_uri) + }; + + let uri = match &media_source { + MediaSource::Plain(uri) => uri, + MediaSource::Encrypted(encrypted_file) => &encrypted_file.url, + }; + trace!(%relates_to, mxc_uri = %uri, "media successfully uploaded"); + + Ok(SentRequestKey::Media { file: media_source, thumbnail: thumbnail_source }) + } } } @@ -633,6 +904,9 @@ impl From<&crate::Error> for QueueWedgeError { } }, + // Flatten errors of `Self` type. + crate::Error::SendQueueWedgeError(error) => error.clone(), + _ => QueueWedgeError::GenericApiError { msg: value.to_string() }, } } @@ -908,8 +1182,8 @@ impl QueueStorage { let store = client.store(); let local_requests = - store.load_send_queue_requests(&self.room_id).await?.into_iter().map(|queued| { - LocalEcho { + store.load_send_queue_requests(&self.room_id).await?.into_iter().filter_map(|queued| { + Some(LocalEcho { transaction_id: queued.transaction_id.clone(), content: match queued.kind { QueuedRequestKind::Event { content } => LocalEchoContent::Event { @@ -920,33 +1194,67 @@ impl QueueStorage { }, send_error: queued.error, }, + + QueuedRequestKind::Upload { .. } => { + // Don't return uploaded medias as their own things; the accompanying + // event represented as a dependent request should be sufficient. + return None; + } }, - } + }) }); - let local_reactions = - store.load_dependent_queued_requests(&self.room_id).await?.into_iter().filter_map( - |dep| match dep.kind { - DependentQueuedRequestKind::EditEvent { .. } - | DependentQueuedRequestKind::RedactEvent => { - // TODO: reflect local edits/redacts too? - None - } - DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho { + let reactions_and_medias = store + .load_dependent_queued_requests(&self.room_id) + .await? + .into_iter() + .filter_map(|dep| match dep.kind { + DependentQueuedRequestKind::EditEvent { .. } + | DependentQueuedRequestKind::RedactEvent => { + // TODO: reflect local edits/redacts too? + None + } + + DependentQueuedRequestKind::ReactEvent { key } => Some(LocalEcho { + transaction_id: dep.own_transaction_id.clone().into(), + content: LocalEchoContent::React { + key, + send_handle: SendReactionHandle { + room: room.clone(), + transaction_id: dep.own_transaction_id, + }, + applies_to: dep.parent_transaction_id, + }, + }), + + DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => { + // Don't reflect these: only the associated event is interesting to observers. + None + } + + DependentQueuedRequestKind::FinishUpload { + local_echo, + file_upload: _, + thumbnail_info: _, + } => { + // Materialize as an event local echo. + Some(LocalEcho { transaction_id: dep.own_transaction_id.clone().into(), - content: LocalEchoContent::React { - key, - send_handle: SendReactionHandle { + content: LocalEchoContent::Event { + serialized_event: SerializableEventContent::new(&local_echo.into()) + .ok()?, + // TODO this should be a `SendAttachmentHandle`! + send_handle: SendHandle { room: room.clone(), - transaction_id: dep.own_transaction_id, + transaction_id: dep.own_transaction_id.into(), }, - applies_to: dep.parent_transaction_id, + send_error: None, }, - }), - }, - ); + }) + } + }); - Ok(local_requests.chain(local_reactions).collect()) + Ok(local_requests.chain(reactions_and_medias).collect()) } /// Try to apply a single dependent request, whether it's local or remote. @@ -1030,7 +1338,7 @@ impl QueueStorage { serializable.into(), ) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; } else { // The parent event is still local; update the local echo. let edited = store @@ -1040,7 +1348,7 @@ impl QueueStorage { new_content.into(), ) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; if !edited { warn!("missing local echo upon dependent edit"); @@ -1080,7 +1388,7 @@ impl QueueStorage { let removed = store .remove_send_queue_request(&self.room_id, &de.parent_transaction_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; if !removed { warn!("missing local echo upon dependent redact"); @@ -1112,12 +1420,173 @@ impl QueueStorage { serializable.into(), ) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; } else { // Not applied yet, we should retry later => false. return Ok(false); } } + + DependentQueuedRequestKind::UploadFileWithThumbnail { + content_type, + cache_key, + related_to, + } => { + let Some(parent_key) = parent_key else { + // Not finished yet. + return Ok(false); + }; + + let Some((file, thumbnail)) = as_variant!(parent_key, SentRequestKey::Media { file, thumbnail } => (file, thumbnail)) + else { + return Err(RoomSendQueueError::StorageError( + RoomSendQueueStorageError::InvalidParentKey, + )); + }; + + // The media we just uploaded was a thumbnail, so the thumbnail shouldn't have + // a thumbnail itself. + debug_assert!(thumbnail.is_none()); + if thumbnail.is_some() { + warn!("unexpected thumbnail for a thumbnail!"); + } + + let request = QueuedRequestKind::Upload { + content_type, + cache_key, + thumbnail_source: Some(file), + related_to, + }; + + store + .save_send_queue_request(&self.room_id, de.own_transaction_id.into(), request) + .await + .map_err(RoomSendQueueStorageError::StateStoreError)?; + } + + DependentQueuedRequestKind::FinishUpload { + mut local_echo, + file_upload, + thumbnail_info, + } => { + let Some(parent_key) = parent_key else { + // Not finished yet. + return Ok(false); + }; + + // Both uploads are ready: enqueue the event with its final data. + let Some((file_source, thumbnail_source)) = as_variant!(parent_key, SentRequestKey::Media { file, thumbnail } => (file, thumbnail)) + else { + return Err(RoomSendQueueError::StorageError( + RoomSendQueueStorageError::InvalidParentKey, + )); + }; + + { + // Update cache keys in the media stores, from the local ones to the remote + // ones. + + // Rename the original file. + let original_file_request = MediaRequest { + source: MediaSource::Plain(OwnedMxcUri::from(format!( + "mxc://send-queue.local/{file_upload}" + ))), + format: MediaFormat::File, + }; + + client + .event_cache_store() + .replace_media_key( + &original_file_request, + &MediaRequest { + source: file_source.clone(), + format: MediaFormat::File, + }, + ) + .await + .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + + // Rename the thumbnail too, if needs be. + if let Some(info) = thumbnail_info.as_ref() { + let original_thumbnail_source = MediaSource::Plain(OwnedMxcUri::from( + format!("mxc://send-queue.local/{}", info.txn), + )); + let format = MediaFormat::Thumbnail(MediaThumbnailSettings { + size: MediaThumbnailSize { + method: Method::Scale, + width: info.width, + height: info.height, + }, + animated: false, + }); + + client + .event_cache_store() + .replace_media_key( + &MediaRequest { + source: original_thumbnail_source, + format: format.clone(), + }, + &MediaRequest { source: file_source.clone(), format }, + ) + .await + .map_err(RoomSendQueueStorageError::EventCacheStoreError)?; + } + } + + // Replace the source by the final ones in all the medias handled by + // `Room::make_attachment_type()`. + // + // Some variants look eerily similar below, but the `event` and `info` are all + // different types… + + match &mut local_echo.msgtype { + MessageType::Audio(event) => { + event.source = file_source; + } + MessageType::File(event) => { + event.source = file_source; + if let Some(info) = event.info.as_mut() { + info.thumbnail_source = thumbnail_source; + } + } + MessageType::Image(event) => { + event.source = file_source; + if let Some(info) = event.info.as_mut() { + info.thumbnail_source = thumbnail_source; + } + } + MessageType::Video(event) => { + event.source = file_source; + if let Some(info) = event.info.as_mut() { + info.thumbnail_source = thumbnail_source; + } + } + + _ => { + // All `MessageType` created by `Room::make_attachment_type` should be + // handled here. The only way to end up here is that a message type has + // been tampered with in the database. + error!("Invalid message type in database: {}", local_echo.msgtype()); + // Only crash debug builds. + debug_assert!(false, "invalid message type in database"); + } + } + + let request = SerializableEventContent::new(&local_echo.into()) + .map_err(RoomSendQueueStorageError::JsonSerialization)?; + + // TODO: probably want to emit a room update here, of type "edit". + + store + .save_send_queue_request( + &self.room_id, + de.own_transaction_id.into(), + request.into(), + ) + .await + .map_err(RoomSendQueueStorageError::StateStoreError)?; + } } Ok(true) @@ -1134,7 +1603,7 @@ impl QueueStorage { let dependent_requests = store .load_dependent_queued_requests(&self.room_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; let num_initial_dependent_requests = dependent_requests.len(); if num_initial_dependent_requests == 0 { @@ -1153,7 +1622,7 @@ impl QueueStorage { store .remove_dependent_queued_request(&self.room_id, &original.own_transaction_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; } } @@ -1174,7 +1643,7 @@ impl QueueStorage { store .remove_dependent_queued_request(&self.room_id, &dependent_id) .await - .map_err(RoomSendQueueStorageError::StorageError)?; + .map_err(RoomSendQueueStorageError::StateStoreError)?; num_dependent_requests -= 1; } @@ -1303,6 +1772,12 @@ pub enum RoomSendQueueUpdate { /// Received event id from the send response. event_id: OwnedEventId, }, + + /// A media has been successfully uploaded. + UploadedMedia { + /// The media event this uploaded media relates to. + related_to: OwnedTransactionId, + }, } /// An error triggered by the send queue module. @@ -1328,7 +1803,11 @@ pub enum RoomSendQueueError { pub enum RoomSendQueueStorageError { /// Error caused by the state store. #[error(transparent)] - StorageError(#[from] StoreError), + StateStoreError(#[from] StoreError), + + /// Error caused by the event cache store. + #[error(transparent)] + EventCacheStoreError(#[from] EventCacheStoreError), /// Error caused when (de)serializing into/from json. #[error(transparent)] @@ -1510,16 +1989,34 @@ impl SendReactionHandle { } } +/// A handle to execute actions while sending an attachment. +/// +/// In the future, this may support cancellation, subscribing to progress, etc. +#[derive(Clone, Debug)] +pub struct SendAttachmentHandle { + /// Reference to the send queue for the room where this attachment was sent. + _room: RoomSendQueue, + + /// Transaction id for the sending of the event itself. + _transaction_id: OwnedTransactionId, + + /// Transaction id for the file upload. + _file_upload: OwnedTransactionId, + + /// Transaction id for the thumbnail upload. + _thumbnail_transaction_id: Option, +} + /// From a given source of [`DependentQueuedRequest`], return only the most /// meaningful, i.e. the ones that wouldn't be overridden after applying the /// others. fn canonicalize_dependent_requests( dependent: &[DependentQueuedRequest], ) -> Vec { - let mut by_event_id = HashMap::>::new(); + let mut by_txn = HashMap::>::new(); for d in dependent { - let prevs = by_event_id.entry(d.parent_transaction_id.clone()).or_default(); + let prevs = by_txn.entry(d.parent_transaction_id.clone()).or_default(); if prevs.iter().any(|prev| matches!(prev.kind, DependentQueuedRequestKind::RedactEvent)) { // The parent event has already been flagged for redaction, don't consider the @@ -1540,7 +2037,10 @@ fn canonicalize_dependent_requests( } } - DependentQueuedRequestKind::ReactEvent { .. } => { + DependentQueuedRequestKind::UploadFileWithThumbnail { .. } + | DependentQueuedRequestKind::FinishUpload { .. } + | DependentQueuedRequestKind::ReactEvent { .. } => { + // These requests can't be canonicalized, push them as is. prevs.push(d); } @@ -1552,10 +2052,7 @@ fn canonicalize_dependent_requests( } } - by_event_id - .into_iter() - .flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()) - .collect() + by_txn.into_iter().flat_map(|(_parent_txn_id, entries)| entries.into_iter().cloned()).collect() } #[cfg(all(test, not(target_arch = "wasm32")))]