optimize link extraction (fixes #376)
- dedup URLs in the browser first
- don't return the entire list of URLs; process them one at a time via a callback (see the sketch after this list)
- add exposeFunction per page in setupPage, then register an 'addLink' callback for each page's handler
- optimize addqueue: atomically check, in a single redis call, whether the crawl is already at the max URL limit and whether the URL has already been seen
- better logging: log rejected promises during link extraction
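
For context, here is a minimal, self-contained sketch (not the crawler code itself) of the callback-based pattern this commit introduces: the page dedups URLs in a Set and streams each one back to Node through a function exposed with Puppeteer's page.exposeFunction, and the Node side batches them before queuing, instead of serializing one large array out of evaluate(). Helper names such as extractLinksViaCallback, queueBatch, and BATCH_SIZE are illustrative only; it assumes Node 18+, ESM ("type": "module"), and puppeteer installed.

// Minimal sketch of callback-based link extraction (illustrative, not the crawler itself).
import puppeteer from "puppeteer";

const ADD_LINK_FUNC = "__bx_addLink";
const BATCH_SIZE = 500;

async function extractLinksViaCallback(page, queueBatch) {
  let links = [];
  const pending = [];

  // Node-side handler: called once per deduplicated URL found in the page.
  await page.exposeFunction(ADD_LINK_FUNC, (url) => {
    links.push(url);
    if (links.length === BATCH_SIZE) {
      pending.push(queueBatch(links));
      links = [];
    }
  });

  // Browser-side: dedup via a Set, then stream each URL out through the
  // exposed function instead of returning one big array from evaluate().
  await page.evaluate((addLinkFunc) => {
    const urls = new Set();
    document.querySelectorAll("a[href]").forEach(elem => urls.add(elem.href));
    const func = window[addLinkFunc];
    urls.forEach(url => func(url));
  }, ADD_LINK_FUNC);

  // Flush any remaining partial batch and wait for queueing to settle.
  if (links.length) {
    pending.push(queueBatch(links));
  }
  await Promise.allSettled(pending);
}

const browser = await puppeteer.launch();
const page = await browser.newPage();
await page.goto("https://example.com/");
await extractLinksViaCallback(page, async (batch) => console.log(`would queue ${batch.length} urls`));
await browser.close();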
ikreymer committed Sep 14, 2023
1 parent afecec0 commit 247b9a9
Showing 6 changed files with 193 additions and 209 deletions.
107 changes: 58 additions & 49 deletions crawler.js
@@ -25,7 +25,7 @@ import { collectAllFileSources } from "./util/file_reader.js";

import { Browser } from "./util/browser.js";

-import { BEHAVIOR_LOG_FUNC, HTML_TYPES, DEFAULT_SELECTORS } from "./util/constants.js";
+import { ADD_LINK_FUNC, BEHAVIOR_LOG_FUNC, HTML_TYPES, DEFAULT_SELECTORS } from "./util/constants.js";

import { AdBlockRules, BlockRules } from "./util/blockrules.js";
import { OriginOverride } from "./util/originoverride.js";
@@ -381,7 +381,7 @@ export class Crawler {
return seed.isIncluded(url, depth, extraHops, logDetails);
}

-async setupPage({page, cdp, workerid}) {
+async setupPage({page, cdp, workerid, callbacks}) {
await this.browser.setupPage({page, cdp});

if ((this.adBlockRules && this.params.blockAds) ||
@@ -419,6 +419,8 @@
await this.screencaster.screencastPage(page, cdp, workerid);
}

+await page.exposeFunction(ADD_LINK_FUNC, (url) => callbacks.addLink && callbacks.addLink(url));
+
if (this.params.behaviorOpts) {
await page.exposeFunction(BEHAVIOR_LOG_FUNC, (logdata) => this._behaviorLog(logdata, page.url(), workerid));
await this.browser.addInitScript(page, behaviors);
@@ -459,7 +461,8 @@ self.__bx_behaviors.selectMainBehavior();
async crawlPage(opts) {
await this.writeStats();

-const {page, cdp, data, workerid} = opts;
+const {page, cdp, data, workerid, callbacks} = opts;
+data.callbacks = callbacks;

const {url} = data;

@@ -1056,7 +1059,7 @@ self.__bx_behaviors.selectMainBehavior();
}

async loadPage(page, data, selectorOptsList = DEFAULT_SELECTORS) {
-const {url, seedId, depth, extraHops = 0} = data;
+const {url, seedId, depth} = data;

const logDetails = data.logDetails;

@@ -1186,12 +1189,9 @@ self.__bx_behaviors.selectMainBehavior();
return;
}

logger.debug("Extracting links");
logger.debug("Extracting links", logDetails);

-for (const opts of selectorOptsList) {
-const links = await this.extractLinks(page, data.filteredFrames, opts, logDetails);
-await this.queueInScopeUrls(seedId, links, depth, extraHops, logDetails);
-}
+await this.extractLinks(page, data, selectorOptsList, logDetails);
}

async netIdle(page, details) {
@@ -1210,52 +1210,66 @@ self.__bx_behaviors.selectMainBehavior();
}
}

-async extractLinks(page, frames, {selector = "a[href]", extract = "href", isAttribute = false} = {}, logDetails) {
-const results = [];
+async extractLinks(page, data, selectors = DEFAULT_SELECTORS, logDetails) {
+const {seedId, depth, extraHops = 0, filteredFrames, callbacks} = data;

-const loadProp = (options) => {
-const { selector, extract } = options;
-return [...document.querySelectorAll(selector)].map(elem => elem[extract]);
-};
+let links = [];
+const promiseList = [];

-const loadAttr = (options) => {
-const { selector, extract } = options;
-return [...document.querySelectorAll(selector)].map(elem => elem.getAttribute(extract));
+callbacks.addLink = (url) => {
+links.push(url);
+if (links.length == 500) {
+promiseList.push(this.queueInScopeUrls(seedId, links, depth, extraHops, logDetails));
+links = [];
+}
};

-const loadFunc = isAttribute ? loadAttr : loadProp;
+const loadLinks = (options) => {
+const { selector, extract, isAttribute, addLinkFunc } = options;
+const urls = new Set();
+
+const getAttr = elem => urls.add(elem.getAttribute(extract));
+const getProp = elem => urls.add(elem[extract]);
+
+const getter = isAttribute ? getAttr : getProp;

-frames = frames || page.frames();
+document.querySelectorAll(selector).forEach(getter);
+
+const func = window[addLinkFunc];
+urls.forEach(url => func.call(this, url));
+
+return true;
+};
+
+const frames = filteredFrames || page.frames();

try {
-const linkResults = await Promise.allSettled(
-frames.map(frame => timedRun(
-frame.evaluate(loadFunc, {selector, extract}),
-PAGE_OP_TIMEOUT_SECS,
-"Link extraction timed out",
-logDetails,
-))
-);
+for (const {selector = "a[href]", extract = "href", isAttribute = false} of selectors) {
+const promiseResults = await Promise.allSettled(
+frames.map(frame => timedRun(
+frame.evaluate(loadLinks, {selector, extract, isAttribute, addLinkFunc: ADD_LINK_FUNC}),
+PAGE_OP_TIMEOUT_SECS,
+"Link extraction timed out",
+logDetails,
+))
+);

-if (linkResults) {
-let i = 0;
-for (const linkResult of linkResults) {
-if (!linkResult) {
-logger.warn("Link Extraction timed out in frame", {frameUrl: frames[i].url, ...logDetails});
-continue;
-}
-if (!linkResult.value) continue;
-for (const link of linkResult.value) {
-results.push(link);
+for (let i = 0; i < promiseResults.length; i++) {
+const {status, reason} = promiseResults[i];
+if (status === "rejected") {
+logger.warn("Link Extraction failed in frame", {reason, frameUrl: frames[i].url, ...logDetails});
}
-i++;
}
}
-
} catch (e) {
logger.warn("Link Extraction failed", e);
}
-return results;
+
+if (links.length) {
+promiseList.push(this.queueInScopeUrls(seedId, links, depth, extraHops, logDetails));
+}
+
+await Promise.allSettled(promiseList);
}

async queueInScopeUrls(seedId, urls, depth, extraHops = 0, logDetails = {}) {
@@ -1304,22 +1318,17 @@ self.__bx_behaviors.selectMainBehavior();
}

async queueUrl(seedId, url, depth, extraHops = 0) {
-logger.debug(`Queuing url ${url}`);
if (this.limitHit) {
return false;
}

-if (this.pageLimit > 0 && (await this.crawlState.numSeen() >= this.pageLimit)) {
+if (!await this.crawlState.addToQueue({url, seedId, depth, extraHops}, this.pageLimit)) {
this.limitHit = true;
return false;
+} else {
+logger.debug(`Queued url ${url}`);
}

-if (await this.crawlState.has(url)) {
-return false;
-}
-
-//await this.crawlState.add(url);
-await this.crawlState.addToQueue({url, seedId, depth, extraHops});
return true;
}

1 change: 1 addition & 0 deletions util/browser.js
@@ -51,6 +51,7 @@ export class BaseBrowser
handleSIGHUP: signals,
handleSIGINT: signals,
handleSIGTERM: signals,
+protocolTimeout: 0,

defaultViewport,
waitForInitialPage: false,
1 change: 1 addition & 0 deletions util/constants.js
@@ -2,6 +2,7 @@
export const HTML_TYPES = ["text/html", "application/xhtml", "application/xhtml+xml"];
export const WAIT_UNTIL_OPTS = ["load", "domcontentloaded", "networkidle0", "networkidle2"];
export const BEHAVIOR_LOG_FUNC = "__bx_log";
+export const ADD_LINK_FUNC = "__bx_addLink";
export const MAX_DEPTH = 1000000;

export const DEFAULT_SELECTORS = [{
14 changes: 11 additions & 3 deletions util/state.js
@@ -67,9 +67,17 @@ export class RedisCrawlState
redis.defineCommand("addqueue", {
numberOfKeys: 3,
lua: `
-redis.call('sadd', KEYS[3], ARGV[1]);
+local size = redis.call('scard', KEYS[3]);
+local limit = tonumber(ARGV[4]);
+if limit > 0 and size >= limit then
+return 0;
+end
+if redis.call('sadd', KEYS[3], ARGV[1]) == 0 then
+return 0;
+end
redis.call('zadd', KEYS[2], ARGV[2], ARGV[3]);
redis.call('hdel', KEYS[1], ARGV[1]);
+return 1;
`
});

@@ -233,14 +241,14 @@ return 0;
return (res >= 3);
}

-async addToQueue({url, seedId, depth = 0, extraHops = 0} = {}) {
+async addToQueue({url, seedId, depth = 0, extraHops = 0} = {}, limit = 0) {
const added = this._timestamp();
const data = {added, url, seedId, depth};
if (extraHops) {
data.extraHops = extraHops;
}

-await this.redis.addqueue(this.pkey, this.qkey, this.skey, url, this._getScore(data), JSON.stringify(data));
+return await this.redis.addqueue(this.pkey, this.qkey, this.skey, url, this._getScore(data), JSON.stringify(data), limit);
}

async nextFromQueue() {
13 changes: 11 additions & 2 deletions util/worker.js
@@ -48,6 +48,7 @@ export class PageWorker
this.reuseCount = 0;
this.page = null;
this.cdp = null;
+this.callbacks = null;

this.opts = null;

@@ -115,7 +116,8 @@ export class PageWorker

this.page = page;
this.cdp = cdp;
-this.opts = {page: this.page, cdp: this.cdp, workerid};
+this.callbacks = {};
+this.opts = {page: this.page, cdp: this.cdp, workerid, callbacks: this.callbacks};

// updated per page crawl
this.crashed = false;
@@ -196,6 +198,8 @@
async runLoop() {
const crawlState = this.crawler.crawlState;

+let loggedWaiting = false;
+
while (await this.crawler.isCrawlRunning()) {
const data = await crawlState.nextFromQueue();

@@ -207,6 +211,8 @@
// run timed crawl of page
await this.timedCrawlPage({...opts, data});

+loggedWaiting = false;
+
} else {
// indicate that the worker has no more work (mostly for screencasting, status, etc...)
// depending on other works, will either get more work or crawl will end
@@ -217,7 +223,10 @@

// if pending, sleep and check again
if (pending) {
logger.debug("No crawl tasks, but pending tasks remain, waiting", {pending, workerid: this.id}, "worker");
if (!loggedWaiting) {
logger.debug("No crawl tasks, but pending tasks remain, waiting", {pending, workerid: this.id}, "worker");
loggedWaiting = true;
}
await sleep(0.5);
} else {
// if no pending and queue size is still empty, we're done!