From 802a416c7ebaccf00235e567e2931ad6d42e90b9 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 5 Sep 2024 13:28:49 -0700 Subject: [PATCH] Additional direct fetch improvements (#678) - use existing headersTimeout in undici to limit time to headers fetch to 30 seconds, reject direct fetch if timeout is reached - allow full page timeout for loading payload via direct fetch - support setting global fetch() settings - add markPageUsed() to only reuse pages when not doing direct fetch - apply auth headers to direct fetch - catch failed fetch and timeout errors - support failOnFailedSeeds for direct fetch, ensure timeout is working --- src/crawler.ts | 129 +++++++++++++++-------------- src/replaycrawler.ts | 2 + src/util/constants.ts | 4 + src/util/originoverride.ts | 2 + src/util/proxy.ts | 33 ++++---- src/util/recorder.ts | 32 +++++-- src/util/worker.ts | 17 ++-- tests/multi-instance-crawl.test.js | 2 +- 8 files changed, 130 insertions(+), 91 deletions(-) diff --git a/src/crawler.ts b/src/crawler.ts index 8417efb43..4976faa41 100644 --- a/src/crawler.ts +++ b/src/crawler.ts @@ -48,6 +48,8 @@ import { BEHAVIOR_LOG_FUNC, DEFAULT_SELECTORS, DISPLAY, + PAGE_OP_TIMEOUT_SECS, + SITEMAP_INITIAL_FETCH_TIMEOUT_SECS, } from "./util/constants.js"; import { AdBlockRules, BlockRules } from "./util/blockrules.js"; @@ -81,10 +83,6 @@ const behaviors = fs.readFileSync( { encoding: "utf8" }, ); -const FETCH_TIMEOUT_SECS = 30; -const PAGE_OP_TIMEOUT_SECS = 5; -const SITEMAP_INITIAL_FETCH_TIMEOUT_SECS = 30; - const RUN_DETACHED = process.env.DETACHED_CHILD_PROC == "1"; const POST_CRAWL_STATES = [ @@ -265,12 +263,11 @@ export class Crawler { this.seeds = this.params.scopedSeeds as ScopedSeed[]; this.numOriginalSeeds = this.seeds.length; - // sum of page load + behavior timeouts + 2 x fetch + cloudflare + link extraction timeouts + extra page delay + // sum of page load + behavior timeouts + 2 x pageop timeouts (for cloudflare, link extraction) + extra page delay // if exceeded, will interrupt and move on to next page (likely behaviors or some other operation is stuck) this.maxPageTime = this.params.pageLoadTimeout + this.params.behaviorTimeout + - FETCH_TIMEOUT_SECS * 2 + PAGE_OP_TIMEOUT_SECS * 2 + this.params.pageExtraDelay; @@ -861,10 +858,6 @@ self.__bx_behaviors.selectMainBehavior(); seedId, seedUrl: this.seeds[seedId].url, }); - await page.setExtraHTTPHeaders({ Authorization: auth }); - opts.isAuthSet = true; - } else if (opts.isAuthSet) { - await page.setExtraHTTPHeaders({}); } const logDetails = { page: url, workerid }; @@ -873,14 +866,25 @@ self.__bx_behaviors.selectMainBehavior(); if (directFetchCapture) { try { - const { fetched, mime, ts } = await timedRun( - directFetchCapture({ url, headers: this.headers, cdp }), - this.params.pageLoadTimeout, - "Direct fetch capture attempt timed out", + const headers = auth + ? { Authorization: auth, ...this.headers } + : this.headers; + + const result = await timedRun( + directFetchCapture({ url, headers, cdp }), + this.params.timeout, + "Direct fetch of page URL timed out", logDetails, "fetch", - true, ); + + // fetched timed out, already logged, don't retry in browser + if (!result) { + return; + } + + const { fetched, mime, ts } = result; + if (mime) { data.mime = mime; data.isHTMLPage = isHTMLMime(mime); @@ -897,15 +901,33 @@ self.__bx_behaviors.selectMainBehavior(); return; } } catch (e) { - // filtered out direct fetch - logger.debug( - "Direct fetch response not accepted, continuing with browser fetch", - logDetails, - "fetch", - ); + if (e instanceof Error && e.message === "response-filtered-out") { + // filtered out direct fetch + logger.debug( + "Direct fetch response not accepted, continuing with browser fetch", + logDetails, + "fetch", + ); + } else { + logger.error( + "Direct fetch of page URL failed", + { e, ...logDetails }, + "fetch", + ); + return; + } } } + opts.markPageUsed(); + + if (auth) { + await page.setExtraHTTPHeaders({ Authorization: auth }); + opts.isAuthSet = true; + } else if (opts.isAuthSet) { + await page.setExtraHTTPHeaders({}); + } + // run custom driver here await this.driver({ page, data, crawler: this }); @@ -1020,27 +1042,35 @@ self.__bx_behaviors.selectMainBehavior(); // if page loaded, considered page finished successfully // (even if behaviors timed out) - const { loadState, logDetails } = data; + const { loadState, logDetails, depth, url } = data; if (data.loadState >= LoadState.FULL_PAGE_LOADED) { logger.info("Page Finished", { loadState, ...logDetails }, "pageStatus"); - await this.crawlState.markFinished(data.url); + await this.crawlState.markFinished(url); if (this.healthChecker) { this.healthChecker.resetErrors(); } + + await this.serializeConfig(); + + await this.checkLimits(); } else { - await this.crawlState.markFailed(data.url); + await this.crawlState.markFailed(url); if (this.healthChecker) { this.healthChecker.incError(); } - } - await this.serializeConfig(); + await this.serializeConfig(); + + if (depth === 0 && this.params.failOnFailedSeed) { + logger.fatal("Seed Page Load Failed, failing crawl", {}, "general", 1); + } - await this.checkLimits(); + await this.checkLimits(); + } } async teardownPage({ workerid }: WorkerOpts) { @@ -1694,8 +1724,6 @@ self.__bx_behaviors.selectMainBehavior(); const logDetails = data.logDetails; - const failCrawlOnError = depth === 0 && this.params.failOnFailedSeed; - // Attempt to load the page: // - Already tried direct fetch w/o browser before getting here, and that resulted in an HTML page or non-200 response // so now loading using the browser @@ -1760,19 +1788,8 @@ self.__bx_behaviors.selectMainBehavior(); ); data.skipBehaviors = true; } else if (!downloadResponse) { - if (failCrawlOnError) { - // if fail on error, immediately fail here - logger.fatal( - "Page Load Timeout, failing crawl", - { - msg, - ...logDetails, - }, - "general", - 1, - ); - // log if not already log and rethrow, consider page failed - } else if (msg !== "logged") { + // log if not already log and rethrow, consider page failed + if (msg !== "logged") { logger.error("Page Load Failed, skipping page", { msg, loadState: data.loadState, @@ -1818,26 +1835,14 @@ self.__bx_behaviors.selectMainBehavior(); } if (failed) { - if (failCrawlOnError) { - logger.fatal( - "Seed Page Load Error, failing crawl", - { - status, - ...logDetails, - }, - "general", - 1, - ); - } else { - logger.error( - isChromeError ? "Page Crashed on Load" : "Page Invalid Status", - { - status, - ...logDetails, - }, - ); - throw new Error("logged"); - } + logger.error( + isChromeError ? "Page Crashed on Load" : "Page Invalid Status", + { + status, + ...logDetails, + }, + ); + throw new Error("logged"); } const contentType = resp.headers()["content-type"]; diff --git a/src/replaycrawler.ts b/src/replaycrawler.ts index 7ac382673..455cdc03e 100644 --- a/src/replaycrawler.ts +++ b/src/replaycrawler.ts @@ -397,6 +397,8 @@ export class ReplayCrawler extends Crawler { return; } + opts.markPageUsed(); + const date = new Date(ts); const timestamp = date.toISOString().slice(0, 19).replace(/[T:-]/g, ""); diff --git a/src/util/constants.ts b/src/util/constants.ts index 7a069cad0..681debf0e 100644 --- a/src/util/constants.ts +++ b/src/util/constants.ts @@ -26,6 +26,10 @@ export const BEHAVIOR_LOG_FUNC = "__bx_log"; export const ADD_LINK_FUNC = "__bx_addLink"; export const MAX_DEPTH = 1000000; +export const FETCH_HEADERS_TIMEOUT_SECS = 30; +export const PAGE_OP_TIMEOUT_SECS = 5; +export const SITEMAP_INITIAL_FETCH_TIMEOUT_SECS = 30; + export const DEFAULT_SELECTORS = [ { selector: "a[href]", diff --git a/src/util/originoverride.ts b/src/util/originoverride.ts index cc37118cd..b74e9f79d 100644 --- a/src/util/originoverride.ts +++ b/src/util/originoverride.ts @@ -2,6 +2,8 @@ import { HTTPRequest, Page } from "puppeteer-core"; import { formatErr, logger } from "./logger.js"; import { Browser } from "./browser.js"; +import { fetch } from "undici"; + export class OriginOverride { originOverride: { origUrl: URL; destUrl: URL }[]; diff --git a/src/util/proxy.ts b/src/util/proxy.ts index 82bd22106..50282b403 100644 --- a/src/util/proxy.ts +++ b/src/util/proxy.ts @@ -1,5 +1,5 @@ import net from "net"; -import { Dispatcher, ProxyAgent, setGlobalDispatcher } from "undici"; +import { Agent, Dispatcher, ProxyAgent, setGlobalDispatcher } from "undici"; import child_process from "child_process"; @@ -7,6 +7,7 @@ import { logger } from "./logger.js"; import { socksDispatcher } from "fetch-socks"; import type { SocksProxyType } from "socks/typings/common/constants.js"; +import { FETCH_HEADERS_TIMEOUT_SECS } from "./constants.js"; const SSH_PROXY_LOCAL_PORT = 9722; @@ -29,7 +30,7 @@ export async function initProxy( // eslint-disable-next-line @typescript-eslint/no-explicit-any params: Record, detached: boolean, -) { +): Promise { let proxy = params.proxyServer; if (!proxy) { @@ -38,24 +39,28 @@ export async function initProxy( if (proxy && proxy.startsWith("ssh://")) { proxy = await runSSHD(params, detached); } - if (proxy) { - const dispatcher = createDispatcher(proxy); - if (dispatcher) { - setGlobalDispatcher(dispatcher); - return proxy; - } - } - return ""; + + const agentOpts: Agent.Options = { + headersTimeout: FETCH_HEADERS_TIMEOUT_SECS * 1000, + }; + + // set global fetch() dispatcher (with proxy, if any) + const dispatcher = createDispatcher(proxy, agentOpts); + setGlobalDispatcher(dispatcher); + return proxy; } -export function createDispatcher(proxyUrl: string): Dispatcher | undefined { +export function createDispatcher( + proxyUrl: string, + opts: Agent.Options, +): Dispatcher { if (proxyUrl.startsWith("http://") || proxyUrl.startsWith("https://")) { // HTTP PROXY does not support auth, as it's not supported in the browser // so must drop username/password for consistency const url = new URL(proxyUrl); url.username = ""; url.password = ""; - return new ProxyAgent({ uri: url.href }); + return new ProxyAgent({ uri: url.href, ...opts }); } else if ( proxyUrl.startsWith("socks://") || proxyUrl.startsWith("socks5://") || @@ -71,9 +76,9 @@ export function createDispatcher(proxyUrl: string): Dispatcher | undefined { userId: url.username || undefined, password: url.password || undefined, }; - return socksDispatcher(params); + return socksDispatcher(params, { ...opts, connect: undefined }); } else { - return undefined; + return new Agent(opts); } } diff --git a/src/util/recorder.ts b/src/util/recorder.ts index 766bdcaff..0ce2dd7ee 100644 --- a/src/util/recorder.ts +++ b/src/util/recorder.ts @@ -8,7 +8,7 @@ import { isRedirectStatus, } from "./reqresp.js"; -import { fetch, Response } from "undici"; +import { fetch, getGlobalDispatcher, Response } from "undici"; // @ts-expect-error TODO fill in why error is expected import { getCustomRewriter } from "@webrecorder/wabac/src/rewrite/index.js"; @@ -1188,12 +1188,6 @@ export class Recorder { reqresp.requestHeaders = headers; reqresp.ts = ts; - logger.debug( - "Directly fetching page URL without browser", - { url, ...this.logDetails }, - "recorder", - ); - let mime: string = ""; const filter = (resp: Response) => { @@ -1207,7 +1201,17 @@ export class Recorder { mime = ct.split(";")[0]; } - return !isHTMLMime(mime); + const result = !isHTMLMime(mime); + + if (result) { + logger.info( + "Directly fetching page URL without browser", + { url, ...this.logDetails }, + "fetch", + ); + } + + return result; }; // ignore dupes: if previous URL was not a page, still load as page. if previous was page, @@ -1389,7 +1393,7 @@ class AsyncFetcher { externalBuffer.buffers = [reqresp.payload]; } else if (fh) { logger.warn( - "Large streamed written to WARC, but not returned to browser, requires reading into memory", + "Large payload written to WARC, but not returned to browser (would require rereading into memory)", { url, actualSize: reqresp.readSize, maxSize: this.maxFetchSize }, "recorder", ); @@ -1449,12 +1453,22 @@ class AsyncFetcher { signal = abort.signal; } + const dispatcher = getGlobalDispatcher().compose((dispatch) => { + return (opts, handler) => { + if (opts.headers) { + reqresp.requestHeaders = opts.headers as Record; + } + return dispatch(opts, handler); + }; + }); + const resp = await fetch(url!, { method, headers, body: reqresp.postData || undefined, signal, redirect: this.manualRedirect ? "manual" : "follow", + dispatcher, }); if (this.filter && !this.filter(resp) && abort) { diff --git a/src/util/worker.ts b/src/util/worker.ts index 17d3580a8..b56ee58ec 100644 --- a/src/util/worker.ts +++ b/src/util/worker.ts @@ -2,7 +2,11 @@ import os from "os"; import { logger, formatErr } from "./logger.js"; import { sleep, timedRun } from "./timing.js"; -import { DirectFetchRequest, Recorder } from "./recorder.js"; +import { + DirectFetchRequest, + DirectFetchResponse, + Recorder, +} from "./recorder.js"; import { rxEscape } from "./seeds.js"; import { CDPSession, Page } from "puppeteer-core"; import { PageState, WorkerId } from "./state.js"; @@ -21,10 +25,9 @@ export type WorkerOpts = { // eslint-disable-next-line @typescript-eslint/ban-types callbacks: Record; directFetchCapture: - | (( - request: DirectFetchRequest, - ) => Promise<{ fetched: boolean; mime: string; ts: Date }>) + | ((request: DirectFetchRequest) => Promise) | null; + markPageUsed: () => void; frameIdToExecId: Map; isAuthSet?: boolean; }; @@ -134,7 +137,6 @@ export class PageWorker { async initPage(url: string): Promise { let reuse = !this.crashed && !!this.opts && !!this.page; if (!this.alwaysReuse) { - ++this.reuseCount; reuse = this.reuseCount <= MAX_REUSE && this.isSameOrigin(url); } if (reuse) { @@ -183,6 +185,11 @@ export class PageWorker { callbacks: this.callbacks, directFetchCapture, frameIdToExecId: new Map(), + markPageUsed: () => { + if (!this.alwaysReuse) { + this.reuseCount++; + } + }, }; if (this.recorder) { diff --git a/tests/multi-instance-crawl.test.js b/tests/multi-instance-crawl.test.js index 2890a0b9c..33c9383d2 100644 --- a/tests/multi-instance-crawl.test.js +++ b/tests/multi-instance-crawl.test.js @@ -33,7 +33,7 @@ afterAll(async () => { }); function runCrawl(name) { - const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=crawl --hostname=${name} webrecorder/browsertrix-crawler crawl --url https://www.webrecorder.net/ --limit 4 --collection shared-${name} --crawlId testcrawl --redisStoreUrl redis://redis:6379`); + const crawler = exec(`docker run --rm -v $PWD/test-crawls:/crawls --network=crawl --hostname=${name} webrecorder/browsertrix-crawler crawl --url https://www.webrecorder.net/ --limit 4 --exclude community --collection shared-${name} --crawlId testcrawl --redisStoreUrl redis://redis:6379`); return new Promise((resolve) => { crawler.on("exit", (code) => {