Additional direct fetch improvements (#678)
- use existing headersTimeout in undici to limit the time to fetch response headers to 30 seconds; reject the direct fetch if the timeout is reached (see the sketch after this list)
- allow full page timeout for loading payload via direct fetch
- support setting global fetch() settings
- add markPageUsed() to only reuse pages when not doing direct fetch
- apply auth headers to direct fetch
- catch failed fetch and timeout errors
- support failOnFailedSeed for direct fetch, ensure timeout is working
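
A minimal sketch (not part of the commit) of the global fetch() setup described above, assuming the undici package: an Agent configured with headersTimeout is installed as the global dispatcher, so any fetch() routed through it rejects when response headers do not arrive within 30 seconds, while downloading the body remains governed by the caller's own timeout. Importing fetch from undici, as the originoverride.ts change below does, ensures the configured dispatcher is actually used.

import { Agent, fetch, setGlobalDispatcher } from "undici";

const FETCH_HEADERS_TIMEOUT_SECS = 30;

// Fail fast when a server does not return response headers within 30 seconds.
setGlobalDispatcher(
  new Agent({ headersTimeout: FETCH_HEADERS_TIMEOUT_SECS * 1000 }),
);

// This fetch() goes through the global dispatcher configured above and will
// reject with a headers-timeout error if the server stalls.
const resp = await fetch("https://example.com/");
console.log(resp.status);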
ikreymer authored Sep 5, 2024
1 parent 9d0e342 commit 802a416
Showing 8 changed files with 130 additions and 91 deletions.
129 changes: 67 additions & 62 deletions src/crawler.ts
@@ -48,6 +48,8 @@ import {
BEHAVIOR_LOG_FUNC,
DEFAULT_SELECTORS,
DISPLAY,
PAGE_OP_TIMEOUT_SECS,
SITEMAP_INITIAL_FETCH_TIMEOUT_SECS,
} from "./util/constants.js";

import { AdBlockRules, BlockRules } from "./util/blockrules.js";
@@ -81,10 +83,6 @@ const behaviors = fs.readFileSync(
{ encoding: "utf8" },
);

const FETCH_TIMEOUT_SECS = 30;
const PAGE_OP_TIMEOUT_SECS = 5;
const SITEMAP_INITIAL_FETCH_TIMEOUT_SECS = 30;

const RUN_DETACHED = process.env.DETACHED_CHILD_PROC == "1";

const POST_CRAWL_STATES = [
@@ -265,12 +263,11 @@ export class Crawler {
this.seeds = this.params.scopedSeeds as ScopedSeed[];
this.numOriginalSeeds = this.seeds.length;

// sum of page load + behavior timeouts + 2 x fetch + cloudflare + link extraction timeouts + extra page delay
// sum of page load + behavior timeouts + 2 x pageop timeouts (for cloudflare, link extraction) + extra page delay
// if exceeded, will interrupt and move on to next page (likely behaviors or some other operation is stuck)
this.maxPageTime =
this.params.pageLoadTimeout +
this.params.behaviorTimeout +
FETCH_TIMEOUT_SECS * 2 +
PAGE_OP_TIMEOUT_SECS * 2 +
this.params.pageExtraDelay;
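// Worked example for the updated formula above (the crawler's default
// 90-second page load and behavior timeouts are assumed here, not taken
// from this diff, and pageExtraDelay defaults to 0):
//   maxPageTime = 90 + 90 + 2 * 5 + 0 = 190 seconds per page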

@@ -861,10 +858,6 @@ self.__bx_behaviors.selectMainBehavior();
seedId,
seedUrl: this.seeds[seedId].url,
});
await page.setExtraHTTPHeaders({ Authorization: auth });
opts.isAuthSet = true;
} else if (opts.isAuthSet) {
await page.setExtraHTTPHeaders({});
}

const logDetails = { page: url, workerid };
@@ -873,14 +866,25 @@ self.__bx_behaviors.selectMainBehavior();

if (directFetchCapture) {
try {
const { fetched, mime, ts } = await timedRun(
directFetchCapture({ url, headers: this.headers, cdp }),
this.params.pageLoadTimeout,
"Direct fetch capture attempt timed out",
const headers = auth
? { Authorization: auth, ...this.headers }
: this.headers;

const result = await timedRun(
directFetchCapture({ url, headers, cdp }),
this.params.timeout,
"Direct fetch of page URL timed out",
logDetails,
"fetch",
true,
);

// fetch timed out, already logged, don't retry in browser
if (!result) {
return;
}

const { fetched, mime, ts } = result;

if (mime) {
data.mime = mime;
data.isHTMLPage = isHTMLMime(mime);
@@ -897,15 +901,33 @@ self.__bx_behaviors.selectMainBehavior();
return;
}
} catch (e) {
// filtered out direct fetch
logger.debug(
"Direct fetch response not accepted, continuing with browser fetch",
logDetails,
"fetch",
);
if (e instanceof Error && e.message === "response-filtered-out") {
// filtered out direct fetch
logger.debug(
"Direct fetch response not accepted, continuing with browser fetch",
logDetails,
"fetch",
);
} else {
logger.error(
"Direct fetch of page URL failed",
{ e, ...logDetails },
"fetch",
);
return;
}
}
}

opts.markPageUsed();

if (auth) {
await page.setExtraHTTPHeaders({ Authorization: auth });
opts.isAuthSet = true;
} else if (opts.isAuthSet) {
await page.setExtraHTTPHeaders({});
}

// run custom driver here
await this.driver({ page, data, crawler: this });

@@ -1020,27 +1042,35 @@ self.__bx_behaviors.selectMainBehavior();

// if page loaded, considered page finished successfully
// (even if behaviors timed out)
const { loadState, logDetails } = data;
const { loadState, logDetails, depth, url } = data;

if (data.loadState >= LoadState.FULL_PAGE_LOADED) {
logger.info("Page Finished", { loadState, ...logDetails }, "pageStatus");

await this.crawlState.markFinished(data.url);
await this.crawlState.markFinished(url);

if (this.healthChecker) {
this.healthChecker.resetErrors();
}

await this.serializeConfig();

await this.checkLimits();
} else {
await this.crawlState.markFailed(data.url);
await this.crawlState.markFailed(url);

if (this.healthChecker) {
this.healthChecker.incError();
}
}

await this.serializeConfig();
await this.serializeConfig();

if (depth === 0 && this.params.failOnFailedSeed) {
logger.fatal("Seed Page Load Failed, failing crawl", {}, "general", 1);
}

await this.checkLimits();
await this.checkLimits();
}
}

async teardownPage({ workerid }: WorkerOpts) {
@@ -1694,8 +1724,6 @@ self.__bx_behaviors.selectMainBehavior();

const logDetails = data.logDetails;

const failCrawlOnError = depth === 0 && this.params.failOnFailedSeed;

// Attempt to load the page:
// - Already tried direct fetch w/o browser before getting here, and that resulted in an HTML page or non-200 response
// so now loading using the browser
@@ -1760,19 +1788,8 @@ self.__bx_behaviors.selectMainBehavior();
);
data.skipBehaviors = true;
} else if (!downloadResponse) {
if (failCrawlOnError) {
// if fail on error, immediately fail here
logger.fatal(
"Page Load Timeout, failing crawl",
{
msg,
...logDetails,
},
"general",
1,
);
// log if not already logged and rethrow, consider page failed
} else if (msg !== "logged") {
// log if not already logged and rethrow, consider page failed
if (msg !== "logged") {
logger.error("Page Load Failed, skipping page", {
msg,
loadState: data.loadState,
Expand Down Expand Up @@ -1818,26 +1835,14 @@ self.__bx_behaviors.selectMainBehavior();
}

if (failed) {
if (failCrawlOnError) {
logger.fatal(
"Seed Page Load Error, failing crawl",
{
status,
...logDetails,
},
"general",
1,
);
} else {
logger.error(
isChromeError ? "Page Crashed on Load" : "Page Invalid Status",
{
status,
...logDetails,
},
);
throw new Error("logged");
}
logger.error(
isChromeError ? "Page Crashed on Load" : "Page Invalid Status",
{
status,
...logDetails,
},
);
throw new Error("logged");
}

const contentType = resp.headers()["content-type"];
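For context on the if (!result) check in the direct-fetch block above: the crawler's timedRun() helper (defined in its timing utilities, with extra logging parameters omitted here) races a promise against a timer. A simplified, hypothetical sketch of that pattern, assuming the timeout path logs and resolves to undefined rather than throwing:

// Hypothetical, simplified timedRun-style helper: race a promise against a
// timer and resolve to undefined when the timer wins, so callers can treat a
// missing result as "timed out, already logged".
async function timedRun<T>(
  promise: Promise<T>,
  timeoutSecs: number,
  message: string,
): Promise<T | undefined> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<undefined>((resolve) => {
    timer = setTimeout(() => {
      console.warn(message);
      resolve(undefined);
    }, timeoutSecs * 1000);
  });
  try {
    return await Promise.race([promise, timeout]);
  } finally {
    if (timer) {
      clearTimeout(timer);
    }
  }
}

With that shape, a falsy result means the direct fetch timed out and was already logged, so the caller can return without retrying the URL in the browser.
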
2 changes: 2 additions & 0 deletions src/replaycrawler.ts
@@ -397,6 +397,8 @@ export class ReplayCrawler extends Crawler {
return;
}

opts.markPageUsed();

const date = new Date(ts);

const timestamp = date.toISOString().slice(0, 19).replace(/[T:-]/g, "");
4 changes: 4 additions & 0 deletions src/util/constants.ts
@@ -26,6 +26,10 @@ export const BEHAVIOR_LOG_FUNC = "__bx_log";
export const ADD_LINK_FUNC = "__bx_addLink";
export const MAX_DEPTH = 1000000;

export const FETCH_HEADERS_TIMEOUT_SECS = 30;
export const PAGE_OP_TIMEOUT_SECS = 5;
export const SITEMAP_INITIAL_FETCH_TIMEOUT_SECS = 30;

export const DEFAULT_SELECTORS = [
{
selector: "a[href]",
2 changes: 2 additions & 0 deletions src/util/originoverride.ts
@@ -2,6 +2,8 @@ import { HTTPRequest, Page } from "puppeteer-core";
import { formatErr, logger } from "./logger.js";
import { Browser } from "./browser.js";

import { fetch } from "undici";

export class OriginOverride {
originOverride: { origUrl: URL; destUrl: URL }[];

33 changes: 19 additions & 14 deletions src/util/proxy.ts
@@ -1,12 +1,13 @@
import net from "net";
import { Dispatcher, ProxyAgent, setGlobalDispatcher } from "undici";
import { Agent, Dispatcher, ProxyAgent, setGlobalDispatcher } from "undici";

import child_process from "child_process";

import { logger } from "./logger.js";

import { socksDispatcher } from "fetch-socks";
import type { SocksProxyType } from "socks/typings/common/constants.js";
import { FETCH_HEADERS_TIMEOUT_SECS } from "./constants.js";

const SSH_PROXY_LOCAL_PORT = 9722;

@@ -29,7 +30,7 @@ export async function initProxy(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
params: Record<string, any>,
detached: boolean,
) {
): Promise<string | undefined> {
let proxy = params.proxyServer;

if (!proxy) {
@@ -38,24 +39,28 @@
if (proxy && proxy.startsWith("ssh://")) {
proxy = await runSSHD(params, detached);
}
if (proxy) {
const dispatcher = createDispatcher(proxy);
if (dispatcher) {
setGlobalDispatcher(dispatcher);
return proxy;
}
}
return "";

const agentOpts: Agent.Options = {
headersTimeout: FETCH_HEADERS_TIMEOUT_SECS * 1000,
};

// set global fetch() dispatcher (with proxy, if any)
const dispatcher = createDispatcher(proxy, agentOpts);
setGlobalDispatcher(dispatcher);
return proxy;
}

export function createDispatcher(proxyUrl: string): Dispatcher | undefined {
export function createDispatcher(
proxyUrl: string,
opts: Agent.Options,
): Dispatcher {
if (proxyUrl.startsWith("http://") || proxyUrl.startsWith("https://")) {
// HTTP PROXY does not support auth, as it's not supported in the browser
// so must drop username/password for consistency
const url = new URL(proxyUrl);
url.username = "";
url.password = "";
return new ProxyAgent({ uri: url.href });
return new ProxyAgent({ uri: url.href, ...opts });
} else if (
proxyUrl.startsWith("socks://") ||
proxyUrl.startsWith("socks5://") ||
@@ -71,9 +76,9 @@ export function createDispatcher(proxyUrl: string): Dispatcher | undefined {
userId: url.username || undefined,
password: url.password || undefined,
};
return socksDispatcher(params);
return socksDispatcher(params, { ...opts, connect: undefined });
} else {
return undefined;
return new Agent(opts);
}
}

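
A usage sketch (not part of the diff) for the updated createDispatcher(); the import path and proxy URLs here are assumptions for illustration.

import { Agent } from "undici";
import { createDispatcher } from "./util/proxy.js"; // path assumed

// The same 30-second headers timeout is applied to whichever dispatcher is picked.
const opts: Agent.Options = { headersTimeout: 30 * 1000 };

// HTTP(S) proxy: username/password are stripped for parity with the browser,
// which cannot send HTTP proxy credentials either.
const httpProxyDispatcher = createDispatcher(
  "http://user:pass@proxy.example.com:8080",
  opts,
);

// SOCKS proxy: credentials are passed through to the SOCKS dispatcher.
const socksProxyDispatcher = createDispatcher(
  "socks5://user:pass@proxy.example.com:1080",
  opts,
);

// No recognized proxy scheme: a plain Agent with the same options is returned,
// so the headers timeout still applies to direct fetches.
const directDispatcher = createDispatcher("", opts);

Passing the same Agent.Options into every branch is what lets the headers timeout apply whether traffic goes direct, through an HTTP(S) proxy, or through SOCKS.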