Skip to content

Commit

Permalink
Store timestamp when marking subgraph as synced (#5566)
Browse files Browse the repository at this point in the history
We migrate `synced` from a boolean value to a nullable `synced_at`
timestamp on `subgraphs.subgraph_deployment` and `unused_deployments`.
The default timestamp used in the migration is the unix epoch,
`1970-01-01 00:00:00+00`.

Note that in the down migration we skip removal of `DEFAULT false` on
`unused_deployments.`
  • Loading branch information
encalypto authored Aug 8, 2024
1 parent cd5d7d9 commit 2d751ca
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 19 deletions.
3 changes: 2 additions & 1 deletion graph/src/data/subgraph/schema.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Entity types that contain the graph-node state.
use anyhow::{anyhow, bail, Error};
use chrono::{DateTime, Utc};
use hex;
use rand::rngs::OsRng;
use rand::Rng;
Expand Down Expand Up @@ -159,7 +160,7 @@ pub struct SubgraphDeploymentEntity {
pub manifest: SubgraphManifestEntity,
pub failed: bool,
pub health: SubgraphHealth,
pub synced: bool,
pub synced_at: Option<DateTime<Utc>>,
pub fatal_error: Option<SubgraphError>,
pub non_fatal_errors: Vec<SubgraphError>,
/// The earliest block for which we have data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
DROP VIEW info.subgraph_info;

ALTER TABLE subgraphs.subgraph_deployment ADD COLUMN synced BOOLEAN NOT NULL DEFAULT false;
ALTER TABLE unused_deployments ADD COLUMN synced BOOLEAN NOT NULL DEFAULT false;

UPDATE subgraphs.subgraph_deployment SET synced = synced_at IS NOT NULL;
UPDATE unused_deployments SET synced = synced_at IS NOT NULL;

-- NB: We keep the default on unused_deployment, as it was there before.
ALTER TABLE subgraphs.subgraph_deployment ALTER COLUMN synced DROP DEFAULT;

ALTER TABLE subgraphs.subgraph_deployment DROP COLUMN synced_at;
ALTER TABLE unused_deployments DROP COLUMN synced_at;

CREATE VIEW info.subgraph_info AS
SELECT ds.id AS schema_id,
ds.name AS schema_name,
ds.subgraph,
ds.version,
s.name,
CASE
WHEN s.pending_version = v.id THEN 'pending'::text
WHEN s.current_version = v.id THEN 'current'::text
ELSE 'unused'::text
END AS status,
d.failed,
d.synced
FROM deployment_schemas ds,
subgraphs.subgraph_deployment d,
subgraphs.subgraph_version v,
subgraphs.subgraph s
WHERE d.deployment = ds.subgraph::text AND v.deployment = d.deployment AND v.subgraph = s.id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
DROP VIEW info.subgraph_info;

ALTER TABLE subgraphs.subgraph_deployment ADD COLUMN synced_at TIMESTAMPTZ;
ALTER TABLE unused_deployments ADD COLUMN synced_at TIMESTAMPTZ;

UPDATE subgraphs.subgraph_deployment SET synced_at = '1970-01-01 00:00:00 UTC' WHERE synced;
UPDATE unused_deployments SET synced_at = '1970-01-01 00:00:00 UTC' WHERE synced;

ALTER TABLE subgraphs.subgraph_deployment DROP COLUMN synced;
ALTER TABLE unused_deployments DROP COLUMN synced;

CREATE VIEW info.subgraph_info AS
SELECT ds.id AS schema_id,
ds.name AS schema_name,
ds.subgraph,
ds.version,
s.name,
CASE
WHEN s.pending_version = v.id THEN 'pending'::text
WHEN s.current_version = v.id THEN 'current'::text
ELSE 'unused'::text
END AS status,
d.failed,
d.synced_at
FROM deployment_schemas ds,
subgraphs.subgraph_deployment d,
subgraphs.subgraph_version v,
subgraphs.subgraph s
WHERE d.deployment = ds.subgraph::text AND v.deployment = d.deployment AND v.subgraph = s.id;
11 changes: 5 additions & 6 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{advisory_lock, detail::GraphNodeVersion, primary::DeploymentId};
use diesel::{
connection::SimpleConnection,
dsl::{count, delete, insert_into, select, sql, update},
dsl::{count, delete, insert_into, now, select, sql, update},
sql_types::{Bool, Integer},
};
use diesel::{expression::SqlLiteral, pg::PgConnection, sql_types::Numeric};
Expand Down Expand Up @@ -132,7 +132,7 @@ table! {
deployment -> Text,
failed -> Bool,
health -> crate::deployment::SubgraphHealthMapping,
synced -> Bool,
synced_at -> Nullable<Timestamptz>,
fatal_error -> Nullable<Text>,
non_fatal_errors -> Array<Text>,
earliest_block_number -> Integer,
Expand Down Expand Up @@ -737,9 +737,9 @@ pub fn set_synced(conn: &mut PgConnection, id: &DeploymentHash) -> Result<(), St
update(
d::table
.filter(d::deployment.eq(id.as_str()))
.filter(d::synced.eq(false)),
.filter(d::synced_at.is_null()),
)
.set(d::synced.eq(true))
.set(d::synced_at.eq(now))
.execute(conn)?;
Ok(())
}
Expand All @@ -762,7 +762,7 @@ pub fn exists_and_synced(conn: &mut PgConnection, id: &str) -> Result<bool, Stor

let synced = d::table
.filter(d::deployment.eq(id))
.select(d::synced)
.select(d::synced_at.is_not_null())
.first(conn)
.optional()?
.unwrap_or(false);
Expand Down Expand Up @@ -1142,7 +1142,6 @@ pub fn create_deployment(
d::id.eq(site.id),
d::deployment.eq(site.deployment.as_str()),
d::failed.eq(false),
d::synced.eq(false),
d::health.eq(SubgraphHealth::Healthy),
d::fatal_error.eq::<Option<String>>(None),
d::non_fatal_errors.eq::<Vec<String>>(vec![]),
Expand Down
13 changes: 8 additions & 5 deletions store/postgres/src/detail.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use git_testament::{git_testament, git_testament_macros};
use graph::blockchain::BlockHash;
use graph::data::store::scalar::ToPrimitive;
use graph::data::subgraph::schema::{SubgraphError, SubgraphManifestEntity};
use graph::prelude::{BigDecimal, BlockPtr, DeploymentHash, StoreError, SubgraphDeploymentEntity};
use graph::prelude::{
chrono::{DateTime, Utc},
BigDecimal, BlockPtr, DeploymentHash, StoreError, SubgraphDeploymentEntity,
};
use graph::schema::InputSchema;
use graph::{constraint_violation, data::subgraph::status, prelude::web3::types::H256};
use itertools::Itertools;
Expand Down Expand Up @@ -46,7 +49,7 @@ pub struct DeploymentDetail {
pub deployment: String,
pub failed: bool,
health: HealthType,
pub synced: bool,
pub synced_at: Option<DateTime<Utc>>,
fatal_error: Option<String>,
non_fatal_errors: Vec<String>,
/// The earliest block for which we have history
Expand Down Expand Up @@ -188,7 +191,7 @@ pub(crate) fn info_from_details(
deployment,
failed: _,
health,
synced,
synced_at,
fatal_error: _,
non_fatal_errors: _,
earliest_block_number,
Expand Down Expand Up @@ -238,7 +241,7 @@ pub(crate) fn info_from_details(
Ok(status::Info {
id: id.into(),
subgraph: deployment,
synced,
synced: synced_at.is_some(),
health,
paused: None,
fatal_error,
Expand Down Expand Up @@ -446,7 +449,7 @@ impl StoredDeploymentEntity {
manifest: manifest.as_manifest(schema),
failed: detail.failed,
health: detail.health.into(),
synced: detail.synced,
synced_at: detail.synced_at,
fatal_error: None,
non_fatal_errors: vec![],
earliest_block_number: detail.earliest_block_number,
Expand Down
18 changes: 11 additions & 7 deletions store/postgres/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,15 @@ use diesel::{
use graph::{
components::store::DeploymentLocator,
constraint_violation,
data::store::scalar::ToPrimitive,
data::subgraph::{status, DeploymentFeatures},
data::{
store::scalar::ToPrimitive,
subgraph::{status, DeploymentFeatures},
},
prelude::{
anyhow, serde_json, DeploymentHash, EntityChange, EntityChangeOperation, NodeId,
StoreError, SubgraphName, SubgraphVersionSwitchingMode,
anyhow,
chrono::{DateTime, Utc},
serde_json, DeploymentHash, EntityChange, EntityChangeOperation, NodeId, StoreError,
SubgraphName, SubgraphVersionSwitchingMode,
},
};
use graph::{
Expand Down Expand Up @@ -175,7 +179,7 @@ table! {
latest_ethereum_block_hash -> Nullable<Binary>,
latest_ethereum_block_number -> Nullable<Integer>,
failed -> Bool,
synced -> Bool,
synced_at -> Nullable<Timestamptz>,
}
}

Expand Down Expand Up @@ -228,7 +232,7 @@ pub struct UnusedDeployment {
pub latest_ethereum_block_hash: Option<Vec<u8>>,
pub latest_ethereum_block_number: Option<i32>,
pub failed: bool,
pub synced: bool,
pub synced_at: Option<DateTime<Utc>>,
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, AsExpression, FromSqlRow)]
Expand Down Expand Up @@ -1676,7 +1680,7 @@ impl<'a> Connection<'a> {
u::latest_ethereum_block_hash.eq(latest_hash),
u::latest_ethereum_block_number.eq(latest_number),
u::failed.eq(detail.failed),
u::synced.eq(detail.synced),
u::synced_at.eq(detail.synced_at),
))
.execute(self.conn.as_mut())?;
}
Expand Down

0 comments on commit 2d751ca

Please sign in to comment.