Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions packages/cli/src/modules/data-store/data-store-proxy.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import type { DataStoreListOptions } from '@n8n/api-types';
import { Logger } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import {
AddDataStoreColumnOptions,
CreateDataStoreOptions,
DataStore,
DataStoreColumn,
DataStoreRows,
IDataStoreProjectService,
INode,
ListDataStoreOptions,
ListDataStoreRowsOptions,
MoveDataStoreColumnOptions,
UpdateDataStoreOptions,
UpsertDataStoreRowsOptions,
Workflow,
} from 'n8n-workflow';

// Aggregate operations (project-level, don't require specific dataStoreId)
type DataStoreAggregateOperations = Pick<
IDataStoreProjectService,
'getManyAndCount' | 'createDataStore' | 'deleteDataStoreAll'
>;

import { OwnershipService } from '@/services/ownership.service';

import { DataStoreService } from './data-store.service';

@Service()
export class DataStoreProxyService {
constructor(
private readonly dataStoreService: DataStoreService,
private readonly ownershipService: OwnershipService,
private readonly logger: Logger,
) {
this.logger = this.logger.scoped('data-store');
}

async getDataStoreProxy(
workflow: Workflow,
node: INode,
dataStoreId?: undefined,
): Promise<DataStoreAggregateOperations>;
async getDataStoreProxy(
workflow: Workflow,
node: INode,
dataStoreId?: string,
): Promise<IDataStoreProjectService>;
async getDataStoreProxy<T extends string | undefined>(
workflow: Workflow,
node: INode,
dataStoreId?: T,
): Promise<IDataStoreProjectService | DataStoreAggregateOperations> {
if (node.type !== 'n8n-nodes-base.dataStore') {
throw new Error('This proxy is only available for data store nodes');
}

const homeProject = await this.ownershipService.getWorkflowProjectCached(workflow.id);
const projectId = homeProject.id;
const dataStoreService = this.dataStoreService;

const aggregateOperations = {
async getManyAndCount(options: ListDataStoreOptions = {}) {
const serviceOptions: DataStoreListOptions = {
...options,
filter: { projectId, ...(options.filter ?? {}) },
};
return await dataStoreService.getManyAndCount(serviceOptions);
},

async createDataStore(options: CreateDataStoreOptions): Promise<DataStore> {
return await dataStoreService.createDataStore(projectId, options);
},

async deleteDataStoreAll(): Promise<boolean> {
return await dataStoreService.deleteDataStoreByProjectId(projectId);
},
};

const dataStoreSpecificOperations =
dataStoreId === undefined
? {}
: {
// DataStore management
async updateDataStore(options: UpdateDataStoreOptions): Promise<boolean> {
return await dataStoreService.updateDataStore(dataStoreId, options);
},

async deleteDataStore(): Promise<boolean> {
return await dataStoreService.deleteDataStore(dataStoreId);
},

// Column operations
async getColumns(): Promise<DataStoreColumn[]> {
return await dataStoreService.getColumns(dataStoreId);
},

async addColumn(options: AddDataStoreColumnOptions): Promise<DataStoreColumn> {
return await dataStoreService.addColumn(dataStoreId, options);
},

async moveColumn(
columnId: string,
options: MoveDataStoreColumnOptions,
): Promise<boolean> {
return await dataStoreService.moveColumn(dataStoreId, columnId, options);
},

async deleteColumn(columnId: string): Promise<boolean> {
return await dataStoreService.deleteColumn(dataStoreId, columnId);
},

// Row operations
async getManyRowsAndCount(options: Partial<ListDataStoreRowsOptions>) {
return await dataStoreService.getManyRowsAndCount(dataStoreId, options);
},

async insertRows(rows: DataStoreRows) {
return await dataStoreService.insertRows(dataStoreId, rows);
},

async upsertRows(options: UpsertDataStoreRowsOptions) {
return await dataStoreService.upsertRows(dataStoreId, options);
},
};

return {
...aggregateOperations,
...dataStoreSpecificOperations,
};
}
}
2 changes: 2 additions & 0 deletions packages/cli/src/workflow-execute-additional-data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { TaskRequester } from '@/task-runners/task-managers/task-requester';
import { findSubworkflowStart } from '@/utils';
import { objectToError } from '@/utils/object-to-error';
import * as WorkflowHelpers from '@/workflow-helpers';
import { DataStoreProxyService } from './modules/data-store/data-store-proxy.service';

