Skip to content

Commit

Permalink
Streaming in-place WACZ creation + CDXJ indexing (#673)
Browse files Browse the repository at this point in the history
Fixes #674 

This PR supersedes #505. Instead of using js-wacz for optimized WACZ
creation, it:
- generates an 'in-place' or 'streaming' WACZ in the crawler, without
having to copy the data again.
- WACZ contents are streamed to remote upload (or to disk) from existing
files on disk
- CDXJ indices per-WARC are first written to the 'warc-cdx' directory, then merged using the Linux 'sort' command, and compressed to ZipNum if >50K (or always if using --generateCDX)
- All data in the WARCs is written and read only once
- Should result in significant speed / disk usage improvements:
previously, each WARC was written once, then read again (for CDXJ indexing),
read again (for adding to the new WACZ ZIP), written to disk (into the new WACZ
ZIP), and read again (if uploading to a remote endpoint). Now, WARCs are written
once, along with the per-WARC CDXJ; only the CDXJ is reread, sorted and merged on-disk, and all
data is read once to either generate the WACZ on disk or upload it to remote.

---------

Co-authored-by: Tessa Walsh <[email protected]>
  • Loading branch information
ikreymer and tw4l authored Aug 29, 2024
1 parent 8934fea commit 85a07af
Show file tree
Hide file tree
Showing 15 changed files with 644 additions and 197 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ jobs:
- name: add http-server for tests
run: yarn add -D http-server

- name: install py-wacz as root for tests
run: sudo pip install wacz

- name: run all tests as root
run: sudo DOCKER_HOST_NAME=172.17.0.1 yarn test
run: sudo DOCKER_HOST_NAME=172.17.0.1 yarn test -validate

- name: run saved state + qa compare test as non-root - with volume owned by current user
run: |
Expand Down
7 changes: 0 additions & 7 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ EXPOSE 9222 9223 6080

WORKDIR /app

ADD requirements.txt /app/
RUN python3 -m venv /app/python-venv && \
/app/python-venv/bin/pip install -U setuptools && \
/app/python-venv/bin/pip install -r requirements.txt && \
ln -s /app/python-venv/bin/wacz /usr/bin/wacz && \
ln -s /app/python-venv/bin/cdxj-indexer /usr/bin/cdxj-indexer

ADD package.json yarn.lock /app/

# to allow forcing rebuilds from this stage
Expand Down
10 changes: 7 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
},
"dependencies": {
"@novnc/novnc": "^1.4.0",
"@types/sax": "^1.2.7",
"@webrecorder/wabac": "^2.19.7",
"@webrecorder/wabac": "^2.19.8",
"browsertrix-behaviors": "^0.6.4",
"client-zip": "^2.4.5",
"fetch-socks": "^1.3.0",
"get-folder-size": "^4.0.0",
"husky": "^8.0.3",
Expand All @@ -36,7 +36,7 @@
"tsc": "^2.0.4",
"undici": "^6.18.2",
"uuid": "8.3.2",
"warcio": "^2.2.1",
"warcio": "^2.3.0",
"ws": "^7.4.4",
"yargs": "^17.7.2"
},
Expand All @@ -46,6 +46,7 @@
"@types/node": "^20.8.7",
"@types/pixelmatch": "^5.2.6",
"@types/pngjs": "^6.0.4",
"@types/sax": "^1.2.7",
"@types/uuid": "^9.0.6",
"@types/ws": "^8.5.8",
"@typescript-eslint/eslint-plugin": "^6.10.0",
Expand All @@ -62,5 +63,8 @@
"jest": {
"transform": {},
"testTimeout": 90000
},
"resolutions": {
"wrap-ansi": "7.0.0"
}
}
196 changes: 75 additions & 121 deletions src/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import { parseArgs } from "./util/argParser.js";

import yaml from "js-yaml";

import { WACZ, WACZInitOpts, mergeCDXJ } from "./util/wacz.js";

