Skip to content

Commit

Permalink
WARC writer + incremental indexing fixes (#679)
Browse files Browse the repository at this point in the history
- ensure WARC rollover happens only after response/request + cdx or
single record + cdx have been written
- ensure request payload is buffered for POST request indexing
- update to warcio 2.3.1 for POST request case-insensitive
'content-type' check
- recorder: remove 'tempdir', no longer needed as warcio chooses a
temp file on its own
  • Loading branch information
ikreymer authored Sep 5, 2024
1 parent 0d6a0b0 commit 9d0e342
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 34 deletions.
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"tsc": "^2.0.4",
"undici": "^6.18.2",
"uuid": "8.3.2",
"warcio": "^2.3.0",
"warcio": "^2.3.1",
"ws": "^7.4.4",
"yargs": "^17.7.2"
},
Expand Down Expand Up @@ -65,6 +65,7 @@
"testTimeout": 90000
},
"resolutions": {
"wrap-ansi": "7.0.0"
"wrap-ansi": "7.0.0",
"warcio": "^2.3.1"
}
}
4 changes: 0 additions & 4 deletions src/crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ export class Crawler {
otherPagesFile: string;

archivesDir: string;
tempdir: string;
warcCdxDir: string;
indexesDir: string;

Expand Down Expand Up @@ -295,7 +294,6 @@ export class Crawler {

// archives dir
this.archivesDir = path.join(this.collDir, "archive");
this.tempdir = path.join(os.tmpdir(), "tmp-dl");

// indexes dirs
this.warcCdxDir = path.join(this.collDir, "warc-cdx");
Expand Down Expand Up @@ -480,7 +478,6 @@ export class Crawler {

if (!this.params.dryRun) {
await fsp.mkdir(this.archivesDir, { recursive: true });
await fsp.mkdir(this.tempdir, { recursive: true });
await fsp.mkdir(this.warcCdxDir, { recursive: true });
}

Expand Down Expand Up @@ -2581,7 +2578,6 @@ self.__bx_behaviors.selectMainBehavior();
workerid: id,
crawler: this,
writer,
tempdir: this.tempdir,
});

this.browser.recorders.push(res);
Expand Down
27 changes: 4 additions & 23 deletions src/util/recorder.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
import path from "path";

import { v4 as uuidv4 } from "uuid";

import PQueue from "p-queue";

import { logger, formatErr } from "./logger.js";
import { sleep, timedRun, timestampNow } from "./timing.js";
import { sleep, timedRun } from "./timing.js";
import {
RequestResponseInfo,
isHTMLMime,
Expand Down Expand Up @@ -142,8 +138,6 @@ export class Recorder {
logDetails: Record<string, any> = {};
skipping = false;

tempdir: string;

gzip = true;

writer: WARCWriter;
Expand All @@ -157,21 +151,17 @@ export class Recorder {
workerid,
writer,
crawler,
tempdir,
}: {
workerid: WorkerId;
writer: WARCWriter;
crawler: Crawler;
tempdir: string;
}) {
this.workerid = workerid;
this.crawler = crawler;
this.crawlState = crawler.crawlState;

this.writer = writer;

this.tempdir = tempdir;

this.fetcherQ = new PQueue({ concurrency: 1 });

this.frameIdToExecId = null;
Expand Down Expand Up @@ -1274,9 +1264,6 @@ class AsyncFetcher {

recorder: Recorder;

tempdir: string;
filename: string;

manualRedirect = false;

constructor({
Expand All @@ -1299,19 +1286,13 @@ class AsyncFetcher {

this.recorder = recorder;

this.tempdir = recorder.tempdir;
this.filename = path.join(
this.tempdir,
`${timestampNow()}-${uuidv4()}.data`,
);

this.maxFetchSize = maxFetchSize;

this.manualRedirect = manualRedirect;
}

async load() {
const { reqresp, recorder, networkId, filename } = this;
const { reqresp, recorder, networkId } = this;
const { url, status } = reqresp;

const { pageid, crawlState, gzip, logDetails } = recorder;
Expand Down Expand Up @@ -1361,7 +1342,7 @@ class AsyncFetcher {
} catch (e) {
logger.error(
"Error reading + digesting payload",
{ url, filename, ...formatErr(e), ...logDetails },
{ url, ...formatErr(e), ...logDetails },
"recorder",
);
}
Expand Down Expand Up @@ -1436,7 +1417,7 @@ class AsyncFetcher {
}
logger.debug(
"Streaming Fetch Error",
{ url, networkId, filename, ...formatErr(e), ...logDetails },
{ url, networkId, ...formatErr(e), ...logDetails },
"recorder",
);
// indicate response is ultimately not valid
Expand Down
14 changes: 13 additions & 1 deletion src/util/warcwriter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,21 @@ export class WARCWriter implements IndexerOffsetLength {

this._writeCDX(responseRecord);

if (requestRecord.httpHeaders?.method !== "GET") {
await requestRecord.readFully(false);
}

const requestSerializer = new WARCSerializer(requestRecord, opts);
this.recordLength = await this._writeRecord(
requestRecord,
requestSerializer,
);

this._writeCDX(requestRecord);

if (this.offset >= this.rolloverSize) {
this.fh = await this.initFH();
}
}

private addToQueue(
Expand Down Expand Up @@ -197,6 +205,10 @@ export class WARCWriter implements IndexerOffsetLength {
this.recordLength = await this._writeRecord(record, requestSerializer);

this._writeCDX(record);

if (this.offset >= this.rolloverSize) {
this.fh = await this.initFH();
}
}

writeNewResourceRecord(
Expand Down Expand Up @@ -257,7 +269,7 @@ export class WARCWriter implements IndexerOffsetLength {
let total = 0;
const url = record.warcTargetURI;

if (!this.fh || this.offset >= this.rolloverSize) {
if (!this.fh) {
this.fh = await this.initFH();
}

Expand Down
8 changes: 4 additions & 4 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5277,10 +5277,10 @@ walker@^1.0.8:
dependencies:
makeerror "1.0.12"

warcio@^2.3.0:
version "2.3.0"
resolved "https://registry.yarnpkg.com/warcio/-/warcio-2.3.0.tgz#a655df9b5986a53e5d05aa68cda51bfefdfa8347"
integrity sha512-PCHcZ/fDE5+QECOFe/n/vzyDmAITJ1mvLx1jVONJ0uaV9OwcTbIWoh7Z0+OQwQdq8Wr1Nnb2hwhtHJ7J+9rHIQ==
warcio@^2.3.0, warcio@^2.3.1:
version "2.3.1"
resolved "https://registry.yarnpkg.com/warcio/-/warcio-2.3.1.tgz#8ac9de897de1a556161168f2a3938b60929908ca"
integrity sha512-PjcWqzXfs6HdWfHi1V/i8MoMmV5M0Csg3rOa2mqCJ1dmCJXswVfQ0VXbEVumwavNIW2oFFj6LJoCHHeL4Ls/zw==
dependencies:
"@types/pako" "^1.0.7"
"@types/stream-buffers" "^3.0.7"
Expand Down

0 comments on commit 9d0e342

Please sign in to comment.