Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve sync command performance #3766

Merged
merged 10 commits into from
May 27, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ use futures_intrusive::sync::SharedSemaphore;
const MAX_SIMULTANEOUS_LOADED_MB: usize = 50;

// How many simultaneous chunks being created at once
const MAX_SIMULTANEOUS_CREATE_CHUNK: usize = 12;
const MAX_SIMULTANEOUS_CREATE_CHUNK: usize = 50;

// How many simultaneous Agent.call() to create_chunk
const MAX_SIMULTANEOUS_CREATE_CHUNK_CALLS: usize = 4;
const MAX_SIMULTANEOUS_CREATE_CHUNK_CALLS: usize = 25;

// How many simultaneous Agent.wait() on create_chunk result
const MAX_SIMULTANEOUS_CREATE_CHUNK_WAITS: usize = 4;
const MAX_SIMULTANEOUS_CREATE_CHUNK_WAITS: usize = 25;

pub(crate) struct Semaphores {
// The "file" semaphore limits how much file data to load at once. A given loaded file's data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,36 @@ use crate::canister_api::{
};
use crate::error::GetAssetPropertiesError;
use crate::error::GetAssetPropertiesError::GetAssetPropertiesFailed;
use futures_intrusive::sync::SharedSemaphore;
use ic_agent::{agent::RejectResponse, AgentError};
use ic_utils::call::SyncCall;
use ic_utils::Canister;
use std::collections::HashMap;

const MAX_CONCURRENT_REQUESTS: usize = 20;

pub(crate) async fn get_assets_properties(
canister: &Canister<'_>,
canister_assets: &HashMap<String, AssetDetails>,
) -> Result<HashMap<String, AssetProperties>, GetAssetPropertiesError> {
let semaphore = SharedSemaphore::new(true, MAX_CONCURRENT_REQUESTS);

let asset_ids = canister_assets.keys().cloned().collect::<Vec<_>>();
let futs = asset_ids
.iter()
.map(|asset_id| async {
semaphore.acquire(1).await;
get_asset_properties(canister, asset_id).await
})
.collect::<Vec<_>>();

let results = futures::future::join_all(futs).await;

let mut all_assets_properties = HashMap::new();
for asset_id in canister_assets.keys() {
match get_asset_properties(canister, asset_id).await {
for (index, result) in results.into_iter().enumerate() {
match result {
Ok(asset_properties) => {
all_assets_properties.insert(asset_id.to_string(), asset_properties);
all_assets_properties.insert(asset_ids[index].to_string(), asset_properties);
}
// older canisters don't have get_assets_properties method
// therefore we can break the loop
Expand All @@ -29,7 +45,7 @@ pub(crate) async fn get_assets_properties(
break;
}
Err(e) => {
return Err(GetAssetPropertiesFailed(asset_id.clone(), e));
return Err(GetAssetPropertiesFailed(asset_ids[index].clone(), e));
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/canisters/frontend/ic-asset/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,15 @@ pub async fn upload_content_and_assemble_sync_operations(
logger,
"Fetching properties for all assets in the canister."
);
let now = std::time::Instant::now();
let canister_asset_properties = get_assets_properties(canister, &canister_assets).await?;

info!(
logger,
"Done fetching properties for all assets in the canister. Took {:?}",
now.elapsed()
);

info!(logger, "Starting batch.");

let batch_id = create_batch(canister).await.map_err(CreateBatchFailed)?;
Expand Down
29 changes: 27 additions & 2 deletions src/canisters/frontend/icx-asset/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use crate::commands::upload::upload;
use anstyle::{AnsiColor, Style};
use candid::Principal;
use clap::builder::Styles;
use clap::{crate_authors, crate_version, Parser};
use clap::{crate_authors, crate_version, Parser, ValueEnum};
use ic_agent::identity::{AnonymousIdentity, BasicIdentity, Secp256k1Identity};
use ic_agent::{agent, Agent, Identity};
use slog::Level;
use std::path::PathBuf;

const DEFAULT_IC_GATEWAY: &str = "https://icp0.io";
Expand Down Expand Up @@ -37,6 +38,30 @@ struct Opts {

#[command(subcommand)]
subcommand: SubCommand,

#[arg(long, value_enum, default_value = "info")]
log_level: LogLevel,
}

#[derive(ValueEnum, Clone, Debug)]
enum LogLevel {
Trace,
Debug,
Info,
Warning,
Error,
}

impl From<LogLevel> for Level {
fn from(log_level: LogLevel) -> Self {
match log_level {
LogLevel::Trace => Level::Trace,
LogLevel::Debug => Level::Debug,
LogLevel::Info => Level::Info,
LogLevel::Warning => Level::Warning,
LogLevel::Error => Level::Error,
}
}
}

#[derive(Parser)]
Expand Down Expand Up @@ -107,7 +132,7 @@ fn style() -> Styles {
async fn main() -> anyhow::Result<()> {
let opts: Opts = Opts::parse();

let logger = support::new_logger();
let logger = support::new_logger(opts.log_level.into());

let agent = Agent::builder()
.with_transport(agent::http_transport::ReqwestTransport::create(
Expand Down
5 changes: 3 additions & 2 deletions src/canisters/frontend/icx-asset/src/support.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use slog::{Drain, Logger};
use slog::{Drain, Level, Logger};

pub struct TermLogFormat<D>
where
Expand Down Expand Up @@ -42,9 +42,10 @@ impl<D: slog_term::Decorator> slog::Drain for TermLogFormat<D> {
}
}

pub(crate) fn new_logger() -> Logger {
pub(crate) fn new_logger(level: Level) -> Logger {
let decorator = slog_term::TermDecorator::new().build();
let drain = TermLogFormat::new(decorator).fuse();
let drain = slog::LevelFilter::new(drain, level).fuse();
let drain = slog_async::Async::new(drain).build().fuse();
Logger::root(drain, slog::o!())
}
Loading