Skip to content
60 changes: 43 additions & 17 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,61 +2,62 @@ import { dirname } from 'node:path';

import type { Log } from '@apify/log';
import defaultLog, { LogLevel } from '@apify/log';
import { addTimeoutToPromise, TimeoutError, tryCancel } from '@apify/timeout';
import { TimeoutError, addTimeoutToPromise, tryCancel } from '@apify/timeout';
import { cryptoRandomObjectId } from '@apify/utilities';
import type {
AddRequestsBatchedOptions,
AddRequestsBatchedResult,
AutoscaledPoolOptions,
CrawlingContext,
DatasetExportOptions,
EnqueueLinksOptions,
EventManager,
DatasetExportOptions,
FinalStatistics,
GetUserDataFromRequest,
IRequestList,
LoadedContext,
ProxyInfo,
Request,
RequestOptions,
RestrictedCrawlingContext,
RouterHandler,
RouterRoutes,
Session,
SessionPoolOptions,
Source,
StatisticState,
StatisticsOptions,
LoadedContext,
RestrictedCrawlingContext,
} from '@crawlee/core';
import {
AutoscaledPool,
Configuration,
CriticalError,
Dataset,
enqueueLinks,
EnqueueStrategy,
EventType,
KeyValueStore,
mergeCookies,
Monitor,
NonRetryableError,
purgeDefaultStorages,
RequestProvider,
RequestQueueV1,
RequestQueue,
RequestQueueV1,
RequestState,
RetryRequestError,
Router,
SessionError,
SessionPool,
Statistics,
enqueueLinks,
mergeCookies,
purgeDefaultStorages,
validators,
} from '@crawlee/core';
import type { Awaitable, BatchAddRequestsResult, Dictionary, SetStatusMessageOptions } from '@crawlee/types';
import { ROTATE_PROXY_ERRORS, gotScraping } from '@crawlee/utils';
import { stringify } from 'csv-stringify/sync';
import { ensureDir, writeFile, writeJSON } from 'fs-extra';
// @ts-expect-error This throws a compilation error due to got-scraping being ESM only but we only import types, so its alllll gooooood
import type { OptionsInit, Method } from 'got-scraping';
import type { Method, OptionsInit } from 'got-scraping';
import ow, { ArgumentError } from 'ow';
import { getDomain } from 'tldts';
import type { SetRequired } from 'type-fest';
Expand Down Expand Up @@ -351,6 +352,12 @@ export interface BasicCrawlerOptions<Context extends CrawlingContext = BasicCraw
* whether to output them to the Key-Value store.
*/
statisticsOptions?: StatisticsOptions;

/**
* Track and display time estimation and concurrency status in the CLI output at regular intervals.
* @default false
*/
monitor?: boolean;
}

