Skip to content

Commit

Permalink
refactor: Refactor Monitor class to improve code readability and main…
Browse files Browse the repository at this point in the history
…tainability
  • Loading branch information
ImBIOS committed Oct 2, 2024
1 parent 47e294f commit d4ae373
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 31 deletions.
16 changes: 9 additions & 7 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { dirname } from 'node:path';

import type { Log } from '@apify/log';
import defaultLog, { LogLevel } from '@apify/log';
import { addTimeoutToPromise, TimeoutError, tryCancel } from '@apify/timeout';
import { TimeoutError, addTimeoutToPromise, tryCancel } from '@apify/timeout';
import { cryptoRandomObjectId } from '@apify/utilities';
import type {
AddRequestsBatchedOptions,
Expand All @@ -25,22 +25,19 @@ import type {
Session,
SessionPoolOptions,
Source,
StatisticsOptions,
StatisticState,
StatisticsOptions,
} from '@crawlee/core';
import {
AutoscaledPool,
Configuration,
CriticalError,
Dataset,
enqueueLinks,
EnqueueStrategy,
EventType,
KeyValueStore,
mergeCookies,
Monitor,
NonRetryableError,
purgeDefaultStorages,
RequestProvider,
RequestQueue,
RequestQueueV1,
Expand All @@ -50,10 +47,13 @@ import {
SessionError,
SessionPool,
Statistics,
enqueueLinks,
mergeCookies,
purgeDefaultStorages,
validators,
} from '@crawlee/core';
import type { Awaitable, BatchAddRequestsResult, Dictionary, SetStatusMessageOptions } from '@crawlee/types';
import { gotScraping, ROTATE_PROXY_ERRORS } from '@crawlee/utils';
import { ROTATE_PROXY_ERRORS, gotScraping } from '@crawlee/utils';
import { stringify } from 'csv-stringify/sync';
import { ensureDir, writeFile, writeJSON } from 'fs-extra';
// @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types, so its alllll gooooood
Expand Down Expand Up @@ -550,6 +550,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
experiments: ow.optional.object,

statisticsOptions: ow.optional.object,
monitor: ow.optional.boolean,
};

/**
Expand Down Expand Up @@ -600,6 +601,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
statusMessageCallback,

statisticsOptions,
monitor,
} = options;

this.requestList = requestList;
Expand Down Expand Up @@ -916,7 +918,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.events.on(EventType.MIGRATING, boundPauseOnMigration);
this.events.on(EventType.ABORTING, boundPauseOnMigration);

const monitor = this.monitor ? new Monitor(this.stats, this.log) : null;
const monitor = this.monitor ? new Monitor(this.stats, this.autoscaledPool, this.requestQueue) : null;
monitor?.start();

try {
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ export * from './session_pool';
export * from './storages';
export * from './validators';
export * from './cookie_utils';
export * from './monitor';
export { PseudoUrl } from '@apify/pseudo_url';
export { Dictionary, Awaitable, Constructor, StorageClient, Cookie, QueueOperationInfo } from '@crawlee/types';
133 changes: 109 additions & 24 deletions packages/core/src/monitor.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
import os from 'os';

import type { Statistics } from './crawlers/statistics';
import type { Log } from './log';
import { log as defaultLog } from './log';
import type { AutoscaledPool, RequestProvider, Statistics } from '.';

export class Monitor {
private log: Log;
private statistics: Statistics;
private autoscaledPool: AutoscaledPool | undefined;
private requestQueue: RequestProvider | undefined;

private intervalId: NodeJS.Timeout | null = null;
private monitorDisplay: MonitorDisplay | null = null;

constructor(statistics: Statistics, log: Log = defaultLog) {
constructor(
statistics: Statistics,
autoscaledPool: AutoscaledPool | undefined,
requestQueue: RequestProvider | undefined,
) {
this.statistics = statistics;
this.log = log.child({ prefix: 'Monitor' });
this.autoscaledPool = autoscaledPool;
this.requestQueue = requestQueue;
}

start(interval: number = 5000) {
this.intervalId = setInterval(() => {
this.display();
start(interval: number = 500) {
if (!this.monitorDisplay) {
this.monitorDisplay = new MonitorDisplay();
}

this.intervalId = setInterval(async () => {
await this.display();
}, interval);
}

Expand All @@ -27,31 +37,106 @@ export class Monitor {
}
}

private display() {
private async display() {
const stats = this.statistics.calculate();
const now = new Date();
const startTime = this.statistics.state.crawlerStartedAt;
const elapsedTime = now.getTime() - new Date(startTime!).getTime();
const cpuLoad = os.loadavg()[0];
const memLoad = (os.totalmem() - os.freemem()) / os.totalmem();
const { requestsFinished } = this.statistics.state;
const assumedTotalCount = this.requestQueue?.assumedTotalCount ?? 0;

if (!this.monitorDisplay) {
throw new Error('Start the monitor first');
}

this.monitorDisplay.log(`Start: ${startTime ? formatDateTime(new Date(startTime)) : undefined}`);
this.monitorDisplay.log(`Now: ${formatDateTime(now)} (running for ${elapsedTime / 1000}s)`);
this.monitorDisplay.log(
`Progress: ${requestsFinished} / ${assumedTotalCount} (${((requestsFinished / assumedTotalCount) * 100).toFixed(2)}%), failed: ${this.statistics.state.requestsFailed} (${((this.statistics.state.requestsFailed / assumedTotalCount) * 100).toFixed(2)}%)`,
);
this.monitorDisplay.log(
`Remaining: ${this.estimateRemainingTime(stats)} seconds (${(stats.requestsFinishedPerMinute / 60).toFixed(2)} pages/seconds)`,
);
this.monitorDisplay.log(`Sys. load: ${cpuLoad.toFixed(2)}% CPU / ${(memLoad * 100).toFixed(2)}% Memory`);
this.monitorDisplay.log(
`Concurrencies: Current ${this.autoscaledPool?.currentConcurrency}, Desired ${this.autoscaledPool?.desiredConcurrency}`,
);

// TODO: Add list of URLs that are currently being processed

this.log.info(`
Start: ${startTime}
Now: ${now} (running for ${elapsedTime / 1000}s)
Progress: ${this.statistics.state.requestsFinished} / ${stats.requestsTotal} (${
(this.statistics.state.requestsFinished / stats.requestsTotal) * 100
}%), failed: ${this.statistics.state.requestsFailed} (${
(this.statistics.state.requestsFailed / stats.requestsTotal) * 100
}%)
Remaining: ${this.estimateRemainingTime(stats)} (${stats.requestsFinishedPerMinute} req/min)
Sys. load: ${cpuLoad.toFixed(2)} / ${(memLoad * 100).toFixed(2)}%
Concurrencies: ${this.statistics.state.requestsRetries}
`);
this.monitorDisplay.resetCursor();
}

private estimateRemainingTime(stats: ReturnType<Statistics['calculate']>) {
const remainingRequests = stats.requestsTotal - this.statistics.state.requestsFinished;
const na = 'N/A';
if (!this.requestQueue) {
return na;
}

const remainingRequests = this.requestQueue.assumedTotalCount - this.statistics.state.requestsFinished;
const avgDuration = stats.requestAvgFinishedDurationMillis;
return (remainingRequests * avgDuration) / 1000;
const remainingTime = (remainingRequests * avgDuration) / 1000;
const safeRemainingTime = Number.isFinite(remainingTime) ? remainingTime.toFixed(2) : na;
return safeRemainingTime;
}
}

/** ANSI escape sequence used to erase the current terminal line before (re)drawing it. */
const CLEAR_LINE = '\x1B[K';

/**
 * Draws a block of status lines on the terminal and rewinds the cursor afterwards,
 * so that the next frame is painted over the previous one instead of scrolling.
 *
 * Typical usage: call `log()` once per line of a frame, then `resetCursor()`;
 * call `close()` when no more frames will be drawn.
 */
class MonitorDisplay {
    /** Height (in lines) of the most recently completed frame; consumed by `close()`. */
    private lastLinesCount = 0;

    /** Number of lines written so far for the frame currently being drawn. */
    private linesCount = 0;

    /**
     * Writes one line of the current frame.
     *
     * @param str Text of the line. It is printed as-is (no truncation is applied).
     */
    public log(str: string): void {
        // We create an empty line at the start so that any console.log calls
        // from within the script are above our output.
        if (this.linesCount === 0) {
            // eslint-disable-next-line no-console
            console.log(CLEAR_LINE); // erases the current line
            this.linesCount += 1;
        }

        // eslint-disable-next-line no-console
        console.log(`${CLEAR_LINE}${str}`);
        this.linesCount += 1;
    }

    /**
     * Finishes the current frame: moves the cursor back up to the first line of the
     * frame so the next frame draws over our output, and remembers the frame height.
     *
     * Does nothing when no line has been logged since the last reset. Emitting a
     * cursor-up sequence with a count of 0 is not a no-op on many terminals (a 0
     * count is commonly interpreted as the default of 1), which would shift the
     * cursor by one row.
     */
    public resetCursor(): void {
        if (this.linesCount === 0) {
            return;
        }

        process.stdout.write(`\x1B[${this.linesCount}A`);
        this.lastLinesCount = this.linesCount;
        this.linesCount = 0;
    }

    /**
     * Moves the cursor down past the last completed frame so that the console output stays
     * visible and later output is appended below it.
     *
     * Does nothing when no frame has been completed, for the same zero-count reason
     * described on `resetCursor()`.
     */
    public close(): void {
        if (this.lastLinesCount === 0) {
            return;
        }

        process.stdout.write(`\x1B[${this.lastLinesCount}B`);
    }
}

/**
 * Renders a point in time as `YYYY-MM-DD HH:mm:ss.SSS`.
 *
 * All components are read through the local-time getters of `Date`. The year is
 * printed unpadded; month, day, hours, minutes and seconds are zero-padded to two
 * digits and milliseconds to three.
 *
 * @param datetime A `Date` instance, or a number that is passed to the `Date` constructor.
 * @returns The formatted date and time separated by a single space.
 */
function formatDateTime(datetime: Date | number): string {
    let moment: Date;
    if (typeof datetime === 'number') {
        moment = new Date(datetime);
    } else {
        moment = datetime;
    }

    // getMonth() is zero-based, hence the +1.
    const calendarPart = [
        String(moment.getFullYear()),
        padDate(moment.getMonth() + 1, 2),
        padDate(moment.getDate(), 2),
    ].join('-');

    const wallClock = [
        padDate(moment.getHours(), 2),
        padDate(moment.getMinutes(), 2),
        padDate(moment.getSeconds(), 2),
    ].join(':');
    const millis = padDate(moment.getMilliseconds(), 3);

    return `${calendarPart} ${wallClock}.${millis}`;
}

/**
 * Left-pads a value with `'0'` characters until it is at least `num` characters long.
 *
 * @param value Number or string to pad; it is converted with `toString()` first.
 * @param num Minimum length of the result. Values already that long are returned unchanged.
 * @returns The zero-padded string.
 */
function padDate(value: number | string, num: number): string {
    return value.toString().padStart(num, '0');
}

0 comments on commit d4ae373

Please sign in to comment.