From d3ec6aaba74b78e0f3cb5b128b9c3885f34e145a Mon Sep 17 00:00:00 2001 From: Chewing Glass Date: Thu, 9 Nov 2023 10:07:48 -0600 Subject: [PATCH] bug(#477): Ensure refreshed at is not changed on rent collection, wait for invalidations --- .../src/utils/handleAccountWebhook.ts | 22 ++- .../account-postgres-sink-service/vehnt.sql | 2 +- packages/entity-invalidator/src/index.ts | 179 +++++++++++++----- 3 files changed, 144 insertions(+), 59 deletions(-) diff --git a/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts b/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts index 5cfa41e2e..0c173e1da 100644 --- a/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts +++ b/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts @@ -75,14 +75,20 @@ export function handleAccountWebhook({ } } const model = sequelize.models[accName]; - await model.upsert( - { - address: account.pubkey, - refreshed_at: now, - ...sanitized, - }, - { transaction: t } - ); + const value = await model.findByPk(account.pubkey); + const changed = + !value || + Object.entries(sanitized).some(([k, v]) => v !== value.dataValues[k]); + if (changed) { + await model.upsert( + { + address: account.pubkey, + refreshed_at: now, + ...sanitized, + }, + { transaction: t } + ); + } } await t.commit(); diff --git a/packages/account-postgres-sink-service/vehnt.sql b/packages/account-postgres-sink-service/vehnt.sql index 683d0cc26..66a4d56ca 100644 --- a/packages/account-postgres-sink-service/vehnt.sql +++ b/packages/account-postgres-sink-service/vehnt.sql @@ -117,4 +117,4 @@ SELECT approx_fall_rate * 1000000000000 as approx_fall_rate, approx_fall_rate - real_fall_rate as fall_rate_diff, approx_ve_tokens - real_ve_tokens as ve_tokens_diff -FROM subdao_delegations; \ No newline at end of file +FROM subdao_delegations; diff --git a/packages/entity-invalidator/src/index.ts b/packages/entity-invalidator/src/index.ts index d118c16b9..9d7cbc511 100644 --- a/packages/entity-invalidator/src/index.ts +++ b/packages/entity-invalidator/src/index.ts @@ -1,10 +1,21 @@ import { decodeEntityKey } from "@helium/helium-entity-manager-sdk"; import AWS from "aws-sdk"; import { Op } from "sequelize"; -import { v4 as uuidv4 } from 'uuid'; +import { v4 as uuidv4 } from "uuid"; import { IotHotspotInfo, KeyToAsset, MobileHotspotInfo } from "./model"; // @ts-ignore -import { AWS_REGION, CLOUDFRONT_DISTRIBUTION, LOOKBACK_HOURS, DOMAIN } from "./env"; +import { + AWS_REGION, + CLOUDFRONT_DISTRIBUTION, + LOOKBACK_HOURS, + DOMAIN, +} from "./env"; +import { chunks } from "@helium/spl-utils"; + +// How long to wait to check invalidation status again +const INVALIDATION_WAIT = 10000; +// 30 minutes +const INVALIDATION_WAIT_LIMIT = 30 * 60 * 1000; async function run() { const date = new Date(); @@ -12,27 +23,31 @@ async function run() { AWS.config.update({ region: AWS_REGION }); const cloudfront = new AWS.CloudFront(); - // Invalidate metadata-service routes: + // Invalidate metadata-service routes: // - /v2/hotspots/pagination-metadata?subnetwork=iot // - /v2/hotspots/pagination-metadata?subnetwork=mobile // Or /v2/hotspot* if there is an error try { const modelMap: any = { - 'iot': IotHotspotInfo, - 'mobile': MobileHotspotInfo, - } - const subnetworks = ['iot', 'mobile']; + iot: IotHotspotInfo, + mobile: MobileHotspotInfo, + }; + const subnetworks = ["iot", "mobile"]; // Fetch pagination data const responsePromises = subnetworks.map((subnetwork) => { - return fetch(`https://${DOMAIN}/v2/hotspots/pagination-metadata?subnetwork=${subnetwork}`); + return fetch( + `https://${DOMAIN}/v2/hotspots/pagination-metadata?subnetwork=${subnetwork}` + ); }); const jsonPromises = await Promise.all(responsePromises); - const paginationMetadata = await Promise.all(jsonPromises.map((response) => response.json())); + const paginationMetadata = await Promise.all( + jsonPromises.map((response) => response.json()) + ); console.log("Fetched pagination metadata for subnetworks"); console.log(paginationMetadata); - // Fetch counts of newly added hotspots + // Fetch counts of newly added hotspots const totalCountPromises = subnetworks.map((subnetwork) => { const whereClause = { created_at: { @@ -54,7 +69,7 @@ async function run() { console.log("Fetched counts of newly added hotspots"); console.log(totalCounts); - // Prepare invalidation paths + // Prepare invalidation paths const paths: string[] = []; totalCounts.forEach((count, i) => { const subnetwork = subnetworks[i]; @@ -72,39 +87,33 @@ async function run() { console.log("Invalidation paths"); console.log(paths); - await cloudfront - .createInvalidation({ - DistributionId: CLOUDFRONT_DISTRIBUTION, - InvalidationBatch: { - CallerReference: `${uuidv4()}`, - Paths: { - Quantity: paths.length, - Items: paths, - }, - }, - }) - .promise(); + await invalidateAndWait({ + cloudfront, + DistributionId: CLOUDFRONT_DISTRIBUTION, + Paths: { + Quantity: paths.length, + Items: paths, + }, + }); } catch (err) { - console.error("Granular /v2/hotspots invalidation failed, resorting to full invalidation"); + console.error( + "Granular /v2/hotspots invalidation failed, resorting to full invalidation" + ); console.error(err); - await cloudfront - .createInvalidation({ - DistributionId: CLOUDFRONT_DISTRIBUTION, - InvalidationBatch: { - CallerReference: `${uuidv4()}`, - Paths: { - Quantity: 1, - Items: ["/v2/hotspots*"], - }, - }, - }) - .promise(); + await invalidateAndWait({ + cloudfront, + DistributionId: CLOUDFRONT_DISTRIBUTION, + Paths: { + Quantity: 1, + Items: ["/v2/hotspots*"], + }, + }); } // Invalidate metadata-service routes: // - /v2/hotspot/:keyToAssetKey - // - /v1/:keyToAssetKey + // - /v1/:keyToAssetKey // - /:eccCompact const limit = 10000; let lastId = null; @@ -145,6 +154,7 @@ async function run() { }); console.log(`Found ${totalCount} updated records`); let totalProgress = 0; + const paths: string[] = []; do { if (lastId) { @@ -177,25 +187,94 @@ async function run() { }); }); - const paths = entities.flatMap((entity) => getPaths(entity)); - await cloudfront - .createInvalidation({ - DistributionId: CLOUDFRONT_DISTRIBUTION, - InvalidationBatch: { - CallerReference: `${uuidv4()}`, // unique identifier for this invalidation batch - Paths: { - Quantity: paths.length, - Items: paths, - }, - }, - }) - .promise(); + paths.push(...entities.flatMap((entity) => getPaths(entity))); lastId = entities[entities.length - 1].address; totalProgress += entities.length; console.log(`Processed ${totalProgress} / ${totalCount}`); } } while (entities.length === limit); + + // Split the paths into batches of 3000 + const batches = chunks(paths, 3000) + + // Process each batch of invalidations + let i = 0; + for (const batch of batches) { + await invalidateAndWait({ + cloudfront, + DistributionId: CLOUDFRONT_DISTRIBUTION, + Paths: { + Quantity: batch.length, + Items: batch, + }, + }) + + console.log(`Invalidated ${i} / ${batches.length} batches`); + i++ + } +} + +function delay(ms) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function invalidateAndWait({ + cloudfront, + DistributionId, + Paths, +}: { + cloudfront: AWS.CloudFront, + DistributionId: string, + Paths: { + Quantity: number; + Items: string[] + } +}) { + const invalidationResponse = await cloudfront + .createInvalidation({ + DistributionId, + InvalidationBatch: { + CallerReference: `${uuidv4()}`, + Paths, + }, + }) + .promise(); + + if (invalidationResponse?.Invalidation) { + const invalidationId = invalidationResponse.Invalidation.Id; + + // Check the status of the invalidation batch periodically + let invalidationStatus = await getInvalidationStatus( + cloudfront, + invalidationId + ); + let totalWait = 0; + while (invalidationStatus !== "Completed") { + console.log("Invalidation in progress. Waiting for completion..."); + totalWait += INVALIDATION_WAIT + await delay(INVALIDATION_WAIT); // Wait for 10 seconds before checking the status again + invalidationStatus = await getInvalidationStatus( + cloudfront, + invalidationId + ); + + if (totalWait > INVALIDATION_WAIT_LIMIT) { + throw new Error("Exceeded invalidation wait limit") + } + } + } +} + +async function getInvalidationStatus(cloudfront: AWS.CloudFront, invalidationId: string) { + const invalidationResponse = await cloudfront + .getInvalidation({ + DistributionId: CLOUDFRONT_DISTRIBUTION, + Id: invalidationId, + }) + .promise(); + + return invalidationResponse?.Invalidation?.Status; } function getPaths(entity: KeyToAsset): string[] {