store: Fix ordering issues with different id types
incrypto32 committed Jan 9, 2025
1 parent 1ff44c6 commit ad34735
Showing 3 changed files with 105 additions and 33 deletions.
9 changes: 8 additions & 1 deletion store/postgres/src/relational.rs
@@ -576,6 +576,13 @@ impl Layout {
Ok((ewt, block))
};

fn compare_entity_data_ext(a: &EntityDataExt, b: &EntityDataExt) -> std::cmp::Ordering {
a.block_number
.cmp(&b.block_number)
.then_with(|| a.entity.cmp(&b.entity))
.then_with(|| a.id.cmp(&b.id))
}

// The algorithm is similar to the merge sort algorithm and it relies on the fact that both vectors
// are ordered by (block_number, entity_type, entity_id). It simultaneously advances entities from
// both lower_vec and upper_vec and tries to match entities that have entries in both vectors for
@@ -589,7 +596,7 @@ impl Layout {
while lower_now.is_some() || upper_now.is_some() {
let (ewt, block) = match (lower_now, upper_now) {
(Some(lower), Some(upper)) => {
match lower.cmp(&upper) {
match compare_entity_data_ext(lower, upper) {
std::cmp::Ordering::Greater => {
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
let (ewt, block) = transform(upper, EntityOperationKind::Delete)?;
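The comment in the hunk above describes a merge-sort-style, two-pointer walk over lower_vec and upper_vec, both sorted by (block_number, entity_type, entity_id). The sketch below only illustrates that general pattern; it is not code from this commit, and merge_sorted, lower and upper are invented names.

use std::cmp::Ordering;

// Illustrative sketch of a two-pointer merge: both slices are assumed to be
// sorted by the same key. The smaller element is consumed alone, and equal
// elements are consumed as a matched pair.
fn merge_sorted<T>(
    lower: &[T],
    upper: &[T],
    cmp: impl Fn(&T, &T) -> Ordering,
) -> Vec<(Option<usize>, Option<usize>)> {
    let (mut i, mut j) = (0, 0);
    let mut pairs = Vec::new();
    while i < lower.len() || j < upper.len() {
        match (lower.get(i), upper.get(j)) {
            (Some(l), Some(u)) => match cmp(l, u) {
                // element only present in `lower`
                Ordering::Less => { pairs.push((Some(i), None)); i += 1; }
                // element only present in `upper`
                Ordering::Greater => { pairs.push((None, Some(j))); j += 1; }
                // present in both: advance both cursors together
                Ordering::Equal => { pairs.push((Some(i), Some(j))); i += 1; j += 1; }
            },
            (Some(_), None) => { pairs.push((Some(i), None)); i += 1; }
            (None, Some(_)) => { pairs.push((None, Some(j))); j += 1; }
            (None, None) => unreachable!(),
        }
    }
    pairs
}

fn main() {
    let lower = [1, 3, 4];
    let upper = [2, 3, 5];
    // 3 appears in both inputs, so it comes out as a matched pair.
    println!("{:?}", merge_sorted(&lower, &upper, |a, b| a.cmp(b)));
}

In the real code above, the Greater arm corresponds to an entry present only in upper_vec at that block, which the comment in the diff treats as a deletion.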
35 changes: 3 additions & 32 deletions store/postgres/src/relational_queries.rs
@@ -26,7 +26,6 @@ use graph::schema::{EntityKey, EntityType, FulltextAlgorithm, FulltextConfig, In
use graph::{components::store::AttributeNames, data::store::scalar};
use inflector::Inflector;
use itertools::Itertools;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::convert::TryFrom;
use std::fmt::{self, Display};
@@ -558,48 +557,20 @@ impl EntityData {
}
}

#[derive(QueryableByName, Clone, Debug, Default, Eq)]
#[derive(QueryableByName, Clone, Debug, Default)]
pub struct EntityDataExt {
#[diesel(sql_type = Text)]
pub entity: String,
#[diesel(sql_type = Jsonb)]
pub data: serde_json::Value,
#[diesel(sql_type = Integer)]
pub block_number: i32,
#[diesel(sql_type = Text)]
pub id: String,
#[diesel(sql_type = Binary)]
pub id: Vec<u8>,
#[diesel(sql_type = BigInt)]
pub vid: i64,
}

impl Ord for EntityDataExt {
fn cmp(&self, other: &Self) -> Ordering {
let ord = self.block_number.cmp(&other.block_number);
if ord != Ordering::Equal {
ord
} else {
let ord = self.entity.cmp(&other.entity);
if ord != Ordering::Equal {
ord
} else {
self.id.cmp(&other.id)
}
}
}
}

impl PartialOrd for EntityDataExt {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl PartialEq for EntityDataExt {
fn eq(&self, other: &Self) -> bool {
self.cmp(other) == Ordering::Equal
}
}

/// The equivalent of `graph::data::store::Value` but in a form that does
/// not require further transformation during `walk_ast`. This form takes
/// the idiosyncrasies of how we serialize values into account (e.g., that
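The id field above changes from a Text-typed String to a Binary-typed Vec<u8>, and the hand-written Ord/PartialOrd/PartialEq impls are replaced by compare_entity_data_ext in relational.rs. The snippet below is only an illustration, with made-up data, of why comparing ids through their textual form can disagree with their natural ordering for non-string id types; it is not code from this repository.

fn main() {
    // Int8-style ids rendered as text sort lexicographically ...
    let mut as_text = vec!["9".to_string(), "10".to_string(), "2".to_string()];
    as_text.sort();
    assert_eq!(as_text, ["10", "2", "9"]); // "10" < "2" < "9": not the value order

    // ... while the same ids compared as values sort numerically.
    let mut as_values = vec![9i64, 10, 2];
    as_values.sort();
    assert_eq!(as_values, [2, 9, 10]);

    println!("text order: {:?}, value order: {:?}", as_text, as_values);
}

Fetching the id as raw bytes (sql_type = Binary) presumably keeps the Rust-side comparison consistent with how the database orders the rows, regardless of the entity's id type.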
94 changes: 94 additions & 0 deletions store/test-store/tests/postgres/writable.rs
@@ -29,6 +29,32 @@ const SCHEMA_GQL: &str = "
id: ID!,
count: Int!,
}
type BytesId @entity {
id: Bytes!,
value: String!
}
type Int8Id @entity {
id: Int8!,
value: String!
}
type StringId @entity {
id: String!,
value: String!
}
type PoolCreated @entity(immutable: true) {
id: Bytes!,
token0: Bytes!,
token1: Bytes!,
fee: Int!,
tickSpacing: Int!,
pool: Bytes!,
blockNumber: BigInt!,
blockTimestamp: BigInt!,
transactionHash: Bytes!,
transactionFrom: Bytes!,
transactionGasPrice: BigInt!,
logIndex: BigInt!
}
";

const COUNTER: &str = "Counter";
@@ -406,3 +432,71 @@ fn read_immutable_only_range_test() {
assert_eq!(e.len(), 4);
})
}

#[test]
fn read_range_pool_created_test() {
run_test(|store, writable, sourceable, deployment| async move {
let result_entities = vec![
format!("(1, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369621), blockTimestamp: BigInt(1620243254), fee: Int(500), id: Bytes(0xff80818283848586), logIndex: BigInt(0), pool: Bytes(0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8), tickSpacing: Int(10), token0: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000000) }}, vid: 1 }}])"),
format!("(2, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369622), blockTimestamp: BigInt(1620243255), fee: Int(3000), id: Bytes(0xff90919293949596), logIndex: BigInt(1), pool: Bytes(0x4585fe77225b41b697c938b018e2ac67ac5a20c0), tickSpacing: Int(60), token0: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000001) }}, vid: 2 }}])"),
];

// Rest of the test remains the same
let subgraph_store = store.subgraph_store();
writable.deployment_synced().unwrap();

let pool_created_type = TEST_SUBGRAPH_SCHEMA.entity_type("PoolCreated").unwrap();
let entity_types = vec![pool_created_type.clone()];

for count in (1..=2).map(|x| x as i64) {
let id = if count == 1 {
"0xff80818283848586"
} else {
"0xff90919293949596"
};

let data = entity! { TEST_SUBGRAPH_SCHEMA =>
id: id,
token0: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
token1: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
fee: if count == 1 { 500 } else { 3000 },
tickSpacing: if count == 1 { 10 } else { 60 },
pool: if count == 1 { "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" } else { "0x4585fe77225b41b697c938b018e2ac67ac5a20c0" },
blockNumber: 12369621 + count - 1,
blockTimestamp: 1620243254 + count - 1,
transactionHash: format!("0x1234{:0>76}", if count == 1 { "0" } else { "1" }),
transactionFrom: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
transactionGasPrice: 100000000000i64,
logIndex: count - 1
};

let key = pool_created_type.parse_key(id).unwrap();
let op = EntityOperation::Set {
key: key.clone(),
data: EntityV::new(data, count),
};

transact_entity_operations(
&subgraph_store,
&deployment,
block_pointer(count as u8),
vec![op],
)
.await
.unwrap();
}
writable.flush().await.unwrap();
writable.deployment_synced().unwrap();

let br: Range<BlockNumber> = 0..18;
let e: BTreeMap<i32, Vec<EntitySourceOperation>> = sourceable
.get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone())
.unwrap();
assert_eq!(e.len(), 2);
for en in &e {
let index = *en.0 - 1;
let a = result_entities[index as usize].clone();
assert_eq!(a, format!("{:?}", en));
}
})
}
