refactor - async column fetching
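This commit replaces the sequential per-dataset column fetch in src/main.rs with a concurrent one driven by futures::stream::FuturesUnordered. Below is a minimal sketch of that fan-out pattern, assuming a tokio runtime; the dataset names and the fetch_columns stand-in are illustrative, not the project's API.

use futures::stream::{FuturesUnordered, StreamExt};

// Hypothetical stand-in for an async API call like list_all_columns.
async fn fetch_columns(dataset: String) -> (String, usize) {
    let n = dataset.len(); // pretend this awaited a network request
    (dataset, n)
}

#[tokio::main]
async fn main() {
    let datasets = vec!["api".to_string(), "web".to_string(), "jobs".to_string()];

    // Fan out: one future per dataset, all polled concurrently.
    let mut tasks = FuturesUnordered::new();
    for d in datasets {
        tasks.push(fetch_columns(d));
    }

    // Drain in completion order, not submission order.
    while let Some((dataset, count)) = tasks.next().await {
        println!("{dataset}: {count} columns");
    }
}

Because each pushed future owns its own clone of the client and handles its own error (logging it and yielding an empty column list), one failing dataset no longer aborts the whole run via ?.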
jerbly committed Jan 13, 2024
1 parent a6705cd commit be142b5
Showing 3 changed files with 159 additions and 62 deletions.
71 changes: 61 additions & 10 deletions Cargo.lock

(Generated file; diff not rendered.)

1 change: 1 addition & 0 deletions Cargo.toml
@@ -14,6 +14,7 @@ axum = "0.7.2"
chrono = { version = "0.4.31", features = ["serde"] }
clap = { version = "4.4.12", features = ["derive"] }
dotenv = "0.15.0"
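# FuturesUnordered / StreamExt for the concurrent column fetch in src/main.rs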
futures = "0.3.30"
glob = "0.3.1"
openssl = { version = "0.10.61", features = ["vendored"] }
reqwest = { version = "0.11.23", features = ["json"] }
149 changes: 97 additions & 52 deletions src/main.rs
Expand Up @@ -17,8 +17,9 @@ use axum::{
use chrono::Utc;
use clap::Parser;
use data::Node;
use futures::stream::{FuturesUnordered, StreamExt};
use honeycomb::HoneyComb;
use semconv::{Attribute, Examples};
use semconv::{Attribute, Examples, SemanticConventions};

#[derive(Template)]
#[template(path = "index.html")]
@@ -95,63 +96,15 @@ async fn main() -> anyhow::Result<()> {
));
}

let sc = semconv::SemanticConventions::new(&root_dirs)?;
let sc = SemanticConventions::new(&root_dirs)?;
let mut root = Node::new("root".to_string(), None);
let mut keys: Vec<_> = sc.attribute_map.keys().collect();
keys.sort();

// fetch all the honeycomb data and build a map of attribute name to datasets
let hc = match HoneyComb::new() {
Ok(hc) => {
let auth = hc.list_authorizations().await?;
let required_access = ["columns", "createDatasets", "queries"];
if auth.has_required_access(&required_access) {
Some(hc)
} else {
eprintln!(
"continuing without honeycomb: missing required access {:?}:\n{}",
required_access, auth
);
None
}
}
Err(e) => {
eprintln!("continuing without honeycomb: {}", e);
None
}
};

let hc = get_honeycomb().await?;
if let Some(hc) = &hc {
let now = Utc::now();
let mut datasets = hc
.list_all_datasets()
.await?
.iter()
.filter_map(|d| {
if (now - d.last_written_at.unwrap_or(now)).num_days() < 60 {
Some(d.slug.clone())
} else {
None
}
})
.collect::<Vec<_>>();
datasets.sort();

let mut attributes_used_by_datasets: HashMap<String, Vec<String>> = HashMap::new();

for dataset in datasets {
println!("fetching columns for dataset: {}", dataset);
let columns = hc.list_all_columns(&dataset).await?;
for column in columns {
if sc.attribute_map.contains_key(&column.key_name) {
let datasets = attributes_used_by_datasets
.entry(column.key_name.clone())
.or_insert(vec![]);
datasets.push(dataset.clone());
}
}
}

let attributes_used_by_datasets = get_attributes_used_by_datasets(hc, &sc).await?;
for k in keys {
let mut attribute = sc.attribute_map[k].clone();
if let Some(datasets) = attributes_used_by_datasets.get(k) {
@@ -183,6 +136,98 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

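/// Fetch columns for every recently written dataset concurrently and map each
/// known semantic-convention attribute name to the datasets that use it.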
async fn get_attributes_used_by_datasets(
hc: &HoneyComb,
sc: &SemanticConventions,
) -> anyhow::Result<HashMap<String, Vec<String>>> {
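// Only consider datasets written to within the last 60 days.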
let now = Utc::now();
let mut datasets = hc
.list_all_datasets()
.await?
.iter()
.filter_map(|d| {
if (now - d.last_written_at.unwrap_or(now)).num_days() < 60 {
Some(d.slug.clone())
} else {
None
}
})
.collect::<Vec<_>>();
datasets.sort();

let mut attributes_used_by_datasets: HashMap<String, Vec<String>> = HashMap::new();

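// Fan out one column fetch per dataset; FuturesUnordered polls them all concurrently.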
let mut tasks = FuturesUnordered::new();

for dataset in datasets {
let dataset_clone = dataset.clone();
let hc_clone = hc.clone();
tasks.push(async move {
let columns = hc_clone.list_all_columns(&dataset_clone).await;
match columns {
Ok(columns) => (dataset_clone, columns),
Err(e) => {
eprintln!(
"error fetching columns for dataset {}: {}",
dataset_clone, e
);
(dataset_clone, vec![])
}
}
});
}

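// Drain results as each fetch completes; arrival order is not deterministic.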
while let Some((dataset, columns)) = tasks.next().await {
println!("mapping columns for dataset: {}", dataset);
for column in columns {
if sc.attribute_map.contains_key(&column.key_name) {
let datasets = attributes_used_by_datasets
.entry(column.key_name.clone())
.or_insert(vec![]);
datasets.push(dataset.clone());
}
}
}

// for dataset in datasets {
// println!("fetching columns for dataset: {}", dataset);
// let columns = hc.list_all_columns(&dataset).await?;
// for column in columns {
// if sc.attribute_map.contains_key(&column.key_name) {
// let datasets = attributes_used_by_datasets
// .entry(column.key_name.clone())
// .or_insert(vec![]);
// datasets.push(dataset.clone());
// }
// }
// }

Ok(attributes_used_by_datasets)
}

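/// Build a Honeycomb client, returning None (with the reason logged) when the
/// client cannot be constructed or the API key lacks the required access.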
async fn get_honeycomb() -> anyhow::Result<Option<HoneyComb>> {
let hc = match HoneyComb::new() {
Ok(hc) => {
let auth = hc.list_authorizations().await?;
let required_access = ["columns", "createDatasets", "queries"];
if auth.has_required_access(&required_access) {
Some(hc)
} else {
eprintln!(
"continuing without honeycomb: missing required access {:?}:\n{}",
required_access, auth
);
None
}
}
Err(e) => {
eprintln!("continuing without honeycomb: {}", e);
None
}
};
Ok(hc)
}

async fn handler() -> impl IntoResponse {
IndexTemplate {
node: "root".to_owned(),
