Skip to content

Commit

Permalink
Refactor main (#16)
Browse files Browse the repository at this point in the history
* fix: update egui and eframe to 0.27.2

* fix: update opendal to 0.47.1
  • Loading branch information
fireyy authored Jul 1, 2024
1 parent f77bc87 commit 09a639b
Show file tree
Hide file tree
Showing 20 changed files with 1,993 additions and 662 deletions.
1,998 changes: 1,496 additions & 502 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ cc_ui = { path = "crates/cc_ui" }
cc_storage = { path = "crates/cc_storage" }
cc_runtime = { path = "crates/cc_runtime" }
cc_files = { path = "crates/cc_files" }
eframe = { version = "0.23.0", features = ["persistence"] }
egui = "0.23.0"
egui_extras = { version = "0.23.0", features = ["all_loaders"] }
eframe = { version = "0.27.2", features = ["persistence"] }
egui = "0.27.2"
egui_extras = { version = "0.27.2", features = ["all_loaders"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1.0"
tracing = "0.1.29"
Expand Down
2 changes: 1 addition & 1 deletion crates/cc_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ anyhow = "1.0"
tracing = { workspace = true }
bytesize = "1.1.0"
chrono = "0.4.24"
opendal = { version = "0.44.2", default-features = false, features = [
opendal = { version = "0.47.1", default-features = false, features = [
"services-azblob",
"services-gcs",
"services-oss",
Expand Down
185 changes: 80 additions & 105 deletions crates/cc_storage/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
use std::io::Write;
use std::path::PathBuf;
use std::sync::Arc;

use crate::config::ClientConfig;
use crate::partial_file::PartialFile;
use crate::transfer::{TransferProgressInfo, TransferSender, TransferType};
use crate::types::{Bucket, ListObjects, ListObjectsV2Params, Object, Params};
use crate::util::get_name;
use crate::Result;
use anyhow::Context;
use cc_core::ServiceType;
use futures::StreamExt;

use crate::services;
use opendal::{Metadata, Metakey, Operator};
use tokio::{
fs,
io::{self, AsyncWriteExt as _},
use crate::stream::{
AsyncReadProgressExt, BoxedStreamingUploader, StreamingUploader, TrackableBodyStream,
};

const DEFAULT_BUF_SIZE: usize = 8 * 1024 * 1024;
use futures::{AsyncReadExt, StreamExt, TryStreamExt};
use opendal::{Metadata, Metakey, Operator};

#[derive(Clone)]
pub struct Client {
Expand Down Expand Up @@ -102,9 +102,9 @@ impl Client {
pub async fn delete_object(&self, object: impl AsRef<str>) -> Result<bool> {
let object = object.as_ref();
self.operator.delete(object).await?;
// let result = self.operator.is_exist(object).await?;
let result = self.operator.is_exist(object).await?;

Ok(true)
Ok(result)
}

pub async fn delete_multi_object(self, obj: Vec<Object>) -> Result<bool> {
Expand All @@ -122,7 +122,6 @@ impl Client {

pub async fn list_v2(&self, query: ListObjectsV2Params) -> Result<ListObjects> {
tracing::debug!("List object: {:?}", query);
// let path = query.prefix.map_or("".into(), |x| format!("{x}/"));
let mut path = query.prefix;
if !path.is_empty() && !path.ends_with('/') {
path.push('/');
Expand Down Expand Up @@ -199,66 +198,92 @@ impl Client {
Ok((src.to_string(), is_move))
}

fn streaming_upload(&self, path: &str) -> Result<BoxedStreamingUploader> {
Ok(Box::new(StreamingUploader::new(
self.operator.clone(),
path.to_string(),
)))
}

async fn streaming_read(&self, path: &str, transfer: TransferSender) -> Result<Vec<u8>> {
let reader = self.operator.reader(path).await?;

let size = self.meta_data(path).await?.content_length();
let mut body = Vec::new();

let mut stream = reader
.into_futures_async_read(0..)
.await?
.report_progress(|bytes_read| {
transfer
.send(TransferType::Download(
path.to_string(),
TransferProgressInfo {
total_bytes: size,
transferred_bytes: bytes_read as u64,
},
))
.unwrap();
});

stream
.read_to_end(&mut body)
.await
.context("failed to read object content into buffer")?;

Ok(body)
}

pub async fn download_file(
&self,
obj: &str,
target: PathBuf,
transfer: TransferSender,
) -> Result<()> {
let remote_op = self.operator.clone();
let progress_tx = transfer.clone();
let oid = obj.to_string();
let total_bytes = self.meta_data(obj).await?.content_length();

tokio::spawn(async move {
let _: Result<Option<String>> = async {
fs::create_dir_all(target.parent().unwrap()).await?;
let mut reader = remote_op.reader_with(&oid).buffer(DEFAULT_BUF_SIZE).await?;
let mut writer = io::BufWriter::new(fs::File::create(&target).await?);
copy_with_progress(
"download",
&progress_tx,
&oid,
total_bytes,
&mut reader,
&mut writer,
)
.await?;
writer.shutdown().await?;
Ok(Some(target.to_string_lossy().into()))
}
.await;
});
let mut new_file = PartialFile::create(&target)
.with_context(|| format!("create `{}`", target.display()))?;

let content = self.streaming_read(obj, transfer).await?;

new_file
.write_all(&content)
.context("write content of file")?;
new_file.finish().context("finish writing to new file")?;

Ok(())
}

pub async fn put(&self, path: PathBuf, dest: &str, transfer: &TransferSender) -> Result<()> {
let name = get_name(&path);
let key = format!("{dest}{name}");
let remote_op = self.operator.clone();

let mut body = TrackableBodyStream::try_from(path)
.map_err(|e| {
panic!("Could not open sample file: {e}");
})
.unwrap();
let progress_tx = transfer.clone();
let total_bytes = fs::metadata(&path).await?.len();

tokio::spawn(async move {
let _: Result<Option<String>> = async {
let mut reader = io::BufReader::new(fs::File::open(path).await?);
let mut writer = remote_op.writer_with(&key).buffer(DEFAULT_BUF_SIZE).await?;
copy_with_progress(
"upload",
&progress_tx,
&key,
total_bytes,
&mut reader,
&mut writer,
)
.await?;
writer.close().await?;
Ok(None)
}
.await;
});

body.set_callback(
&key,
move |key: &str, tot_size: u64, sent: u64, _cur_buf: u64| {
progress_tx
.send(TransferType::Upload(
key.to_string(),
TransferProgressInfo {
total_bytes: tot_size,
transferred_bytes: sent,
},
))
.unwrap();
},
);

let mut uploader = self.streaming_upload(&key)?;
while let Ok(Some(bytes)) = body.try_next().await {
uploader.write_bytes(bytes).await?;
}
uploader.finish().await?;
// TODO: check if put success

Ok(())
Expand Down Expand Up @@ -331,53 +356,3 @@ impl ClientBuilder {
Client::new(self.config)
}
}

async fn copy_with_progress<R, W>(
tp: &str,
progress_sender: &TransferSender,
key: &str,
total_bytes: u64,
mut reader: R,
mut writer: W,
) -> io::Result<usize>
where
R: io::AsyncReadExt + Unpin,
W: io::AsyncWriteExt + Unpin,
{
let mut bytes_so_far: usize = 0;
let mut buf = vec![0; DEFAULT_BUF_SIZE];

loop {
let bytes_since_last = reader.read(&mut buf).await?;
if bytes_since_last == 0 {
break;
}
writer.write_all(&buf[..bytes_since_last]).await?;
bytes_so_far += bytes_since_last;
let msg = if tp == "download" {
TransferType::Download(
key.to_string(),
TransferProgressInfo {
total_bytes: total_bytes as usize,
transferred_bytes: bytes_so_far,
},
)
} else {
TransferType::Upload(
key.to_string(),
TransferProgressInfo {
total_bytes: total_bytes as usize,
transferred_bytes: bytes_so_far,
},
)
};
send_response(progress_sender, msg).await;
}

Ok(bytes_so_far)
}

async fn send_response(sender: &TransferSender, msg: TransferType) {
// tracing::debug!("response: {}", &msg);
sender.send(msg).unwrap();
}
2 changes: 2 additions & 0 deletions crates/cc_storage/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,5 @@ pub enum OSSError {
#[error("{0}")]
WithDescription(String),
}

pub type ObjectResult<T> = std::result::Result<T, anyhow::Error>;
4 changes: 2 additions & 2 deletions crates/cc_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ pub type Result<T> = anyhow::Result<T>;
mod client;
mod config;
mod error;
// mod partial_file;
mod partial_file;
mod services;
// mod stream;
mod stream;
mod transfer;
mod types;
pub mod util;
Expand Down
Loading

0 comments on commit 09a639b

Please sign in to comment.