This repository has been archived by the owner on Nov 9, 2022. It is now read-only.

Merge pull request #27 from ArweaveTeam/v0.4.0
v0.4.0
TheLoneRonin authored Feb 26, 2021
2 parents 4cf15f5 + c440263 commit f292a43
Showing 15 changed files with 268 additions and 40 deletions.
3 changes: 2 additions & 1 deletion .eslintrc.json
@@ -18,6 +18,7 @@
"quotes": [2, "single", { "avoidEscape": true }],
"require-jsdoc": 0,
"max-len": 0,
"camelcase": 0
"camelcase": 0,
"guard-for-in": 0
}
}
1 change: 1 addition & 0 deletions .gitignore
@@ -7,6 +7,7 @@ cache
snapshot
.snapshot
.import
*.temp

**/*.log
**/.DS_Store
4 changes: 3 additions & 1 deletion README.md
@@ -16,7 +16,7 @@ There are several million transactions on the Arweave chain. In order to effecti

1. 16GB RAM (ideally 32GB RAM)

2. ~100GB of SSD storage available
2. ~1TB of SSD storage available

3. Intel i5 / AMD FX or greater; 4+ vCPUs should be more than enough (these are typically Intel Xeon CPUs).

@@ -26,6 +26,8 @@ This guide is designed to use Docker Compose. There is also the development vers

Also make sure to read the [Snapshot Guide](./SNAPSHOT.md) to expedite the synchronization process for your Gateway.

Furthermore, to ensure redundancy, make sure to check out the [Rescan Guide](./RESCAN.md).

## Environment

A default environment file you can use is provided at `.env.docker` in the repository.
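
The Node entry points read their configuration from `process.env`, populated by Docker Compose or, for local runs, by dotenv (see the `config()` call in `src/database/rescan.database.ts` below). A minimal sketch; `POSTGRES_HOST` is a placeholder name, not necessarily one of the Gateway's actual keys:

```typescript
import {config} from 'dotenv';

// dotenv copies key=value pairs from a .env file into process.env.
// POSTGRES_HOST is a placeholder variable name used for illustration only.
config();

const databaseHost = process.env.POSTGRES_HOST || 'localhost';
console.log(`[env] database host set to ${databaseHost}`);
```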
18 changes: 18 additions & 0 deletions RESCAN.md
@@ -0,0 +1,18 @@
# Rescan Guide

While syncing, some transactions may not be returned by Arweave nodes. You can recover these transactions by running a rescan.
If you received an error while retrieving a transaction while running a node, you can recover it by running:

```bash
yarn rescan:cache
```

## Rescanning Snapshots

If you are recovering from an imported snapshot, the missing transactions should be listed under `snapshot/.rescan`.

You can recover these snapshot transactions by running:

```bash
yarn rescan:snapshot
```
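
For reference, the rescan files written by the Gateway are plain text with one comma-separated record per missing transaction, in the form `tx,height,type` (this is the format written by `src/Snapshot.ts` and read back by `src/database/rescan.database.ts` later in this diff). A minimal parsing sketch, with made-up example values:

```typescript
// Sketch only: one .rescan record maps onto the TxStream shape used by
// rescan.database.ts — transaction id, block height, and 'normal' or 'ans'.
export interface RescanRecord {
  tx: string;
  height: number;
  type: 'normal' | 'ans';
}

export function parseRescanLine(line: string): RescanRecord {
  const [tx, height, type] = line.trim().split(',');
  return {tx, height: Number(height), type: type as 'normal' | 'ans'};
}

// parseRescanLine('exampleTxId,604512,normal')
// => {tx: 'exampleTxId', height: 604512, type: 'normal'}
```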
3 changes: 2 additions & 1 deletion SNAPSHOT.md
@@ -69,5 +69,6 @@ info: [snapshot] successfully imported tags.csv
When running an actual Gateway, make sure you copy the `.snapshot` file from the `snapshot` folder into the `cache` directory.

```bash
cp snapshot/.snapshot .snapshot
mkdir cache
cp snapshot/.snapshot cache/.snapshot
```
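
For context, `src/Snapshot.ts` (updated later in this diff) treats the `.snapshot` file as a single integer: the last block height that was fully exported. A minimal sketch of that resume check, mirroring the logic shown in the Snapshot.ts changes below:

```typescript
import {existsSync, readFileSync} from 'fs';

// Sketch of the resume check: .snapshot holds one integer, the last block
// height written to the snapshot CSVs; NaN or a missing file means start over.
export function readSnapshotState(path: string = 'snapshot/.snapshot'): number | null {
  if (!existsSync(path)) {
    return null;
  }
  const snapshotState = parseInt(readFileSync(path).toString(), 10);
  return isNaN(snapshotState) ? null : snapshotState;
}
```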
14 changes: 10 additions & 4 deletions migrations/20200404025828_initialize.ts
@@ -29,10 +29,13 @@ export async function up(knex: Knex) {
for (let i = 0; i < indices.length; i++) {
const index = indices[i];
table.string(index, 64);
table.index(index, `index_${index}_transactions`, 'BTREE');
table.index(index, `index_${index}_transactions`, 'HASH');
}

table.primary(['id'], 'pkey_transactions');
table.index(['height'], 'transactions_height', 'HASH');
table.index(['owner_address'], 'transactions_owner_address', 'HASH');
table.index(['target'], 'transactions_target', 'HASH');
})
.createTable('blocks', (table) => {
table.string('id', 64).notNullable();
@@ -44,16 +44,19 @@
table.timestamp('created_at').defaultTo(knex.fn.now());

table.primary(['id'], 'pkey_blocks');
table.index(['height'], 'blocks_height', 'HASH');
})
.createTable('tags', (table) => {
table.string('tx_id', 64).notNullable();
table.integer('index').notNullable();
table.string('name');
table.text('value');
table.string('name', 2048);
table.string('value', 2048);
table.timestamp('created_at').defaultTo(knex.fn.now());

table.primary(['tx_id', 'index'], 'pkey_tags');
table.index(['name', 'value'], 'index_name_value', 'BTREE');
table.index(['tx_id', 'name'], 'tags_tx_id_name', 'BTREE');
table.index(['name'], 'tags_name', 'HASH');
table.index(['name', 'value'], 'tags_name_value', 'BTREE');
});
}

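
The migration changes above switch most single-column lookups to HASH indexes, which Postgres uses only for equality comparisons, widen the tag `name` and `value` columns to indexable `string(2048)` values, and add composite BTREE indexes on `(tx_id, name)` and `(name, value)`. A minimal sketch of the kind of equality query these indexes are meant to serve; the connection setup and the `CONNECTION_STRING` variable are assumptions for illustration, not part of this commit:

```typescript
import knex from 'knex';

// Hypothetical connection setup; the real Gateway configures knex from its
// own environment, and CONNECTION_STRING here is a placeholder name.
const db = knex({client: 'pg', connection: process.env.CONNECTION_STRING});

// Equality filters like these are what the new HASH indexes on
// height, owner_address and target are meant to accelerate.
export function transactionsByOwnerAtHeight(ownerAddress: string, height: number) {
  return db('transactions')
      .where({owner_address: ownerAddress, height})
      .limit(100);
}
```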
6 changes: 5 additions & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "@arweave/gateway",
"version": "0.3.1",
"version": "0.4.0",
"main": "dist/src/Gateway.js",
"repository": "[email protected]:ArweaveTeam/gateway.git",
"author": "Arweave <[email protected]>",
@@ -13,6 +13,9 @@
"dev:restart": "npm run migrate:down && npm run migrate:latest && npm run dev:start",
"dev:snapshot": "npm run dev:build && node dist/src/Snapshot.js",
"dev:import": "npm run dev:build && node dist/src/Import.js",
"rescan:cache": "npm run dev:build && node dist/src/Rescan.cache.js",
"rescan:snapshot": "npm run dev:build && node dist/src/Rescan.snapshot.js",
"rescan:temp": "npm run dev:build && node dist/src/Rescan.temp.js",
"migrate:down": "knex migrate:down",
"migrate:up": "knex migrate:up",
"migrate:latest": "knex migrate:latest",
@@ -29,6 +32,7 @@
"body-parser": "^1.19.0",
"colors": "^1.4.0",
"dotenv": "^8.2.0",
"event-stream": "^4.0.1",
"express": "^4.17.1",
"graphql": "^15.5.0",
"graphql-fields": "^2.0.3",
3 changes: 3 additions & 0 deletions src/Rescan.cache.ts
@@ -0,0 +1,3 @@
import {startRescan} from './database/rescan.database';

(async () => await startRescan('cache/.rescan'))();
3 changes: 3 additions & 0 deletions src/Rescan.snapshot.ts
@@ -0,0 +1,3 @@
import {startRescan} from './database/rescan.database';

(async () => await startRescan('snapshot/.rescan'))();
3 changes: 3 additions & 0 deletions src/Rescan.temp.ts
@@ -0,0 +1,3 @@
import {startRescan} from './database/rescan.database';

(async () => await startRescan('.rescan.temp'))();
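
The three entry points above differ only in the path handed to `startRescan` (which defaults to `cache/.rescan`, per `rescan.database.ts` below). A hypothetical extra entry point targeting an arbitrary rescan file would follow the same shape:

```typescript
// Hypothetical example, not part of this commit: rescan records collected
// somewhere else, e.g. a file copied over from another machine.
import {startRescan} from './database/rescan.database';

(async () => await startRescan('backups/.rescan'))();
```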
42 changes: 28 additions & 14 deletions src/Snapshot.ts
@@ -6,7 +6,7 @@ import {ansBundles} from './utility/ans.utility';
import {mkdir} from './utility/file.utility';
import {log} from './utility/log.utility';
import {sleep} from './utility/sleep.utility';
import {getNodeInfo, getData} from './query/node.query';
import {getNodeInfo, getDataFromChunks} from './query/node.query';
import {block} from './query/block.query';
import {transaction, tagValue, Tag} from './query/transaction.query';
import {formatBlock} from './database/block.database';
@@ -28,6 +28,7 @@ export const streams = {
block: createWriteStream('snapshot/block.csv', {flags: 'a'}),
transaction: createWriteStream('snapshot/transaction.csv', {flags: 'a'}),
tags: createWriteStream('snapshot/tags.csv', {flags: 'a'}),
rescan: createWriteStream('snapshot/.rescan', {flags: 'a'}),
};

export function configureSnapshotBar(start: number, end: number) {
@@ -42,9 +43,9 @@ export function configureSnapshotBar(start: number, end: number) {
}

export async function snapshot() {
if (existsSync('.snapshot')) {
if (existsSync('snapshot/.snapshot')) {
log.info('[snapshot] existing snapshot state found');
const snapshotState = parseInt(readFileSync('.snapshot').toString());
const snapshotState = parseInt(readFileSync('snapshot/.snapshot').toString());

if (!isNaN(snapshotState)) {
const nodeInfo = await getNodeInfo();
@@ -133,7 +134,7 @@ export async function storeTransactions(txs: Array<string>, height: number) {
await Promise.all(batch);
}

export async function storeTransaction(tx: string, height: number) {
export async function storeTransaction(tx: string, height: number, retry: boolean = true) {
try {
const currentTransaction = await transaction(tx);
const ft = formatTransaction(currentTransaction);
@@ -153,19 +154,32 @@
const ans102 = tagValue(preservedTags, 'Bundle-Type') === 'ANS-102';

if (ans102) {
try {
const ansPayload = await getData(ft.id);
const ansTxs = await ansBundles.unbundleData(ansPayload);

await processANSTransaction(ansTxs);
} catch (error) {
console.log('');
log.info(`[snapshot] malformed ANS payload at height ${height} for tx ${ft.id}`);
}
await processAns(ft.id, height);
}
} catch (error) {
console.log('');
log.info(`[snapshot] could not retrieve tx ${tx} at height ${height}`);
log.info(`[snapshot] could not retrieve tx ${tx} at height ${height}${retry ? ', attempting to retrieve again' : ', missing tx stored in .rescan'}`);
if (retry) {
await storeTransaction(tx, height, false);
} else {
streams.rescan.write(`${tx},${height},normal\n`);
}
}
}

export async function processAns(id: string, height: number, retry: boolean = true) {
try {
const ansPayload = await getDataFromChunks(id);
const ansTxs = await ansBundles.unbundleData(ansPayload);

await processANSTransaction(ansTxs);
} catch (error) {
if (retry) {
await processAns(id, height, false);
} else {
log.info(`[database] malformed ANS payload at height ${height} for tx ${id}`);
streams.rescan.write(`${id},${height},ans\n`);
}
}
}

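
The retry logic added to `storeTransaction` and `processAns` above follows the same shape: try once, retry once, and on the second failure append a `tx,height,type` record to the rescan stream instead of throwing. A generic sketch of that pattern (a hypothetical helper, not part of the codebase):

```typescript
import {WriteStream} from 'fs';

// Hypothetical helper illustrating the retry-once-then-record shape used by
// storeTransaction and processAns: on the second failure the record is
// appended to the rescan stream for a later `yarn rescan:*` pass.
export async function withRescanFallback(
    attempt: () => Promise<void>,
    record: string,
    rescan: WriteStream,
    retry: boolean = true,
): Promise<void> {
  try {
    await attempt();
  } catch (error) {
    if (retry) {
      return withRescanFallback(attempt, record, rescan, false);
    }
    rescan.write(`${record}\n`);
  }
}
```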
80 changes: 80 additions & 0 deletions src/database/rescan.database.ts
@@ -0,0 +1,80 @@
import ProgressBar from 'progress';
import {existsSync, createReadStream, createWriteStream, readFileSync, writeFileSync, unlinkSync} from 'fs';
import {split, mapSync} from 'event-stream';
import {config} from 'dotenv';
import {log} from '../utility/log.utility';
import {mkdir} from '../utility/file.utility';
import {importTransactions, importTags} from '../database/import.database';
import {storeTransaction, processAns, streams} from '../database/sync.database';

config();
mkdir('snapshot');
mkdir('cache');

export interface TxStream {
tx: string;
height: string;
type: string;
}

export const rescan = createWriteStream('.rescan.temp');
export let bar: ProgressBar;

export async function startRescan(path: string = 'cache/.rescan') {
log.info('[rescan] starting rescan');

if (existsSync(path)) {
log.info('[rescan] found existing rescan file. Indexing missing transactions.');
await streamTransactions(path);
}
}

export async function streamTransactions(path: string) {
const txs: Array<TxStream> = [];

createReadStream(path)
.pipe(split())
.pipe(mapSync((line: string) => {
const [tx, height, type] = line.split(',');
txs.push({tx, height, type});
}))
.on('end', async () => {
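// drop the empty trailing record produced by the file's final newline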
txs.pop();

for (let i = 0; i < txs.length; i++) {
const {tx, height, type} = txs[i];
await restoreTransaction(tx, height, type);
}

const rescan = readFileSync('.rescan.temp');
writeFileSync(path, rescan);
unlinkSync('.rescan.temp');

log.info('[rescan] complete, unindexed transactions stored in .rescan');

process.exit();
});
}

export async function restoreTransaction(tx: string, height: string, type: string) {
try {
if (type === 'normal') {
await storeTransaction(tx, Number(height));
}

if (type === 'ans') {
await processAns(tx, Number(height));
}

await importTransactions(`${process.cwd()}/cache/transaction.csv`);
await importTags(`${process.cwd()}/cache/tags.csv`);

streams.transaction.cache = createWriteStream('cache/transaction.csv');
streams.tags.cache = createWriteStream('cache/tags.csv');

log.info(`[rescan] successfully added missing tx ${tx} at height ${height}`);
} catch (error) {
log.info(`[rescan] failed ${tx} at ${height}, added to the .rescan.temp file. It was not added to the database`);
rescan.write(`${tx},${height}\n`);
}
}