From badb3a075df9ddc4b86a08050507c3c4e08cc5b2 Mon Sep 17 00:00:00 2001 From: Mike Hilgendorf Date: Wed, 15 Jan 2025 13:23:21 -0600 Subject: [PATCH] fix: checkin improvements - conversion impls in tangram_client - defer writing graph objects to output stage of checkin (#436) - write all objects in checkin in single message to SQLite (#437) --- packages/client/src/graph/data.rs | 2 +- packages/client/src/object/data.rs | 10 + packages/server/src/artifact/checkin.rs | 2 +- .../server/src/artifact/checkin/lockfile.rs | 29 +- .../server/src/artifact/checkin/object.rs | 59 ++-- .../server/src/artifact/checkin/output.rs | 290 +++++++++++++----- 6 files changed, 263 insertions(+), 129 deletions(-) diff --git a/packages/client/src/graph/data.rs b/packages/client/src/graph/data.rs index 7198941a6..f50563dbe 100644 --- a/packages/client/src/graph/data.rs +++ b/packages/client/src/graph/data.rs @@ -11,7 +11,7 @@ pub struct Graph { pub nodes: Vec, } -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[derive(Clone, Debug, serde::Deserialize, serde::Serialize, derive_more::TryUnwrap)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum Node { Directory(Directory), diff --git a/packages/client/src/object/data.rs b/packages/client/src/object/data.rs index d17f99fc4..a0e95be05 100644 --- a/packages/client/src/object/data.rs +++ b/packages/client/src/object/data.rs @@ -65,3 +65,13 @@ impl Object { } } } + +impl From for Object { + fn from(value: tg::artifact::Data) -> Self { + match value { + tg::artifact::data::Artifact::Directory(data) => Object::Directory(data), + tg::artifact::data::Artifact::File(data) => Object::File(data), + tg::artifact::data::Artifact::Symlink(data) => Object::Symlink(data), + } + } +} diff --git a/packages/server/src/artifact/checkin.rs b/packages/server/src/artifact/checkin.rs index 5c5dd3bda..2a50c2a4a 100644 --- a/packages/server/src/artifact/checkin.rs +++ b/packages/server/src/artifact/checkin.rs @@ -120,7 +120,7 @@ impl Server { .map_err(|source| tg::error!(!source, "failed to write objects"))?; // Write the output to the database. - self.write_output_to_database(&output_graph) + self.write_output_to_database(output_graph.clone(), object_graph.clone()) .await .map_err(|source| tg::error!(!source, "failed to write to the database"))?; diff --git a/packages/server/src/artifact/checkin/lockfile.rs b/packages/server/src/artifact/checkin/lockfile.rs index ee40bf97c..1fb8745e5 100644 --- a/packages/server/src/artifact/checkin/lockfile.rs +++ b/packages/server/src/artifact/checkin/lockfile.rs @@ -116,6 +116,7 @@ impl Server { data.clone() } else { let id: tg::artifact::Id = id.try_into().unwrap(); + tg::Artifact::with_id(id.clone()).data(self).await.map_err( |source| tg::error!(!source, %artifact = id, "missing artifact in object graph"), )? @@ -184,12 +185,13 @@ impl Server { ) -> tg::Result { let id = tg::file::Id::new(&data.serialize()?); let (contents, executable) = match data { - tg::file::Data::Graph { graph, node } => { - let file = tg::Graph::with_id(graph.clone()).object(self).await?.nodes[*node] - .clone() - .try_unwrap_file() - .unwrap(); - let contents = file.contents.id(self).await?; + tg::file::Data::Graph { + graph: graph_id, + node, + } => { + let graph = &graph.graphs.get(graph_id).unwrap().0; + let file = graph.nodes[*node].clone().try_unwrap_file().unwrap(); + let contents = file.contents; let executable = file.executable; (contents, executable) }, @@ -234,17 +236,18 @@ impl Server { ) -> tg::Result { let id = tg::symlink::Id::new(&data.serialize()?); let subpath = match data { - tg::symlink::Data::Graph { graph, node } => { - let symlink = tg::Graph::with_id(graph.clone()).object(self).await?.nodes[*node] - .clone() - .try_unwrap_symlink() - .unwrap(); + tg::symlink::Data::Graph { + graph: graph_id, + node, + } => { + let graph = &graph.graphs.get(graph_id).unwrap().0; + let symlink = graph.nodes[*node].clone().try_unwrap_symlink().unwrap(); match symlink { - tg::graph::object::Symlink::Artifact { + tg::graph::data::Symlink::Artifact { artifact: _, subpath, } => subpath, - tg::graph::object::Symlink::Target { target } => Some(target), + tg::graph::data::Symlink::Target { target } => Some(target), } }, tg::symlink::Data::Target { target } => Some(PathBuf::from(target)), diff --git a/packages/server/src/artifact/checkin/object.rs b/packages/server/src/artifact/checkin/object.rs index cc45b454c..0b98a4d4d 100644 --- a/packages/server/src/artifact/checkin/object.rs +++ b/packages/server/src/artifact/checkin/object.rs @@ -5,12 +5,14 @@ use std::{ collections::{BTreeMap, BTreeSet}, os::unix::fs::PermissionsExt as _, path::{Path, PathBuf}, + sync::Arc, }; use tangram_client::{self as tg, handle::Ext}; use tangram_either::Either; #[derive(Debug)] pub struct Graph { + pub graphs: BTreeMap, pub indices: BTreeMap, pub nodes: Vec, pub paths: BTreeMap, @@ -50,7 +52,7 @@ impl Server { input: &input::Graph, unify: &unify::Graph, root: &unify::Id, - ) -> tg::Result { + ) -> tg::Result> { let mut indices = BTreeMap::new(); let mut paths: BTreeMap = BTreeMap::new(); let mut nodes = Vec::with_capacity(unify.nodes.len()); @@ -60,13 +62,14 @@ impl Server { .await; let mut graph = Graph { + graphs: BTreeMap::new(), indices, nodes, paths, objects, }; self.create_objects(input, &mut graph).await?; - Ok(graph) + Ok(Arc::new(graph)) } async fn create_object_graph_inner( @@ -132,7 +135,6 @@ impl Server { input: &input::Graph, graph: &mut Graph, ) -> tg::Result<()> { - let mut graph_metadata = BTreeMap::new(); let mut file_metadata = BTreeMap::new(); // Partition the graph into its strongly connected components. @@ -193,14 +195,14 @@ impl Server { graph.nodes[index].id.replace(id.clone().into()); graph.objects.insert(id.into(), index); + // Update the graph. + let bytes = data.serialize()?; + let id = tg::artifact::Id::new(data.kind(), &bytes); + graph.nodes[index].data.replace(data.clone()); + graph.nodes[index].id.replace(id.clone().into()); + // Get the metadata. - let metadata = self.compute_object_metadata( - graph, - index, - &data, - &file_metadata, - &graph_metadata, - ); + let metadata = self.compute_object_metadata(graph, index, &data, &file_metadata); graph.nodes[index].metadata.replace(metadata); } else { // Otherwise, construct an object graph. @@ -211,10 +213,12 @@ impl Server { // Store the graph. let bytes = object_graph.serialize()?; let id = tg::graph::Id::new(&bytes); - let arg = tg::object::put::Arg { bytes }; - self.put_object(&id.clone().into(), arg).await?; + let metadata = + self.compute_graph_metadata(graph, &object_graph, &scc, &file_metadata); + graph + .graphs + .insert(id.clone(), (object_graph.clone(), metadata)); - // Update data. for old_index in scc.iter().copied() { // Get the index within the object graph. let new_index = indices.get(&old_index).copied().unwrap(); @@ -246,11 +250,6 @@ impl Server { graph.objects.insert(id.into(), old_index); } - // Update the graph metadata. - let metadata = - self.compute_graph_metadata(graph, &object_graph, &scc, &file_metadata); - graph_metadata.insert(id.clone(), metadata); - // Update metadata. for old_index in scc { // Get the metadata. @@ -260,7 +259,6 @@ impl Server { old_index, data, &file_metadata, - &graph_metadata, ); graph.nodes[old_index].metadata.replace(metadata); } @@ -711,7 +709,6 @@ impl Server { index: usize, data: &tg::artifact::Data, file_metadata: &BTreeMap, - graph_metadata: &BTreeMap, ) -> tg::object::Metadata { if let Some(metadata) = graph.nodes[index].metadata.clone() { return metadata; @@ -730,10 +727,16 @@ impl Server { let mut weight = weight.map(|weight| weight + data_size); match data { - tg::artifact::Data::Directory(tg::directory::Data::Graph { graph, .. }) - | tg::artifact::Data::File(tg::file::Data::Graph { graph, .. }) - | tg::artifact::Data::Symlink(tg::symlink::Data::Graph { graph, .. }) => { - let metadata = graph_metadata.get(graph).unwrap(); + tg::artifact::Data::Directory(tg::directory::Data::Graph { + graph: graph_id, .. + }) + | tg::artifact::Data::File(tg::file::Data::Graph { + graph: graph_id, .. + }) + | tg::artifact::Data::Symlink(tg::symlink::Data::Graph { + graph: graph_id, .. + }) => { + let metadata = &graph.graphs.get(graph_id).unwrap().1; complete &= metadata.complete; if let Some(c) = metadata.count { count = count.map(|count| count + c); @@ -761,13 +764,7 @@ impl Server { let metadata = graph.nodes[edge.index].metadata.clone().unwrap_or_else(|| { if let Some(data) = graph.nodes[edge.index].data.as_ref() { - self.compute_object_metadata( - graph, - edge.index, - data, - file_metadata, - graph_metadata, - ) + self.compute_object_metadata(graph, edge.index, data, file_metadata) } else { tg::object::Metadata { complete: false, diff --git a/packages/server/src/artifact/checkin/output.rs b/packages/server/src/artifact/checkin/output.rs index 55a474af2..04cb9c771 100644 --- a/packages/server/src/artifact/checkin/output.rs +++ b/packages/server/src/artifact/checkin/output.rs @@ -1,16 +1,18 @@ use super::{input, object}; use crate::{temp::Temp, Server}; use futures::{stream::FuturesUnordered, FutureExt, StreamExt, TryStreamExt as _}; -use indoc::formatdoc; +use indoc::{formatdoc, indoc}; +use itertools::Itertools; use std::{ collections::BTreeSet, ffi::OsStr, os::unix::fs::PermissionsExt as _, path::{Path, PathBuf}, - sync::RwLock, + sync::{Arc, RwLock}, }; use tangram_client as tg; -use tangram_database::{self as db, prelude::*}; +use tangram_database::{self as db, prelude::*, Transaction}; +use tangram_either::Either; use time::format_description::well_known::Rfc3339; #[derive(Debug)] @@ -45,7 +47,7 @@ impl Server { &self, input: &input::Graph, object: &object::Graph, - ) -> tg::Result { + ) -> tg::Result> { // Create the state. let state = RwLock::new(State { nodes: Vec::with_capacity(object.nodes.len()), @@ -61,7 +63,7 @@ impl Server { nodes: state.into_inner().unwrap().nodes, }; - Ok(output) + Ok(Arc::new(output)) } async fn try_create_output_graph_inner( @@ -134,7 +136,11 @@ impl Server { Ok(Some(output_index)) } - pub async fn write_output_to_database(&self, output: &Graph) -> tg::Result<()> { + pub async fn write_output_to_database( + &self, + output: Arc, + object: Arc, + ) -> tg::Result<()> { // Get a database connection. let mut connection = self .database @@ -148,88 +154,94 @@ impl Server { .await .map_err(|source| tg::error!(!source, "failed to create a transaction"))?; - // Get the output in reverse-topological order. - let mut stack = vec![0]; - let mut visited = vec![false; output.nodes.len()]; - while let Some(output_index) = stack.pop() { - // Check if we've visited this node yet. - if visited[output_index] { - continue; - } - visited[output_index] = true; + match transaction { + Either::Left(sqlite) => { + sqlite + .with(move |transaction| { + for (id, (data, metadata)) in &object.graphs { + write_object_sqlite( + transaction, + &id.clone().into(), + &data.clone().into(), + metadata, + )?; + } + + // Get the output in reverse-topological order. + let mut stack = vec![0]; + let mut visited = vec![false; output.nodes.len()]; + while let Some(output_index) = stack.pop() { + // Check if we've visited this node yet. + if visited[output_index] { + continue; + } + visited[output_index] = true; + + // Get the output data. + let output = &output.nodes[output_index]; + + // Write the object. + write_object_sqlite( + transaction, + &output.id.clone().into(), + &output.data.clone().into(), + &output.metadata, + )?; + stack.extend(output.edges.iter().map(|edge| edge.node)); + } + + Ok::<_, tg::Error>(()) + }) + .await?; + sqlite + .commit() + .await + .map_err(|source| tg::error!(!source, "failed to commit the transaction"))?; + }, + Either::Right(transaction) => { + // Write graphs. + for (id, (data, metadata)) in &object.graphs { + write_object_postgres( + &transaction, + &id.clone().into(), + &data.clone().into(), + metadata, + ) + .await?; + } - // Get the output data. - let output = &output.nodes[output_index]; - - // Write to the database. - let p = transaction.p(); - let statement = formatdoc!( - " - insert into objects (id, bytes, complete, count, depth, weight, touched_at) - values ({p}1, {p}2, {p}3, {p}4, {p}5, {p}6, {p}7) - on conflict (id) do update set touched_at = {p}7; - " - ); - let now = time::OffsetDateTime::now_utc().format(&Rfc3339).unwrap(); - - let params: Vec = db::params![ - output.id, - output.data.serialize()?, - output.metadata.complete, - output.metadata.count, - output.metadata.depth, - output.metadata.weight, - now - ]; - transaction - .execute(statement, params) - .await - .map_err(|source| { - tg::error!(!source, "failed to put the artifact into the database") - })?; - - // Insert the object's children into the database. - output - .data - .children() - .iter() - .map(|child| { - let id = output.id.clone(); - let transaction = &transaction; - async move { - let statement = formatdoc!( - " - insert into object_children (object, child) - values ({p}1, {p}2) - on conflict (object, child) do nothing; - " - ); - let params = db::params![id, child]; - transaction - .execute(statement, params) - .await - .map_err(|source| { - tg::error!( - !source, - "failed to put the object children into the database" - ) - }) - .ok() + // Get the output in reverse-topological order. + let mut stack = vec![0]; + let mut visited = vec![false; output.nodes.len()]; + while let Some(output_index) = stack.pop() { + // Check if we've visited this node yet. + if visited[output_index] { + continue; } - }) - .collect::>() - .collect::>() - .await; + visited[output_index] = true; + + // Get the output data. + let output = &output.nodes[output_index]; + + // Write the object. + write_object_postgres( + &transaction, + &output.id.clone().into(), + &output.data.clone().into(), + &output.metadata, + ) + .await?; + stack.extend(output.edges.iter().map(|edge| edge.node)); + } - stack.extend(output.edges.iter().map(|edge| edge.node)); + // Commit the transaction. + transaction + .commit() + .await + .map_err(|source| tg::error!(!source, "failed to commit the transaction"))?; + }, } - // Commit the transaction. - transaction - .commit() - .await - .map_err(|source| tg::error!(!source, "failed to commit the transaction"))?; - Ok(()) } @@ -693,3 +705,115 @@ fn set_file_times_to_epoch_inner( Ok(()) } + +fn write_object_sqlite( + transaction: &mut rusqlite::Transaction<'_>, + id: &tg::object::Id, + data: &tg::object::Data, + metadata: &tg::object::Metadata, +) -> tg::Result<()> { + let statement = indoc!( + r#" + insert into objects (id, bytes, complete, count, depth, weight, touched_at) + values (?1, ?2, ?3, ?4, ?5, ?6, ?7) + on conflict (id) do update set touched_at = ?7 + "# + ); + let mut statement = transaction + .prepare_cached(statement) + .map_err(|source| tg::error!(!source, "failed to prepare statement"))?; + let now = time::OffsetDateTime::now_utc().format(&Rfc3339).unwrap(); + let params: Vec = db::params![ + id, + data.serialize()?, + metadata.complete, + metadata.count, + metadata.depth, + metadata.weight, + now + ]; + statement + .execute(rusqlite::params_from_iter(params)) + .map_err(|source| tg::error!(!source, "failed to execute statement"))?; + data.children() + .iter() + .map(|child| { + let statement = indoc!( + r#" + insert into object_children (object, child) + values (?1, ?2) + on conflict (object, child) do nothing; + "# + ); + let mut statement = transaction + .prepare_cached(statement) + .map_err(|source| tg::error!(!source, "failed to prepare statement"))?; + let params: Vec = db::params![id, child,]; + statement + .execute(rusqlite::params_from_iter(params)) + .map_err(|source| tg::error!(!source, "failed to execute the statement"))?; + Ok(()) + }) + .try_collect() +} + +async fn write_object_postgres( + transaction: &tangram_database::postgres::Transaction<'_>, + id: &tg::object::Id, + data: &tg::object::Data, + metadata: &tg::object::Metadata, +) -> tg::Result<()> { + // Write to the database. + let p = transaction.p(); + let statement = formatdoc!( + " + insert into objects (id, bytes, complete, count, depth, weight, touched_at) + values ({p}1, {p}2, {p}3, {p}4, {p}5, {p}6, {p}7) + on conflict (id) do update set touched_at = {p}7; + " + ); + let now = time::OffsetDateTime::now_utc().format(&Rfc3339).unwrap(); + let params: Vec = db::params![ + id, + data.serialize()?, + metadata.complete, + metadata.count, + metadata.depth, + metadata.weight, + now + ]; + transaction + .execute(statement, params) + .await + .map_err(|source| tg::error!(!source, "failed to put the artifact into the database"))?; + data.children() + .iter() + .map(|child| { + let id = id.clone(); + let transaction = &transaction; + async move { + let statement = formatdoc!( + " + insert into object_children (object, child) + values ({p}1, {p}2) + on conflict (object, child) do nothing; + " + ); + let params = db::params![id, child]; + transaction + .execute(statement, params) + .await + .map_err(|source| { + tg::error!( + !source, + "failed to put the object children into the database" + ) + }) + .ok() + } + }) + .collect::>() + .collect::>() + .await; + Ok(()) +}