Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug(#477): Ensure refreshed at is not changed on rent collection, wai… #478

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,20 @@ export function handleAccountWebhook({
}
}
const model = sequelize.models[accName];
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
{ transaction: t }
);
const value = await model.findByPk(account.pubkey);
const changed =
!value ||
Object.entries(sanitized).some(([k, v]) => v !== value.dataValues[k]);
if (changed) {
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
{ transaction: t }
);
}
}

await t.commit();
Expand Down
2 changes: 1 addition & 1 deletion packages/account-postgres-sink-service/vehnt.sql
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,4 @@ SELECT
approx_fall_rate * 1000000000000 as approx_fall_rate,
approx_fall_rate - real_fall_rate as fall_rate_diff,
approx_ve_tokens - real_ve_tokens as ve_tokens_diff
FROM subdao_delegations;
FROM subdao_delegations;
179 changes: 129 additions & 50 deletions packages/entity-invalidator/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,53 @@
import { decodeEntityKey } from "@helium/helium-entity-manager-sdk";
import AWS from "aws-sdk";
import { Op } from "sequelize";
import { v4 as uuidv4 } from 'uuid';
import { v4 as uuidv4 } from "uuid";
import { IotHotspotInfo, KeyToAsset, MobileHotspotInfo } from "./model";
// @ts-ignore
import { AWS_REGION, CLOUDFRONT_DISTRIBUTION, LOOKBACK_HOURS, DOMAIN } from "./env";
import {
AWS_REGION,
CLOUDFRONT_DISTRIBUTION,
LOOKBACK_HOURS,
DOMAIN,
} from "./env";
import { chunks } from "@helium/spl-utils";

// How long to wait to check invalidation status again
const INVALIDATION_WAIT = 10000;
// 30 minutes
const INVALIDATION_WAIT_LIMIT = 30 * 60 * 1000;

async function run() {
const date = new Date();
date.setHours(date.getHours() - LOOKBACK_HOURS);
AWS.config.update({ region: AWS_REGION });
const cloudfront = new AWS.CloudFront();

// Invalidate metadata-service routes:
// Invalidate metadata-service routes:
// - /v2/hotspots/pagination-metadata?subnetwork=iot
// - /v2/hotspots/pagination-metadata?subnetwork=mobile
// Or /v2/hotspot* if there is an error
try {
const modelMap: any = {
'iot': IotHotspotInfo,
'mobile': MobileHotspotInfo,
}
const subnetworks = ['iot', 'mobile'];
iot: IotHotspotInfo,
mobile: MobileHotspotInfo,
};
const subnetworks = ["iot", "mobile"];

// Fetch pagination data
const responsePromises = subnetworks.map((subnetwork) => {
return fetch(`https://${DOMAIN}/v2/hotspots/pagination-metadata?subnetwork=${subnetwork}`);
return fetch(
`https://${DOMAIN}/v2/hotspots/pagination-metadata?subnetwork=${subnetwork}`
);
});
const jsonPromises = await Promise.all(responsePromises);
const paginationMetadata = await Promise.all(jsonPromises.map((response) => response.json()));
const paginationMetadata = await Promise.all(
jsonPromises.map((response) => response.json())
);
console.log("Fetched pagination metadata for subnetworks");
console.log(paginationMetadata);

// Fetch counts of newly added hotspots
// Fetch counts of newly added hotspots
const totalCountPromises = subnetworks.map((subnetwork) => {
const whereClause = {
created_at: {
Expand All @@ -54,7 +69,7 @@ async function run() {
console.log("Fetched counts of newly added hotspots");
console.log(totalCounts);

// Prepare invalidation paths
// Prepare invalidation paths
const paths: string[] = [];
totalCounts.forEach((count, i) => {
const subnetwork = subnetworks[i];
Expand All @@ -72,39 +87,33 @@ async function run() {
console.log("Invalidation paths");
console.log(paths);

await cloudfront
.createInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
InvalidationBatch: {
CallerReference: `${uuidv4()}`,
Paths: {
Quantity: paths.length,
Items: paths,
},
},
})
.promise();
await invalidateAndWait({
cloudfront,
DistributionId: CLOUDFRONT_DISTRIBUTION,
Paths: {
Quantity: paths.length,
Items: paths,
},
});
} catch (err) {
console.error("Granular /v2/hotspots invalidation failed, resorting to full invalidation");
console.error(
"Granular /v2/hotspots invalidation failed, resorting to full invalidation"
);
console.error(err);

await cloudfront
.createInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
InvalidationBatch: {
CallerReference: `${uuidv4()}`,
Paths: {
Quantity: 1,
Items: ["/v2/hotspots*"],
},
},
})
.promise();
await invalidateAndWait({
cloudfront,
DistributionId: CLOUDFRONT_DISTRIBUTION,
Paths: {
Quantity: 1,
Items: ["/v2/hotspots*"],
},
});
}

// Invalidate metadata-service routes:
// - /v2/hotspot/:keyToAssetKey
// - /v1/:keyToAssetKey
// - /v1/:keyToAssetKey
// - /:eccCompact
const limit = 10000;
let lastId = null;
Expand Down Expand Up @@ -145,6 +154,7 @@ async function run() {
});
console.log(`Found ${totalCount} updated records`);
let totalProgress = 0;
const paths: string[] = [];

do {
if (lastId) {
Expand Down Expand Up @@ -177,25 +187,94 @@ async function run() {
});
});

const paths = entities.flatMap((entity) => getPaths(entity));
await cloudfront
.createInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
InvalidationBatch: {
CallerReference: `${uuidv4()}`, // unique identifier for this invalidation batch
Paths: {
Quantity: paths.length,
Items: paths,
},
},
})
.promise();
paths.push(...entities.flatMap((entity) => getPaths(entity)));

