From 0985f9659a9ed58c9fe2d59bcbe140542e4a3841 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 17 Aug 2023 08:30:56 -0700 Subject: [PATCH] recorder fixes: - add 'takeReader()' to match takeStreamIter() and catch any exceptions, mark as 'truncated' and finish - add 'WARC-Truncated' in case response is possibly truncated - add 'asyncLoading' to reqresp to avoid removing response while async loading is in progress, avoids navigating away from page --- util/recorder.js | 62 +++++++++++++++++++++++++++++++++++------------- util/reqresp.js | 6 +++++ 2 files changed, 51 insertions(+), 17 deletions(-) diff --git a/util/recorder.js b/util/recorder.js index 5ca6fb190..1e940e364 100644 --- a/util/recorder.js +++ b/util/recorder.js @@ -13,7 +13,7 @@ import { RequestResponseInfo } from "./reqresp.js"; import { baseRules as baseDSRules } from "@webrecorder/wabac/src/rewrite/index.js"; import { rewriteDASH, rewriteHLS } from "@webrecorder/wabac/src/rewrite/rewriteVideo.js"; -import { WARCRecord, AsyncIterReader, concatChunks } from "warcio"; +import { WARCRecord } from "warcio"; import { WARCSerializer } from "warcio/node"; import { WARCWriter } from "./warcwriter.js"; @@ -217,12 +217,14 @@ export class Recorder } handleLoadingFinished(params) { - const reqresp = this.removeReqResp(params.requestId); + const reqresp = this.pendingReqResp(params.requestId, true); - if (!reqresp || !reqresp.url) { + if (!reqresp || !reqresp.asyncLoading) { return; } + this.removeReqResp(params.requestId); + if (!this.isValidUrl(reqresp.url)) { return; } @@ -600,6 +602,7 @@ class AsyncFetcher //super(); this.reqresp = reqresp; this.reqresp.expectedSize = expectedSize; + this.reqresp.asyncLoading = true; this.networkId = networkId; @@ -610,10 +613,10 @@ class AsyncFetcher } async load() { - const { reqresp, recorder, networkId } = this; + const { reqresp, recorder, networkId, filename } = this; const { url } = reqresp; - const { pageid, crawlState, gzip } = recorder; + const { pageid, crawlState, gzip, logDetails } = recorder; let fetched = false; @@ -637,14 +640,14 @@ class AsyncFetcher } reqresp.readSize = readSize; } catch (e) { - logger.error("Error reading + digesting payload", {url, filename: this.filename, ...errJSON(e), ...this.logDetails}, "recorder"); + logger.error("Error reading + digesting payload", {url, filename, ...errJSON(e), ...logDetails}, "recorder"); } if (reqresp.readSize === reqresp.expectedSize || reqresp.expectedSize < 0) { - logger.debug("Async fetch: streaming done", {size: reqresp.readSize, expected: reqresp.expectedSize, networkId, url, ...this.logDetails}, "recorder"); + logger.debug("Async fetch: streaming done", {size: reqresp.readSize, expected: reqresp.expectedSize, networkId, url, ...logDetails}, "recorder"); } else { - logger.warn("Async fetch: possible response size mismatch", {size: reqresp.readSize, expected: reqresp.expectedSize, url, ...this.logDetails}, "recorder"); + logger.warn("Async fetch: possible response size mismatch", {size: reqresp.readSize, expected: reqresp.expectedSize, url, ...logDetails}, "recorder"); //await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url); //return fetched; } @@ -655,7 +658,7 @@ class AsyncFetcher const { currSize, buffers, fh } = externalBuffer; if (buffers && buffers.length && !fh) { - reqresp.payload = await concatChunks(buffers, currSize); + reqresp.payload = Buffer.concat(buffers, currSize); } } @@ -666,7 +669,7 @@ class AsyncFetcher recorder.warcQ.add(() => recorder.writer.writeRecordPair(responseRecord, requestRecord, serializer)); } catch (e) { - logger.error("Error streaming to file", {url, networkId, filename: this.filename, ...errJSON(e), ...this.logDetails}, "recorder"); + logger.error("Error streaming to file", {url, networkId, filename, ...errJSON(e), ...logDetails}, "recorder"); await crawlState.removeDupe(ASYNC_FETCH_DUPE_KEY, url); } finally { recorder.removeReqResp(networkId); @@ -700,19 +703,40 @@ class AsyncFetcher reqresp.fillFetchResponse(resp); - return AsyncIterReader.fromReadable(resp.body.getReader()); + return this.takeReader(resp.body.getReader()); + } + + async* takeReader(reader) { + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + + yield value; + } + } catch (e) { + logger.warn("takeStream interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder"); + this.reqresp.truncated = "disconnect"; + } } async* takeStreamIter(cdp, stream) { - while (true) { - const {data, base64Encoded, eof} = await cdp.send("IO.read", {handle: stream}); - const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8"); + try { + while (true) { + const {data, base64Encoded, eof} = await cdp.send("IO.read", {handle: stream}); + const buff = Buffer.from(data, base64Encoded ? "base64" : "utf-8"); - yield buff; + yield buff; - if (eof) { - break; + if (eof) { + break; + } } + } catch (e) { + logger.warn("takeStream interrupted", {...errJSON(e), url: this.reqresp.url, ...this.recorder.logDetails}, "recorder"); + this.reqresp.truncated = "disconnect"; } } } @@ -799,6 +823,10 @@ function createResponse(reqresp, pageid, contentIter) { "WARC-Page-ID": pageid, }; + if (reqresp.truncated) { + warcHeaders["WARC-Truncated"] = reqresp.truncated; + } + if (!contentIter) { contentIter = [reqresp.payload]; } diff --git a/util/reqresp.js b/util/reqresp.js index 523c63b21..17928c1c3 100644 --- a/util/reqresp.js +++ b/util/reqresp.js @@ -46,6 +46,12 @@ export class RequestResponseInfo this.readSize = 0; this.expectedSize = 0; + + // set to true to indicate async loading in progress + this.asyncLoading = false; + + // set to add truncated message + this.truncated = null; } fillRequest(params) {