import { HealthChecker } from "./util/healthcheck.js";
import { TextExtractViaSnapshot } from "./util/textextract.js";
import {
Expand Down Expand Up @@ -62,7 +64,12 @@ import {
import { Recorder } from "./util/recorder.js";
import { SitemapReader } from "./util/sitemapper.js";
import { ScopedSeed } from "./util/seeds.js";
import { WARCWriter, createWARCInfo, setWARCInfo } from "./util/warcwriter.js";
import {
WARCWriter,
createWARCInfo,
setWARCInfo,
streamFinish,
} from "./util/warcwriter.js";
import { isHTMLMime, isRedirectStatus } from "./util/reqresp.js";
import { initProxy } from "./util/proxy.js";

Expand Down Expand Up @@ -117,7 +124,7 @@ export class Crawler {

pagesFH?: WriteStream | null = null;
extraPagesFH?: WriteStream | null = null;
logFH!: WriteStream;
logFH: WriteStream | null = null;

crawlId: string;

Expand Down Expand Up @@ -150,7 +157,8 @@ export class Crawler {

archivesDir: string;
tempdir: string;
tempCdxDir: string;
warcCdxDir: string;
indexesDir: string;

screenshotWriter: WARCWriter | null;
textWriter: WARCWriter | null;
Expand Down Expand Up @@ -288,7 +296,10 @@ export class Crawler {
// archives dir
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");
this.tempCdxDir = path.join(this.collDir, "tmp-cdx");

// indexes dirs
this.warcCdxDir = path.join(this.collDir, "warc-cdx");
this.indexesDir = path.join(this.collDir, "indexes");

this.screenshotWriter = null;
this.textWriter = null;
Expand Down Expand Up @@ -470,7 +481,7 @@ export class Crawler {
if (!this.params.dryRun) {
await fsp.mkdir(this.archivesDir, { recursive: true });
await fsp.mkdir(this.tempdir, { recursive: true });
await fsp.mkdir(this.tempCdxDir, { recursive: true });
await fsp.mkdir(this.warcCdxDir, { recursive: true });
}

this.logFH = fs.createWriteStream(this.logFilename, { flags: "a" });
Expand Down Expand Up @@ -1478,36 +1489,24 @@ self.__bx_behaviors.selectMainBehavior();
await this.combineWARC();
}

if (this.params.generateCDX && !this.params.dryRun) {
logger.info("Generating CDX");
await fsp.mkdir(path.join(this.collDir, "indexes"), { recursive: true });
await this.crawlState.setStatus("generate-cdx");
logger.info("Crawling done");

const warcList = await fsp.readdir(this.archivesDir);
const warcListFull = warcList.map((filename) =>
path.join(this.archivesDir, filename),
if (
(this.params.generateCDX || this.params.generateWACZ) &&
!this.params.dryRun
) {
logger.info("Merging CDX");
await this.crawlState.setStatus(
this.params.generateWACZ ? "generate-wacz" : "generate-cdx",
);

//const indexResult = await this.awaitProcess(child_process.spawn("wb-manager", ["reindex", this.params.collection], {cwd: this.params.cwd}));
const params = [
"-o",
path.join(this.collDir, "indexes", "index.cdxj"),
...warcListFull,
];
const indexResult = await this.awaitProcess(
child_process.spawn("cdxj-indexer", params, { cwd: this.params.cwd }),
await mergeCDXJ(
this.warcCdxDir,
this.indexesDir,
this.params.generateWACZ ? null : false,
);
if (indexResult === 0) {
logger.debug("Indexing complete, CDX successfully created");
} else {
logger.error("Error indexing and generating CDX", {
"status code": indexResult,
});
}
}

logger.info("Crawling done");

if (
this.params.generateWACZ &&
!this.params.dryRun &&
Expand Down Expand Up @@ -1543,11 +1542,9 @@ self.__bx_behaviors.selectMainBehavior();
if (!this.logFH) {
return;
}
try {
await new Promise<void>((resolve) => this.logFH.close(() => resolve()));
} catch (e) {
// ignore
}
const logFH = this.logFH;
this.logFH = null;
await streamFinish(logFH);
}

async generateWACZ() {
Expand Down Expand Up @@ -1577,110 +1574,67 @@ self.__bx_behaviors.selectMainBehavior();
logger.fatal("No WARC Files, assuming crawl failed");
}

logger.debug("End of log file, storing logs in WACZ");
const waczPath = path.join(this.collDir, this.params.collection + ".wacz");

// Build the argument list to pass to the wacz create command
const waczFilename = this.params.collection.concat(".wacz");
const waczPath = path.join(this.collDir, waczFilename);
const streaming = !!this.storage;

const createArgs = [
"create",
"-o",
waczPath,
"--pages",
this.seedPagesFile,
"--extra-pages",
this.otherPagesFile,
"--copy-pages",
"--log-directory",
this.logDir,
];
if (!streaming) {
logger.debug("WACZ will be written to disk", { path: waczPath }, "wacz");
} else {
logger.debug("WACZ will be stream uploaded to remote storage");
}

logger.debug("End of log file in WACZ, storing logs to WACZ file");

await this.closeLog();

const waczOpts: WACZInitOpts = {
input: warcFileList.map((x) => path.join(this.archivesDir, x)),
output: waczPath,
pages: this.pagesDir,
logDirectory: this.logDir,
warcCdxDir: this.warcCdxDir,
indexesDir: this.indexesDir,
softwareString: this.infoString,
};

if (process.env.WACZ_SIGN_URL) {
createArgs.push("--signing-url");
createArgs.push(process.env.WACZ_SIGN_URL);
waczOpts.signingUrl = process.env.WACZ_SIGN_URL;
if (process.env.WACZ_SIGN_TOKEN) {
createArgs.push("--signing-token");
createArgs.push(process.env.WACZ_SIGN_TOKEN);
waczOpts.signingToken = "bearer " + process.env.WACZ_SIGN_TOKEN;
}
}

if (this.params.title) {
createArgs.push("--title");
createArgs.push(this.params.title);
waczOpts.title = this.params.title;
}

if (this.params.description) {
createArgs.push("--desc");
createArgs.push(this.params.description);
}

createArgs.push("-f");

warcFileList.forEach((val) =>
createArgs.push(path.join(this.archivesDir, val)),
);

// create WACZ
const waczResult = await this.awaitProcess(
child_process.spawn("wacz", createArgs, { detached: RUN_DETACHED }),
);

if (waczResult !== 0) {
logger.error("Error creating WACZ", { "status code": waczResult });
logger.fatal("Unable to write WACZ successfully");
waczOpts.description = this.params.description;
}

logger.debug(`WACZ successfully generated and saved to: ${waczPath}`);

// Verify WACZ
/*
const validateArgs = ["validate"];
validateArgs.push("-f");
validateArgs.push(waczPath);
try {
const wacz = new WACZ(waczOpts, this.collDir);
if (!streaming) {
await wacz.generateToFile(waczPath);
}

const waczVerifyResult = await this.awaitProcess(child_process.spawn("wacz", validateArgs));
if (this.storage) {
await this.crawlState.setStatus("uploading-wacz");
const filename = process.env.STORE_FILENAME || "@[email protected]";
const targetFilename = interpolateFilename(filename, this.crawlId);

if (waczVerifyResult !== 0) {
console.log("validate", waczVerifyResult);
logger.fatal("Unable to verify WACZ created successfully");
}
*/
if (this.storage) {
await this.crawlState.setStatus("uploading-wacz");
const filename = process.env.STORE_FILENAME || "@[email protected]";
const targetFilename = interpolateFilename(filename, this.crawlId);
await this.storage.uploadCollWACZ(wacz, targetFilename, isFinished);
return true;
}

await this.storage.uploadCollWACZ(waczPath, targetFilename, isFinished);
return true;
return false;
} catch (e) {
logger.error("Error creating WACZ", e);
if (!streaming) {
logger.fatal("Unable to write WACZ successfully");
}
}

return false;
}

/**
 * Wait for a spawned child process to close, collecting its output.
 *
 * Chunks emitted on the process's stdout and stderr are buffered as strings.
 * When the process emits "close", buffered stdout is logged at debug level,
 * and buffered stderr is logged at debug level only if
 * `this.params.logging` includes "debug".
 *
 * @param proc - child process to wait on; its `stdout` and `stderr` streams
 *   are accessed with non-null assertions, so both must be present.
 * @returns a promise that resolves with the code passed to the "close"
 *   event. The promise is never rejected by this method.
 */
awaitProcess(proc: ChildProcess) {
// buffers for output chunks, joined only when the process closes
const stdout: string[] = [];
const stderr: string[] = [];

proc.stdout!.on("data", (data) => {
stdout.push(data.toString());
});

proc.stderr!.on("data", (data) => {
stderr.push(data.toString());
});

return new Promise((resolve) => {
proc.on("close", (code) => {
if (stdout.length) {
logger.debug(stdout.join("\n"));
}
// stderr is only surfaced when debug logging was requested
if (stderr.length && this.params.logging.includes("debug")) {
logger.debug(stderr.join("\n"));
}
// the close code is handed back as-is; callers decide what counts as failure
resolve(code);
});
});
}

logMemory() {
Expand Down Expand Up @@ -2604,7 +2558,7 @@ self.__bx_behaviors.selectMainBehavior();

return new WARCWriter({
archivesDir: this.archivesDir,
tempCdxDir: this.tempCdxDir,
warcCdxDir: this.warcCdxDir,
filenameTemplate,
rolloverSize: this.params.rolloverSize,
gzip,
Expand Down
2 changes: 1 addition & 1 deletion src/util/argParser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class ArgParser {

generateWACZ: {
alias: ["generatewacz", "generateWacz"],
describe: "If set, generate wacz",
describe: "If set, generate WACZ on disk",
type: "boolean",
default: false,
},
Expand Down
1 change: 1 addition & 0 deletions src/util/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export const LOG_CONTEXT_TYPES = [
"crawlStatus",
"links",
"sitemap",
"wacz",
"replay",
"proxy",
] as const;
Expand Down
Loading

0 comments on commit 85a07af

Please sign in to comment.