/**
Expand Down Expand Up @@ -499,6 +506,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
protected retryOnBlocked: boolean;
private _closeEvents?: boolean;

private monitor?: boolean;
private experiments: CrawlerExperiments;
private _experimentWarnings: Partial<Record<keyof CrawlerExperiments, boolean>> = {};

Expand Down Expand Up @@ -542,6 +550,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
experiments: ow.optional.object,

statisticsOptions: ow.optional.object,
monitor: ow.optional.boolean,
};

/**
Expand Down Expand Up @@ -592,6 +601,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
statusMessageCallback,

statisticsOptions,
monitor,
} = options;

this.requestList = requestList;
Expand All @@ -601,6 +611,7 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.statusMessageCallback = statusMessageCallback as StatusMessageCallback;
this.events = config.getEventManager();
this.domainAccessedTime = new Map();
this.monitor = monitor;
this.experiments = experiments;

this._handlePropertyNameChange({
Expand Down Expand Up @@ -754,7 +765,10 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
log,
};

this.autoscaledPoolOptions = { ...autoscaledPoolOptions, ...basicCrawlerAutoscaledPoolConfiguration };
this.autoscaledPoolOptions = {
...autoscaledPoolOptions,
...basicCrawlerAutoscaledPoolConfiguration,
};
}

/**
Expand Down Expand Up @@ -904,11 +918,15 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
this.events.on(EventType.MIGRATING, boundPauseOnMigration);
this.events.on(EventType.ABORTING, boundPauseOnMigration);

const monitor = this.monitor ? new Monitor(this.stats, this.autoscaledPool, this.requestQueue) : null;
monitor?.start();

try {
await this.autoscaledPool!.run();
} finally {
await this.teardown();
await this.stats.stopCapturing();
monitor?.stop();

process.off('SIGINT', sigintHandler);
this.events.off(EventType.MIGRATING, boundPauseOnMigration);
Expand Down Expand Up @@ -949,9 +967,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext

periodicLogger.stop();
await this.setStatusMessage(
`Finished! Total ${this.stats.state.requestsFinished + this.stats.state.requestsFailed} requests: ${
this.stats.state.requestsFinished
} succeeded, ${this.stats.state.requestsFailed} failed.`,
`Finished! Total ${
this.stats.state.requestsFinished + this.stats.state.requestsFailed
} requests: ${this.stats.state.requestsFinished} succeeded, ${this.stats.state.requestsFailed} failed.`,
{ isStatusMessageTerminal: true, level: 'INFO' },
);
this.running = false;
Expand Down Expand Up @@ -1202,7 +1220,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
source['inProgress'].add(request.id!);
}

await source.reclaimRequest(request, { forefront: request.userData?.__crawlee?.forefront });
await source.reclaimRequest(request, {
forefront: request.userData?.__crawlee?.forefront,
});
}, delay);

return true;
Expand Down Expand Up @@ -1466,7 +1486,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
retryCount,
});

await source.reclaimRequest(request, { forefront: request.userData?.__crawlee?.forefront });
await source.reclaimRequest(request, {
forefront: request.userData?.__crawlee?.forefront,
});
return;
}
}
Expand Down Expand Up @@ -1495,7 +1517,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
try {
return (await cb()) as T;
} catch (e: any) {
Object.defineProperty(e, 'triggeredFromUserHandler', { value: true });
Object.defineProperty(e, 'triggeredFromUserHandler', {
value: true,
});
throw e;
}
}
Expand Down Expand Up @@ -1704,7 +1728,9 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
return baseUrl.hostname === loadedBaseUrl.hostname;
}
case EnqueueStrategy.SameDomain: {
const baseUrlHostname = getDomain(baseUrl.hostname, { mixedInputs: false });
const baseUrlHostname = getDomain(baseUrl.hostname, {
mixedInputs: false,
});

if (baseUrlHostname) {
const loadedBaseUrlHostname = getDomain(loadedBaseUrl.hostname, { mixedInputs: false });
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ export * from './session_pool';
export * from './storages';
export * from './validators';
export * from './cookie_utils';
export * from './monitor';
export { PseudoUrl } from '@apify/pseudo_url';
export { Dictionary, Awaitable, Constructor, StorageClient, Cookie, QueueOperationInfo } from '@crawlee/types';
142 changes: 142 additions & 0 deletions packages/core/src/monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import os from 'os';

import type { AutoscaledPool, RequestProvider, Statistics } from '.';

export class Monitor {
private statistics: Statistics;
private autoscaledPool: AutoscaledPool | undefined;
private requestQueue: RequestProvider | undefined;

private intervalId: NodeJS.Timeout | null = null;
private monitorDisplay: MonitorDisplay | null = null;

constructor(
statistics: Statistics,
autoscaledPool: AutoscaledPool | undefined,
requestQueue: RequestProvider | undefined,
) {
this.statistics = statistics;
this.autoscaledPool = autoscaledPool;
this.requestQueue = requestQueue;
}

start(interval: number = 500) {
if (!this.monitorDisplay) {
this.monitorDisplay = new MonitorDisplay();
}

this.intervalId = setInterval(async () => {
await this.display();
}, interval);
}

stop() {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}

private async display() {
const stats = this.statistics.calculate();
const now = new Date();
const startTime = this.statistics.state.crawlerStartedAt;
const elapsedTime = now.getTime() - new Date(startTime!).getTime();
const cpuLoad = os.loadavg()[0];
const memLoad = (os.totalmem() - os.freemem()) / os.totalmem();
const { requestsFinished } = this.statistics.state;
const assumedTotalCount = this.requestQueue?.assumedTotalCount ?? 0;

if (!this.monitorDisplay) {
throw new Error('Start the monitor first');
}

this.monitorDisplay.log(`Start: ${startTime ? formatDateTime(new Date(startTime)) : undefined}`);
this.monitorDisplay.log(`Now: ${formatDateTime(now)} (running for ${elapsedTime / 1000}s)`);
this.monitorDisplay.log(
`Progress: ${requestsFinished} / ${assumedTotalCount} (${((requestsFinished / assumedTotalCount) * 100).toFixed(2)}%), failed: ${this.statistics.state.requestsFailed} (${((this.statistics.state.requestsFailed / assumedTotalCount) * 100).toFixed(2)}%)`,
);
this.monitorDisplay.log(
`Remaining: ${this.estimateRemainingTime(stats)} seconds (${(stats.requestsFinishedPerMinute / 60).toFixed(2)} pages/seconds)`,
);
this.monitorDisplay.log(`Sys. load: ${cpuLoad.toFixed(2)}% CPU / ${(memLoad * 100).toFixed(2)}% Memory`);
this.monitorDisplay.log(
`Concurrencies: Current ${this.autoscaledPool?.currentConcurrency}, Desired ${this.autoscaledPool?.desiredConcurrency}`,
);

// TODO: Add list of URLs that are currently being processed

this.monitorDisplay.resetCursor();
}

private estimateRemainingTime(stats: ReturnType<Statistics['calculate']>) {
const na = 'N/A';
if (!this.requestQueue) {
return na;
}

const remainingRequests = this.requestQueue.assumedTotalCount - this.statistics.state.requestsFinished;
const avgDuration = stats.requestAvgFinishedDurationMillis;
const remainingTime = (remainingRequests * avgDuration) / 1000;
const safeRemainingTime = Number.isFinite(remainingTime) ? remainingTime.toFixed(2) : na;
return safeRemainingTime;
}
}

const CLEAR_LINE = '\x1B[K';

class MonitorDisplay {
private lastLinesCount: number = 0;
private linesCount: number = 0;

public log(str: string): void {
// We create an empty line at the start so that any console.log calls
// from within the script are above our output.
if (this.linesCount === 0) {
// eslint-disable-next-line no-console
console.log(CLEAR_LINE); // erases the current line
this.linesCount += 1;
}

// Strip lines that are too long
// const strToLog = str.substring(0, 78);
const strToLog = str;
// eslint-disable-next-line no-console
console.log(`${CLEAR_LINE}${strToLog}`);
this.linesCount += 1;
}

public resetCursor(): void {
// move cursor up to draw over out output
process.stdout.write(`\x1B[${this.linesCount}A`);
this.lastLinesCount = this.linesCount;
this.linesCount = 0;
}

public close(): void {
// move cursor down so that console output stays
process.stdout.write(`\x1B[${this.lastLinesCount}B`);
}
}

function formatDateTime(datetime: Date | number): string {
const date = typeof datetime === 'number' ? new Date(datetime) : datetime;

const dateStr = `${date.getFullYear()}-${padDate(date.getMonth() + 1, 2)}-${padDate(date.getDate(), 2)}`;
const timeStr =
`${padDate(date.getHours(), 2)}` +
`:${padDate(date.getMinutes(), 2)}` +
`:${padDate(date.getSeconds(), 2)}` +
`.${padDate(date.getMilliseconds(), 3)}`;

return `${dateStr} ${timeStr}`;
}

function padDate(value: number | string, num: number): string {
const str = value.toString();
if (str.length >= num) {
return str;
}
const zeroesToAdd = num - str.length;
return '0'.repeat(zeroesToAdd) + str;
}
Loading