export async function getRunData(
workflowData: IWorkflowBase,
Expand Down Expand Up @@ -378,6 +379,7 @@ export async function getBase(
const eventService = Container.get(EventService);

return {
dataStoreProxy: Container.get(DataStoreProxyService),
currentNodeExecutionIndex: 0,
credentialsHelper: Container.get(CredentialsHelper),
executeWorkflow,
Expand Down
3 changes: 2 additions & 1 deletion packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
"dist",
"bin"
],
"devDependencies": {
"devDependencies": {
"@n8n/api-types": "workspace:*",
"@n8n/errors": "workspace:^",
"@n8n/typescript-config": "workspace:*",
"@types/express": "catalog:",
Expand Down
3 changes: 3 additions & 0 deletions packages/core/src/execution-engine/index.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import type { DataStoreProxy } from 'n8n-workflow';

import type { ExecutionLifecycleHooks } from './execution-lifecycle-hooks';
import type { ExternalSecretsProxy } from './external-secrets-proxy';

declare module 'n8n-workflow' {
interface IWorkflowExecuteAdditionalData {
hooks?: ExecutionLifecycleHooks;
externalSecretsProxy: ExternalSecretsProxy;
dataStoreProxy: DataStoreProxy;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
} from './utils/binary-helper-functions';
import { constructExecutionMetaData } from './utils/construct-execution-metadata';
import { copyInputItems } from './utils/copy-input-items';
import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions';
import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions';
import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions';
import { getInputConnectionData } from './utils/get-input-connection-data';
Expand Down Expand Up @@ -94,6 +95,7 @@ export class ExecuteContext extends BaseExecuteContext implements IExecuteFuncti
connectionInputData,
),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getDataStoreHelperFunctions(additionalData, workflow, node),
...getSSHTunnelFunctions(),
...getFileSystemHelperFunctions(node),
...getDeduplicationHelperFunctions(workflow, node),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import type {
} from 'n8n-workflow';

import { NodeExecutionContext } from './node-execution-context';
import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions';
import { extractValue } from './utils/extract-value';
import { getRequestHelperFunctions } from './utils/request-helper-functions';
import { getSSHTunnelFunctions } from './utils/ssh-tunnel-helper-functions';
Expand All @@ -28,6 +29,7 @@ export class LoadOptionsContext extends NodeExecutionContext implements ILoadOpt
this.helpers = {
...getSSHTunnelFunctions(),
...getRequestHelperFunctions(workflow, node, additionalData),
...getDataStoreHelperFunctions(additionalData, workflow, node),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {
} from './utils/binary-helper-functions';
import { constructExecutionMetaData } from './utils/construct-execution-metadata';
import { copyInputItems } from './utils/copy-input-items';
import { getDataStoreHelperFunctions } from './utils/data-store-helper-functions';
import { getDeduplicationHelperFunctions } from './utils/deduplication-helper-functions';
import { getFileSystemHelperFunctions } from './utils/file-system-helper-functions';
// eslint-disable-next-line import-x/no-cycle
Expand Down Expand Up @@ -88,6 +89,7 @@ export class SupplyDataContext extends BaseExecuteContext implements ISupplyData
...getSSHTunnelFunctions(),
...getFileSystemHelperFunctions(node),
...getBinaryHelperFunctions(additionalData, workflow.id),
...getDataStoreHelperFunctions(additionalData, workflow, node),
...getDeduplicationHelperFunctions(workflow, node),
assertBinaryData: (itemIndex, propertyName) =>
assertBinaryData(inputData, node, itemIndex, propertyName, 0),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import type {
DataStoreProxyFunctions,
INode,
Workflow,
IWorkflowExecuteAdditionalData,
} from 'n8n-workflow';

export function getDataStoreHelperFunctions(
additionalData: IWorkflowExecuteAdditionalData,
workflow: Workflow,
node: INode,
): DataStoreProxyFunctions {
return {
getDataStoreProxy: async <T extends string | undefined>(dataStoreId?: T) =>
await additionalData.dataStoreProxy.getDataStoreProxy(workflow, node, dataStoreId),
};
}
18 changes: 18 additions & 0 deletions packages/nodes-base/nodes/DataStore/DataStore.node.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"node": "n8n-nodes-base.dataStore",
"nodeVersion": "1.0",
"codexVersion": "1.0",
"details": "Data Store",
"categories": ["Core Nodes"],
"resources": {
"primaryDocumentation": [
{
"url": "https://docs.n8n.io/integrations/builtin/core-nodes/n8n-nodes-base.dataStore/"
}
]
},
"alias": ["data", "table", "knowledge"],
"subcategories": {
"Core Nodes": ["Data Transformation"]
}
}
Loading
Loading