Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions packages/backend/src/managers/TaskRunner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/**********************************************************************
* Copyright (C) 2025 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
***********************************************************************/

import type { TaskRegistry } from '../registries/TaskRegistry';

export interface RunAsTaskOptions {
loadingLabel: string;
successLabel?: string;
errorLabel?: string;
errorMsg: (err: unknown) => string;
}

export interface TaskRunnerTools {
updateLabels: (f: (labels: Record<string, string>) => Record<string, string>) => void;
}

export class TaskRunner {
constructor(private taskRegistry: TaskRegistry) {}

async runAsTask<T>(
labels: Record<string, string>,
options: RunAsTaskOptions,
run: (tools: TaskRunnerTools) => Promise<T>,
): Promise<T> {
const tools = {
updateLabels: (f: (labels: Record<string, string>) => Record<string, string>): void => {
task.labels = f(labels);
this.taskRegistry.updateTask(task);
},
};

const task = this.taskRegistry.createTask(options.loadingLabel, 'loading', labels);
try {
const result = await run(tools);
task.state = 'success';
if (options.successLabel) {
task.name = options.successLabel;
}
return result;
} catch (err: unknown) {
task.state = 'error';
task.error = options.errorMsg(err);
if (options.errorLabel) {
task.name = options.errorLabel;
}
throw err;
} finally {
task.progress = undefined;
this.taskRegistry.updateTask(task);
}
}
}
199 changes: 83 additions & 116 deletions packages/backend/src/managers/application/applicationManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ import {
import { VMType } from '@shared/models/IPodman';
import { RECIPE_START_ROUTE } from '../../registries/NavigationRegistry';
import type { RpcExtension } from '@shared/messages/MessageProxy';
import { TaskRunner } from '../TaskRunner';

export class ApplicationManager extends Publisher<ApplicationState[]> implements Disposable {
#applications: ApplicationRegistry<ApplicationState>;
protectTasks: Set<string> = new Set();
#disposables: Disposable[];
#taskRunner: TaskRunner;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if task runner is stored like that it should be disposable, and be disposed if the class is disposed, cancelling any pending tasks it is managing ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would be a good enhancement, but as this is only a refactoring, I don't want to change any behaviour


constructor(
private taskRegistry: TaskRegistry,
Expand All @@ -71,6 +73,7 @@ export class ApplicationManager extends Publisher<ApplicationState[]> implements
) {
super(rpcExtension, MSG_APPLICATIONS_STATE_UPDATE, () => this.getApplicationsState());
this.#applications = new ApplicationRegistry<ApplicationState>();
this.#taskRunner = new TaskRunner(this.taskRegistry);
this.#disposables = [];
}

Expand All @@ -86,33 +89,30 @@ export class ApplicationManager extends Publisher<ApplicationState[]> implements
trackingId: trackingId,
};

const task = this.taskRegistry.createTask(`Pulling ${recipe.name} recipe`, 'loading', {
...labels,
'recipe-pulling': recipe.id, // this label should only be on the master task
});

window
.withProgress(
this.#taskRunner
.runAsTask(
{
location: ProgressLocation.TASK_WIDGET,
title: `Pulling ${recipe.name}.`,
details: {
routeId: RECIPE_START_ROUTE,
routeArgs: [recipe.id, trackingId],
},
...labels,
'recipe-pulling': recipe.id, // this label should only be on the master task
},
() => this.pullApplication(connection, recipe, model, labels),
{
loadingLabel: `Pulling ${recipe.name} recipe`,
errorMsg: err => `Something went wrong while pulling ${recipe.name}: ${String(err)}`,
},
() =>
window.withProgress(
{
location: ProgressLocation.TASK_WIDGET,
title: `Pulling ${recipe.name}.`,
details: {
routeId: RECIPE_START_ROUTE,
routeArgs: [recipe.id, trackingId],
},
},
() => this.pullApplication(connection, recipe, model, labels),
),
)
.then(() => {
task.state = 'success';
})
.catch((err: unknown) => {
task.state = 'error';
task.error = `Something went wrong while pulling ${recipe.name}: ${String(err)}`;
})
.finally(() => {
this.taskRegistry.updateTask(task);
});
.catch(() => {});

return trackingId;
}
Expand Down Expand Up @@ -223,27 +223,22 @@ export class ApplicationManager extends Publisher<ApplicationState[]> implements
* @param labels
*/
protected async runApplication(podInfo: PodInfo, labels?: { [key: string]: string }): Promise<void> {
const task = this.taskRegistry.createTask('Starting AI App', 'loading', labels);

// it starts the pod
try {
await this.podManager.startPod(podInfo.engineId, podInfo.Id);

// check if all containers have started successfully
for (const container of podInfo.Containers ?? []) {
await this.waitContainerIsRunning(podInfo.engineId, container);
}

task.state = 'success';
task.name = 'AI App is running';
} catch (err: unknown) {
task.state = 'error';
task.error = String(err);
throw err;
} finally {
this.taskRegistry.updateTask(task);
}

await this.#taskRunner.runAsTask(
labels ?? {},
{
loadingLabel: 'Starting AI App',
successLabel: 'AI App is running',
errorMsg: err => String(err),
},
async () => {
await this.podManager.startPod(podInfo.engineId, podInfo.Id);

// check if all containers have started successfully
for (const container of podInfo.Containers ?? []) {
await this.waitContainerIsRunning(podInfo.engineId, container);
}
},
);
return this.checkPodsHealth();
}

