Skip to content

Commit

Permalink
recorder fixes:
Browse files Browse the repository at this point in the history
- add 'takeReader()' to match takeStreamIter() and catch any exceptions, mark as 'truncated' and finish
- add 'WARC-Truncated' header when the response may be truncated
- add 'asyncLoading' to reqresp to avoid removing the response while async loading is in progress; this avoids navigating away from the page
  • Loading branch information
ikreymer committed Aug 17, 2023
1 parent b35a33d commit 0985f96
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
62 changes: 45 additions & 17 deletions util/recorder.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { RequestResponseInfo } from "./reqresp.js";
import { baseRules as baseDSRules } from "@webrecorder/wabac/src/rewrite/index.js";
import { rewriteDASH, rewriteHLS } from "@webrecorder/wabac/src/rewrite/rewriteVideo.js";

import { WARCRecord, AsyncIterReader, concatChunks } from "warcio";
import { WARCRecord } from "warcio";
import { WARCSerializer } from "warcio/node";
import { WARCWriter } from "./warcwriter.js";

Expand Down Expand Up @@ -217,12 +217,14 @@ export class Recorder
}

handleLoadingFinished(params) {
const reqresp = this.removeReqResp(params.requestId);
const reqresp = this.pendingReqResp(params.requestId, true);

if (!reqresp || !reqresp.url) {
if (!reqresp || !reqresp.asyncLoading) {
return;
}

this.removeReqResp(params.requestId);

if (!this.isValidUrl(reqresp.url)) {
return;
}
Expand Down Expand Up @@ -600,6 +602,7 @@ class AsyncFetcher
//super();
this.reqresp = reqresp;
this.reqresp.expectedSize = expectedSize;
this.reqresp.asyncLoading = true;

this.networkId = networkId;

Expand All @@ -610,10 +613,10 @@ class AsyncFetcher
}

async load() {
const { reqresp, recorder, networkId } = this;
const { reqresp, recorder, networkId, filename } = this;
const { url } = reqresp;

const { pageid, crawlState, gzip } = recorder;
const { pageid, crawlState, gzip, logDetails } = recorder;

let fetched = false;

Expand All @@ -637,14 +640,14 @@ class AsyncFetcher
}
reqresp.readSize = readSize;
} catch (e) {
logger.error("Error reading + digesting payload", {url, filename: this.filename, ...errJSON(e), ...this.logDetails}, "recorder");
logger.error("Error reading + digesting payload", {url, filename, ...errJSON(e), ...logDetails}, "recorder");
}

if (reqresp.readSize === reqresp.expectedSize || reqresp.expectedSize < 0) {
logger.debug("Async fetch: streaming done", {size: reqresp.readSize, expected: reqresp.expectedSize, networkId, url, ...this.logDetails}, "recorder");
logger.debug("Async fetch: streaming done", {size: reqresp.readSize, expected: reqresp.expectedSize, networkId, url, ...logDetails}, "recorder");

} else {
logger.warn("Async fetch: possible response size mismatch", {size: reqresp.readSize, expected: reqresp.expectedSize, url, ...this.logDetails}, "recorder");
logger.warn("Async fetch: possible response size mismatch", {size: reqresp.readSize, expected: reqresp.expectedSize, url, ...logDetails}, "recorder");
//await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
//return fetched;
}
Expand All @@ -655,7 +658,7 @@ class AsyncFetcher
const { currSize, buffers, fh } = externalBuffer;

if (buffers && buffers.length && !fh) {
reqresp.payload = await concatChunks(buffers, currSize);
reqresp.payload = Buffer.concat(buffers, currSize);
}
}

Expand All @@ -666,7 +669,7 @@ class AsyncFetcher
recorder.warcQ.add(() => recorder.writer.writeRecordPair(responseRecord, requestRecord, serializer));

} catch (e) {
logger.error("Error streaming to file", {url, networkId, filename: this.filename, ...errJSON(e), ...this.logDetails}, "recorder");
logger.error("Error streaming to file", {url, networkId, filename, ...errJSON(e), ...logDetails}, "recorder");
await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url);
} finally {
recorder.removeReqResp(networkId);
Expand Down Expand Up @@ -700,19 +703,40 @@ class AsyncFetcher

reqresp.fillFetchResponse(resp);

return AsyncIterReader.fromReadable(resp.body.getReader());
return this.takeReader(resp.body.getReader());
}

async* takeReader(reader) {
try {
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}

yield value;
}
} catch (e) {
logger.warn("takeStream interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder");
this.reqresp.truncated = "disconnect";
}
}

async* takeStreamIter(cdp, stream) {
while (true) {
const {data, base64Encoded, eof} = await cdp.send("IO.read", {handle: stream});
const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8");
try {
while (true) {
const {data, base64Encoded, eof} = await cdp.send("IO.read", {handle: stream});
const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8");

yield buff;
yield buff;

if (eof) {
break;
if (eof) {
break;
}
}
} catch (e) {
logger.warn("takeStream interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder");
this.reqresp.truncated = "disconnect";
}
}
}
Expand Down Expand Up @@ -799,6 +823,10 @@ function createResponse(reqresp, pageid, contentIter) {
"WARC-Page-ID": pageid,
};

if (reqresp.truncated) {
warcHeaders["WARC-Truncated"] = reqresp.truncated;
}

if (!contentIter) {
contentIter = [reqresp.payload];
}
Expand Down
6 changes: 6 additions & 0 deletions util/reqresp.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ export class RequestResponseInfo

this.readSize = 0;
this.expectedSize = 0;

// set to true to indicate async loading in progress
this.asyncLoading = false;

// set to add truncated message
this.truncated = null;
}

fillRequest(params) {
Expand Down

0 comments on commit 0985f96

Please sign in to comment.