Skip to content

Commit a93dd53

Browse files
authored
fix(core): Handle task runner accept timeout error (#14709)
1 parent 5aa6054 commit a93dd53

File tree

4 files changed

+65
-7
lines changed

4 files changed

+65
-7
lines changed

packages/cli/src/task-runners/task-broker/__tests__/task-broker.service.test.ts

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import type { TaskRunnersConfig } from '@n8n/config';
22
import type { RunnerMessage, TaskResultData } from '@n8n/task-runner';
33
import { mock } from 'jest-mock-extended';
4+
import type { Logger } from 'n8n-core';
45
import { ApplicationError, type INodeTypeBaseDescription } from 'n8n-workflow';
56

67
import { Time } from '@/constants';
78
import type { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
89

910
import { TaskRejectError } from '../errors/task-reject.error';
10-
import { TaskRunnerTimeoutError } from '../errors/task-runner-timeout.error';
11+
import { TaskRunnerExecutionTimeoutError } from '../errors/task-runner-execution-timeout.error';
1112
import { TaskBroker } from '../task-broker.service';
1213
import type { TaskOffer, TaskRequest, TaskRunner } from '../task-broker.service';
1314

@@ -715,7 +716,7 @@ describe('TaskBroker', () => {
715716
});
716717
});
717718

718-
describe('task timeouts', () => {
719+
describe('task execution timeouts', () => {
719720
let taskBroker: TaskBroker;
720721
let config: TaskRunnersConfig;
721722
let runnerLifecycleEvents = mock<TaskRunnerLifecycleEvents>();
@@ -879,11 +880,54 @@ describe('TaskBroker', () => {
879880
expect(requesterCallback).toHaveBeenCalledWith({
880881
type: 'broker:taskerror',
881882
taskId,
882-
error: expect.any(TaskRunnerTimeoutError),
883+
error: expect.any(TaskRunnerExecutionTimeoutError),
883884
});
884885

885886
expect(clearTimeout).toHaveBeenCalled();
886887
expect(taskBroker.getTasks().get(taskId)).toBeUndefined();
887888
});
888889
});
890+
891+
describe('task runner accept timeout', () => {
892+
it('broker should handle timeout when waiting for acknowledgment of offer accept', async () => {
893+
const runnerId = 'runner1';
894+
const runner = mock<TaskRunner>({ id: runnerId });
895+
const messageCallback = jest.fn();
896+
const loggerMock = mock<Logger>();
897+
898+
taskBroker = new TaskBroker(loggerMock, mock(), mock());
899+
taskBroker.registerRunner(runner, messageCallback);
900+
901+
const offer: TaskOffer = {
902+
offerId: 'offer1',
903+
runnerId,
904+
taskType: 'taskType1',
905+
validFor: 1000,
906+
validUntil: createValidUntil(1000),
907+
};
908+
909+
const request: TaskRequest = {
910+
requestId: 'request1',
911+
requesterId: 'requester1',
912+
taskType: 'taskType1',
913+
};
914+
915+
jest.useFakeTimers();
916+
917+
const acceptPromise = taskBroker.acceptOffer(offer, request);
918+
919+
jest.advanceTimersByTime(2100);
920+
921+
await acceptPromise;
922+
923+
expect(request.acceptInProgress).toBe(false);
924+
expect(loggerMock.warn).toHaveBeenCalledWith(
925+
expect.stringContaining(
926+
`Runner (${runnerId}) took too long to acknowledge acceptance of task`,
927+
),
928+
);
929+
930+
jest.useRealTimers();
931+
});
932+
});
889933
});
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { OperationalError } from 'n8n-workflow';
2+
3+
export class TaskRunnerAcceptTimeoutError extends OperationalError {
4+
constructor(taskId: string, runnerId: string) {
5+
super(`Runner (${runnerId}) took too long to acknowledge acceptance of task (${taskId})`, {
6+
level: 'warning',
7+
});
8+
}
9+
}

packages/cli/src/task-runners/task-broker/errors/task-runner-timeout.error.ts renamed to packages/cli/src/task-runners/task-broker/errors/task-runner-execution-timeout.error.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import type { TaskRunnerMode } from '@n8n/config/src/configs/runners.config';
22
import { OperationalError } from 'n8n-workflow';
33

4-
export class TaskRunnerTimeoutError extends OperationalError {
4+
export class TaskRunnerExecutionTimeoutError extends OperationalError {
55
description: string;
66

77
constructor({

packages/cli/src/task-runners/task-broker/task-broker.service.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ import config from '@/config';
1414
import { Time } from '@/constants';
1515
import { TaskDeferredError } from '@/task-runners/task-broker/errors/task-deferred.error';
1616
import { TaskRejectError } from '@/task-runners/task-broker/errors/task-reject.error';
17-
import { TaskRunnerTimeoutError } from '@/task-runners/task-broker/errors/task-runner-timeout.error';
17+
import { TaskRunnerAcceptTimeoutError } from '@/task-runners/task-broker/errors/task-runner-accept-timeout.error';
18+
import { TaskRunnerExecutionTimeoutError } from '@/task-runners/task-broker/errors/task-runner-execution-timeout.error';
1819
import { TaskRunnerLifecycleEvents } from '@/task-runners/task-runner-lifecycle-events';
1920

2021
export interface TaskRunner {
@@ -468,7 +469,7 @@ export class TaskBroker {
468469

469470
await this.taskErrorHandler(
470471
taskId,
471-
new TaskRunnerTimeoutError({
472+
new TaskRunnerExecutionTimeoutError({
472473
taskTimeout,
473474
isSelfHosted: config.getEnv('deployment.type') !== 'cloud',
474475
mode,
@@ -513,7 +514,7 @@ export class TaskBroker {
513514

514515
// TODO: customisable timeout
515516
setTimeout(() => {
516-
reject('Runner timed out');
517+
reject(new TaskRunnerAcceptTimeoutError(taskId, offer.runnerId));
517518
}, 2000);
518519
});
519520

@@ -535,6 +536,10 @@ export class TaskBroker {
535536
this.pendingTaskRequests.push(request); // will settle on receiving task offer from runner
536537
return;
537538
}
539+
if (e instanceof TaskRunnerAcceptTimeoutError) {
540+
this.logger.warn(e.message);
541+
return;
542+
}
538543
throw e;
539544
}
540545

0 commit comments

Comments
 (0)