lastId = entities[entities.length - 1].address;
totalProgress += entities.length;
console.log(`Processed ${totalProgress} / ${totalCount}`);
}
} while (entities.length === limit);

// Split the paths into batches of 3000
const batches = chunks(paths, 3000)

// Process each batch of invalidations
let i = 0;
for (const batch of batches) {
await invalidateAndWait({
cloudfront,
DistributionId: CLOUDFRONT_DISTRIBUTION,
Paths: {
Quantity: batch.length,
Items: batch,
},
})

console.log(`Invalidated ${i} / ${batches.length} batches`);
i++
}
}

function delay(ms) {
return new Promise((resolve) => setTimeout(resolve, ms));
}

async function invalidateAndWait({
cloudfront,
DistributionId,
Paths,
}: {
cloudfront: AWS.CloudFront,
DistributionId: string,
Paths: {
Quantity: number;
Items: string[]
}
}) {
const invalidationResponse = await cloudfront
.createInvalidation({
DistributionId,
InvalidationBatch: {
CallerReference: `${uuidv4()}`,
Paths,
},
})
.promise();

if (invalidationResponse?.Invalidation) {
const invalidationId = invalidationResponse.Invalidation.Id;

// Check the status of the invalidation batch periodically
let invalidationStatus = await getInvalidationStatus(
cloudfront,
invalidationId
);
let totalWait = 0;
while (invalidationStatus !== "Completed") {
console.log("Invalidation in progress. Waiting for completion...");
totalWait += INVALIDATION_WAIT
await delay(INVALIDATION_WAIT); // Wait for 10 seconds before checking the status again
invalidationStatus = await getInvalidationStatus(
cloudfront,
invalidationId
);

if (totalWait > INVALIDATION_WAIT_LIMIT) {
throw new Error("Exceeded invalidation wait limit")
}
}
}
}

async function getInvalidationStatus(cloudfront: AWS.CloudFront, invalidationId: string) {
const invalidationResponse = await cloudfront
.getInvalidation({
DistributionId: CLOUDFRONT_DISTRIBUTION,
Id: invalidationId,
})
.promise();

return invalidationResponse?.Invalidation?.Status;
}

function getPaths(entity: KeyToAsset): string[] {
Expand Down