diff --git a/Cargo.lock b/Cargo.lock index be5a29a..ffb597c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -461,43 +461,93 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.29" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" + +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] [[package]] name = "futures-sink" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-util" -version = "0.3.29" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -579,6 +629,7 @@ dependencies = [ "chrono", "clap", "dotenv", + "futures", "glob", "openssl", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index 45c6b82..3f8814e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ axum = "0.7.2" chrono = { version = "0.4.31", features = ["serde"] } clap = { version = "4.4.12", features = ["derive"] } dotenv = "0.15.0" +futures = "0.3.30" glob = "0.3.1" openssl = { version = "0.10.61", features = ["vendored"] } reqwest = { version = "0.11.23", features = ["json"] } diff --git a/src/main.rs b/src/main.rs index 612953c..588e3b2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,8 +17,9 @@ use axum::{ use chrono::Utc; use clap::Parser; use data::Node; +use futures::stream::{FuturesUnordered, StreamExt}; use honeycomb::HoneyComb; -use semconv::{Attribute, Examples}; +use semconv::{Attribute, Examples, SemanticConventions}; #[derive(Template)] #[template(path = "index.html")] @@ -95,63 +96,15 @@ async fn main() -> anyhow::Result<()> { )); } - let sc = semconv::SemanticConventions::new(&root_dirs)?; + let sc = SemanticConventions::new(&root_dirs)?; let mut root = Node::new("root".to_string(), None); let mut keys: Vec<_> = sc.attribute_map.keys().collect(); keys.sort(); // fetch all the honeycomb data and build a map of attribute name to datasets - let hc = match HoneyComb::new() { - Ok(hc) => { - let auth = hc.list_authorizations().await?; - let required_access = ["columns", "createDatasets", "queries"]; - if auth.has_required_access(&required_access) { - Some(hc) - } else { - eprintln!( - "continuing without honeycomb: missing required access {:?}:\n{}", - required_access, auth - ); - None - } - } - Err(e) => { - eprintln!("continuing without honeycomb: {}", e); - None - } - }; - + let hc = get_honeycomb().await?; if let Some(hc) = &hc { - let now = Utc::now(); - let mut datasets = hc - .list_all_datasets() - .await? - .iter() - .filter_map(|d| { - if (now - d.last_written_at.unwrap_or(now)).num_days() < 60 { - Some(d.slug.clone()) - } else { - None - } - }) - .collect::>(); - datasets.sort(); - - let mut attributes_used_by_datasets: HashMap> = HashMap::new(); - - for dataset in datasets { - println!("fetching columns for dataset: {}", dataset); - let columns = hc.list_all_columns(&dataset).await?; - for column in columns { - if sc.attribute_map.contains_key(&column.key_name) { - let datasets = attributes_used_by_datasets - .entry(column.key_name.clone()) - .or_insert(vec![]); - datasets.push(dataset.clone()); - } - } - } - + let attributes_used_by_datasets = get_attributes_used_by_datasets(hc, &sc).await?; for k in keys { let mut attribute = sc.attribute_map[k].clone(); if let Some(datasets) = attributes_used_by_datasets.get(k) { @@ -183,6 +136,98 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +async fn get_attributes_used_by_datasets( + hc: &HoneyComb, + sc: &SemanticConventions, +) -> anyhow::Result>> { + let now = Utc::now(); + let mut datasets = hc + .list_all_datasets() + .await? + .iter() + .filter_map(|d| { + if (now - d.last_written_at.unwrap_or(now)).num_days() < 60 { + Some(d.slug.clone()) + } else { + None + } + }) + .collect::>(); + datasets.sort(); + + let mut attributes_used_by_datasets: HashMap> = HashMap::new(); + + let mut tasks = FuturesUnordered::new(); + + for dataset in datasets { + let dataset_clone = dataset.clone(); + let hc_clone = hc.clone(); + tasks.push(async move { + let columns = hc_clone.list_all_columns(&dataset_clone).await; + match columns { + Ok(columns) => (dataset_clone, columns), + Err(e) => { + eprintln!( + "error fetching columns for dataset {}: {}", + dataset_clone, e + ); + (dataset_clone, vec![]) + } + } + }); + } + + while let Some((dataset, columns)) = tasks.next().await { + println!("mapping columns for dataset: {}", dataset); + for column in columns { + if sc.attribute_map.contains_key(&column.key_name) { + let datasets = attributes_used_by_datasets + .entry(column.key_name.clone()) + .or_insert(vec![]); + datasets.push(dataset.clone()); + } + } + } + + // for dataset in datasets { + // println!("fetching columns for dataset: {}", dataset); + // let columns = hc.list_all_columns(&dataset).await?; + // for column in columns { + // if sc.attribute_map.contains_key(&column.key_name) { + // let datasets = attributes_used_by_datasets + // .entry(column.key_name.clone()) + // .or_insert(vec![]); + // datasets.push(dataset.clone()); + // } + // } + // } + + Ok(attributes_used_by_datasets) +} + +async fn get_honeycomb() -> anyhow::Result> { + let hc = match HoneyComb::new() { + Ok(hc) => { + let auth = hc.list_authorizations().await?; + let required_access = ["columns", "createDatasets", "queries"]; + if auth.has_required_access(&required_access) { + Some(hc) + } else { + eprintln!( + "continuing without honeycomb: missing required access {:?}:\n{}", + required_access, auth + ); + None + } + } + Err(e) => { + eprintln!("continuing without honeycomb: {}", e); + None + } + }; + Ok(hc) +} + async fn handler() -> impl IntoResponse { IndexTemplate { node: "root".to_owned(),