Skip to content

Commit 20bd5de

Browse files
committed
refactor(jobs): expose a single entry point
1 parent 1acd535 commit 20bd5de

40 files changed

+873
-813
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ DIRECT_URL=postgresql://blobscan:s3cr3t@localhost:5432/blobscan_dev?schema=publi
1111

1212
BLOBSCAN_WEB_TAG=next
1313
BLOBSCAN_API_TAG=next
14+
BLOBSCAN_JOBS_TAG=next
1415
INDEXER_TAG=master
1516

1617
### blobscan website

Dockerfile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,11 @@ WORKDIR /app
145145

146146
ENV NODE_ENV=production
147147

148+
COPY --from=jobs-builder /app/node_modules/.prisma ./node_modules/.prisma
148149
COPY --from=jobs-builder /app/node_modules/prisma ./node_modules/prisma
149150
COPY --from=jobs-builder /app/node_modules/@prisma ./node_modules/@prisma
150151
COPY --from=jobs-builder /app/apps/jobs/dist ./
151152

152153
ADD docker-entrypoint.sh /
153-
ENTRYPOINT ["/docker-entrypoint.sh"]
154+
ENTRYPOINT ["/docker-entrypoint.sh"]
155+
CMD ["jobs"]

apps/jobs/esbuild.config.mjs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,17 @@
11
// esbuild.config.js
22
import * as esbuild from "esbuild";
3-
import glob from "fast-glob";
43
import fs from "fs";
54

6-
const entryPoints = await glob("src/**/worker.ts");
7-
85
const result = await esbuild.build({
9-
entryPoints,
6+
entryPoints: ["src/index.ts"],
107
outdir: "dist",
118
outbase: "src",
129
platform: "node",
1310
target: "node20",
1411
format: "cjs",
1512
metafile: !!process.env.BUILD_METADATA_ENABLED,
1613
bundle: true,
17-
sourcemap: true,
18-
treeShaking: true,
19-
external: ["ioredis", "prisma", "@prisma/client"],
14+
external: [".prisma", "prisma", "@prisma/client"],
2015
});
2116