Expand All @@ -268,38 +263,22 @@ export class ApplicationManager extends Publisher<ApplicationState[]> implements
modelPath: string,
labels?: { [key: string]: string },
): Promise<PodInfo> {
const task = this.taskRegistry.createTask('Creating AI App', 'loading', labels);

// create empty pod
let podInfo: PodInfo;
try {
podInfo = await this.createPod(connection, recipe, model, components.images);
task.labels = {
...task.labels,
'pod-id': podInfo.Id,
};
} catch (e) {
console.error('error when creating pod', e);
task.state = 'error';
task.error = `Something went wrong while creating pod: ${String(e)}`;
throw e;
} finally {
this.taskRegistry.updateTask(task);
}

try {
await this.createContainerAndAttachToPod(connection, podInfo, components, model, modelPath);
task.state = 'success';
} catch (e) {
console.error(`error when creating pod ${podInfo.Id}`, e);
task.state = 'error';
task.error = `Something went wrong while creating pod: ${String(e)}`;
throw e;
} finally {
this.taskRegistry.updateTask(task);
}

return podInfo;
return this.#taskRunner.runAsTask<PodInfo>(
labels ?? {},
{
loadingLabel: 'Creating AI App',
errorMsg: err => `Something went wrong while creating pod: ${String(err)}`,
},
async ({ updateLabels }): Promise<PodInfo> => {
const podInfo = await this.createPod(connection, recipe, model, components.images);
updateLabels(labels => ({
...labels,
'pod-id': podInfo.Id,
}));
await this.createContainerAndAttachToPod(connection, podInfo, components, model, modelPath);
return podInfo;
},
);
}

protected async createContainerAndAttachToPod(
Expand Down Expand Up @@ -440,28 +419,20 @@ export class ApplicationManager extends Publisher<ApplicationState[]> implements
const appPod = await this.getApplicationPod(recipeId, modelId);

// if the pod is already stopped skip
if (appPod.Status === 'Exited') {
return appPod;
}

// create a task to follow progress/error
const stoppingTask = this.taskRegistry.createTask(`Stopping AI App`, 'loading', {
'recipe-id': recipeId,
'model-id': modelId,
});

try {
await this.podManager.stopPod(appPod.engineId, appPod.Id);

stoppingTask.state = 'success';
stoppingTask.name = `AI App Stopped`;
} catch (err: unknown) {
stoppingTask.state = 'error';
stoppingTask.error = `Error removing the pod.: ${String(err)}`;
stoppingTask.name = 'Error stopping AI App';
throw err;
} finally {
this.taskRegistry.updateTask(stoppingTask);
if (appPod.Status !== 'Exited') {
await this.#taskRunner.runAsTask(
{
'recipe-id': recipeId,
'model-id': modelId,
},
{
loadingLabel: 'Stopping AI App',
successLabel: 'AI App Stopped',
errorLabel: 'Error stopping AI App',
errorMsg: err => `Error removing the pod.: ${String(err)}`,
},
() => this.podManager.stopPod(appPod.engineId, appPod.Id),
);
await this.checkPodsHealth();
}
return appPod;
Expand Down Expand Up @@ -641,25 +612,21 @@ export class ApplicationManager extends Publisher<ApplicationState[]> implements
async removeApplication(recipeId: string, modelId: string): Promise<void> {
const appPod = await this.stopApplication(recipeId, modelId);

const remoteTask = this.taskRegistry.createTask(`Removing AI App`, 'loading', {
'recipe-id': recipeId,
'model-id': modelId,
});
// protect the task
this.protectTasks.add(appPod.Id);

try {
await this.podManager.removePod(appPod.engineId, appPod.Id);

remoteTask.state = 'success';
remoteTask.name = `AI App Removed`;
// eslint-disable-next-line sonarjs/no-ignored-exceptions
} catch (err: unknown) {
remoteTask.error = 'error removing the pod. Please try to remove the pod manually';
remoteTask.name = 'Error stopping AI App';
} finally {
this.taskRegistry.updateTask(remoteTask);
}
await this.#taskRunner.runAsTask(
{
'recipe-id': recipeId,
'model-id': modelId,
},
{
loadingLabel: 'Removing AI App',
successLabel: 'AI App Removed',
errorLabel: 'Error stopping AI App',
errorMsg: () => 'error removing the pod. Please try to remove the pod manually',
},
() => this.podManager.removePod(appPod.engineId, appPod.Id),
);
}

async restartApplication(connection: ContainerProviderConnection, recipeId: string, modelId: string): Promise<void> {
Expand Down
Loading