2217
if (process.env.BUILD_METADATA_ENABLED) {

apps/jobs/package.json

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,10 @@
66
"build": "node esbuild.config.mjs",
77
"build:metadata": "BUILD_METADATA_ENABLED=true pnpm build",
88
"clean": "git clean -xdf node_modules",
9-
"dev": "concurrently -n eth-price,stats,swarm-stamp -c green,blue,magenta \"pnpm dev:eth-price\" \"pnpm dev:stats\" \"pnpm dev:swarm-stamp\"",
10-
"dev:eth-price": "pnpm with-env tsx src/eth-price/worker.ts",
11-
"dev:stats": "pnpm with-env tsx src/stats/worker.ts",
12-
"dev:swarm-stamp": "pnpm with-env tsx src/swarm-stamp/worker.ts",
9+
"dev": "pnpm with-env tsx src/index.ts",
1310
"lint": "eslint .",
1411
"lint:fix": "pnpm lint --fix",
15-
"start": "concurrently -n eth-price,stats,swarm-stamp -c green,blue,magenta \"pnpm start:eth-price\" \"pnpm start:stats\" \"pnpm start:swarm-stamp\"",
16-
"start:eth-price": "pnpm with-env node dist/eth-price/worker.js",
17-
"start:stats": "pnpm with-env node dist/stats/worker.js",
18-
"start:swarm-stamp": "pnpm with-env node dist/swarm-stamp/worker.js",
12+
"start": "pnpm with-env node dist/index.js",
1913
"test": "pnpm with-env:test vitest",
2014
"test:ui": "pnpm with-env:test vitest --ui",
2115
"with-env:test": "dotenv -e ../../.env.test --",

apps/jobs/src/BaseCronJob.ts renamed to apps/jobs/src/cron-jobs/BaseCronJob.ts

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
/* eslint-disable @typescript-eslint/no-misused-promises */
2+
import type { Processor } from "bullmq";
23
import { Queue, Worker } from "bullmq";
34
import type { Redis } from "ioredis";
45

56
import { createModuleLogger } from "@blobscan/logger";
67
import type { Logger } from "@blobscan/logger";
78

8-
import { ErrorException } from "./errors";
9-
import { createRedis } from "./redis";
9+
import { ErrorException } from "../errors";
10+
import { createRedis } from "../redis";
1011

1112
export interface CommonCronJobConfig {
1213
redisUriOrConnection: Redis | string;
@@ -15,11 +16,12 @@ export interface CommonCronJobConfig {
1516

1617
export interface BaseCronJobConfig extends CommonCronJobConfig {
1718
name: string;
18-
jobFn: () => Promise<void>;
19+
processor: Processor;
20+
jobData?: Record<string, unknown>;
1921
}
2022

2123
export class CronJobError extends ErrorException {
22-
constructor(cronJobName: string, message: string, cause: unknown) {
24+
constructor(cronJobName: string, message: string, cause?: unknown) {
2325
super(`Cron job "${cronJobName}" failed: ${message}`, cause);
2426
}
2527
}
@@ -28,22 +30,25 @@ export class BaseCronJob {
2830
name: string;
2931
cronPattern: string;
3032

31-
protected jobFn: () => Promise<void>;
3233
protected logger: Logger;
3334

3435
protected connection: Redis;
35-
protected worker: Worker | undefined;
36-
protected queue: Queue | undefined;
36+
protected worker?: Worker;
37+
protected queue?: Queue;
38+
39+
protected jobData?: Record<string, unknown>;
3740

3841
constructor({
3942
name,
4043
cronPattern,
4144
redisUriOrConnection,
42-
jobFn,
45+
processor: processorFile,
46+
jobData,
4347
}: BaseCronJobConfig) {
4448
this.name = `${name}-cron-job`;
4549
this.cronPattern = cronPattern;
4650
this.logger = createModuleLogger(this.name);
51+
this.jobData = jobData;
4752

4853
let connection: Redis;
4954

@@ -57,7 +62,7 @@ export class BaseCronJob {
5762
connection,
5863
});
5964

60-
this.worker = new Worker(this.queue.name, jobFn, {
65+
this.worker = new Worker(this.queue.name, processorFile, {
6166
connection,
6267
});
6368

@@ -70,13 +75,12 @@ export class BaseCronJob {
7075
});
7176

7277
this.connection = connection;
73-
this.jobFn = jobFn;
7478
}
7579

7680
async start() {
7781
try {
7882
const jobName = `${this.name}-job`;
79-
const repeatableJob = await this.queue?.add(jobName, null, {
83+
const repeatableJob = await this.queue?.add(jobName, this.jobData, {
8084
repeat: {
8185
pattern: this.cronPattern,
8286
},
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import { formatDate } from "../../utils";
2+
import { BaseCronJob } from "../BaseCronJob";
3+
import type { CommonCronJobConfig } from "../BaseCronJob";
4+
import dailyStats from "./processor";
5+
import type { DailyStatsJobResult } from "./types";
6+
7+
export type DailyStatsCronJobConfig = CommonCronJobConfig;
8+
9+
export class DailyStatsCronJob extends BaseCronJob {
10+
constructor({ redisUriOrConnection, cronPattern }: DailyStatsCronJobConfig) {
11+
const name = "daily-stats";
12+
13+
super({
14+
name,
15+
redisUriOrConnection,
16+
cronPattern,
17+
processor: dailyStats,
18+
});
19+
20+
this.worker?.on("completed", (_, result) => {
21+
const result_ = result as DailyStatsJobResult;
22+
23+
if (!result_) {
24+
this.logger.info(
25+
"Daily stats aggregation skipped: no blocks indexed yet"
26+
);
27+
28+
return;
29+
}
30+
31+
const { fromDate, toDate, totalAggregationsCreated } = result_;
32+
33+
if (fromDate === toDate) {
34+
this.logger.info(`Daily stats aggregation skipped: already up to date`);
35+
36+
return;
37+
}
38+
39+
this.logger.info(
40+
`Daily data up to day ${formatDate(
41+
toDate
42+
)} aggregated. ${totalAggregationsCreated} daily stats created successfully.`
43+
);
44+
});
45+
}
46+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import { normalizeDate, toDailyDate } from "@blobscan/dayjs";
2+
import { prisma } from "@blobscan/db";
3+
4+
import type { DailyStatsJobResult } from "./types";
5+
6+
export default async (): Promise<DailyStatsJobResult> => {
7+
const lastIndexedBlock = await prisma.block.findLatest();
8+
9+
if (!lastIndexedBlock) {
10+
return;
11+
}
12+
const targetDate = normalizeDate(lastIndexedBlock.timestamp).subtract(
13+
1,
14+
"day"
15+
);
16+
const targetDay = toDailyDate(targetDate);
17+
18+
const rawLastDailyStatsDay = await prisma.dailyStats.findFirst({
19+
select: { day: true },
20+
where: { category: null, rollup: null },
21+
orderBy: { day: "desc" },
22+
});
23+
const lastDailyStatsDay = rawLastDailyStatsDay?.day
24+
? normalizeDate(rawLastDailyStatsDay.day)
25+
: undefined;
26+
27+
if (lastDailyStatsDay && lastDailyStatsDay.isSame(targetDay, "day")) {
28+
return {
29+
fromDate: lastDailyStatsDay.utc().toISOString(),
30+
toDate: targetDay.utc().toISOString(),
31+
totalAggregationsCreated: 0,
32+
};
33+
}
34+
35+
const [blobDailyStats] = await prisma.dailyStats.aggregate({
36+
from: lastDailyStatsDay?.add(1, "day"),
37+
to: targetDay,
38+
});
39+
40+
return {
41+
fromDate: lastDailyStatsDay?.utc().toISOString(),
42+
toDate: targetDate.utc().toISOString(),
43+
totalAggregationsCreated: blobDailyStats.length,
44+
};
45+
};
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import type { SandboxedJob } from "bullmq";
2+
3+
export type DailyStatsJobResult =
4+
| {
5+
fromDate?: string;
6+
toDate: string;
7+
totalAggregationsCreated: number;
8+
}
9+
| undefined;
10+
11+
export type DailyStatsSanboxedJob = SandboxedJob<
12+
undefined,
13+
DailyStatsJobResult
14+
>;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import type { PriceFeed } from "@blobscan/price-feed";
2+
3+
import type { CommonCronJobConfig } from "../BaseCronJob";
4+
import { BaseCronJob } from "../BaseCronJob";
5+
import ethPrice from "./processor";
6+
7+
export interface EthPriceCronJobConfig extends CommonCronJobConfig {
8+
ethUsdPriceFeed: PriceFeed;
9+
}
10+
11+
export class EthPriceCronJob extends BaseCronJob {
12+
constructor({ ...restConfig }: CommonCronJobConfig) {
13+
super({
14+
...restConfig,
15+
name: "eth-price",
16+
processor: ethPrice,
17+
});
18+
19+
this.worker?.on("completed", (_, result?) => {
20+
const { price, timestamp, roundId } = result as {
21+
price?: number;
22+
timestamp: string;
23+
roundId?: string;
24+
};
25+
26+
if (!price && !roundId) {
27+
this.logger.warn(
28+
`Skipping eth price update: No price data found for datetime ${timestamp}`
29+
);
30+
31+
return;
32+
}
33+
34+
this.logger.info(
35+
`ETH price indexed: $${price} at ${timestamp} recorded (retrieved from round ${roundId})`
36+
);
37+
});
38+
}
39+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import * as chains from "viem/chains";
2+
3+
import { PriceFeed } from "@blobscan/price-feed";
4+
5+
import { env } from "../../env";
6+
import { client } from "./viem";
7+
8+
let priceFeed: PriceFeed | undefined;
9+
10+
export async function getEthUsdPriceFeed() {
11+
if (!priceFeed) {
12+
const chain = Object.values(chains).find(
13+
(c) => c.id === env.ETH_PRICE_SYNCER_CHAIN_ID
14+
);
15+
16+
if (!chain) {
17+
throw new Error(
18+
`Can't initialize ETH price syncer: chain with id ${env.ETH_PRICE_SYNCER_CHAIN_ID} not found`
19+
);
20+
}
21+
22+
priceFeed = await PriceFeed.create({
23+
client,
24+
dataFeedContractAddress:
25+
env.ETH_PRICE_SYNCER_ETH_USD_PRICE_FEED_CONTRACT_ADDRESS as `0x${string}`,
26+
timeTolerance: env.ETH_PRICE_SYNCER_TIME_TOLERANCE,
27+
});
28+
}
29+
30+
return priceFeed;
31+
}

0 commit comments

Comments
 (0)