Add redaction for live data send to FE
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { mockInstance } from '@n8n/backend-test-utils';
|
||||
import type { Project } from '@n8n/db';
|
||||
import { ExecutionRepository } from '@n8n/db';
|
||||
import { ExecutionRepository, UserRepository } from '@n8n/db';
|
||||
import { mock } from 'jest-mock-extended';
|
||||
import {
|
||||
BinaryDataService,
|
||||
@@ -24,6 +24,7 @@ import type {
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { EventService } from '@/events/event.service';
|
||||
import { ExecutionRedactionServiceProxy } from '@/executions/execution-redaction-proxy.service';
|
||||
import { ExecutionPersistence } from '@/executions/execution-persistence';
|
||||
import { ExternalHooks } from '@/external-hooks';
|
||||
import { Push } from '@/push';
|
||||
@@ -55,6 +56,8 @@ describe('Execution Lifecycle Hooks', () => {
|
||||
const binaryDataService = mockInstance(BinaryDataService);
|
||||
const ownershipService = mockInstance(OwnershipService);
|
||||
const workflowExecutionService = mockInstance(WorkflowExecutionService);
|
||||
mockInstance(UserRepository);
|
||||
mockInstance(ExecutionRedactionServiceProxy);
|
||||
|
||||
const nodeName = 'Test Node';
|
||||
const nodeType = 'n8n-nodes-base.testNode';
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
import { ExecutionRepository } from '@n8n/db';
|
||||
import { ExecutionRepository, UserRepository } from '@n8n/db';
|
||||
import type { User } from '@n8n/db';
|
||||
import { LifecycleMetadata } from '@n8n/decorators';
|
||||
import { Container, Service } from '@n8n/di';
|
||||
import { stringify } from 'flatted';
|
||||
@@ -14,6 +15,8 @@ import {
|
||||
import { ExecutionPersistence } from '@/executions/execution-persistence';
|
||||
import type {
|
||||
IRun,
|
||||
IRunData,
|
||||
IRunExecutionData,
|
||||
IWorkflowBase,
|
||||
RelatedExecution,
|
||||
WorkflowExecuteMode,
|
||||
@@ -21,6 +24,8 @@ import type {
|
||||
} from 'n8n-workflow';
|
||||
|
||||
import { EventService } from '@/events/event.service';
|
||||
import type { RedactableExecution } from '@/executions/execution-redaction';
|
||||
import { ExecutionRedactionServiceProxy } from '@/executions/execution-redaction-proxy.service';
|
||||
import { ExternalHooks } from '@/external-hooks';
|
||||
import { Push } from '@/push';
|
||||
import { WorkflowStatisticsService } from '@/services/workflow-statistics.service';
|
||||
@@ -181,16 +186,54 @@ function hookFunctionsNodeEvents(hooks: ExecutionLifecycleHooks) {
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a minimal RedactableExecution from push event data so the existing
|
||||
* redaction pipeline can be reused for real-time push events.
|
||||
*/
|
||||
function buildRedactableExecution(
|
||||
hooks: ExecutionLifecycleHooks,
|
||||
runData: IRunData,
|
||||
executionData?: IRunExecutionData,
|
||||
): RedactableExecution {
|
||||
return {
|
||||
id: hooks.executionId,
|
||||
mode: hooks.mode,
|
||||
workflowId: hooks.workflowData.id,
|
||||
data: {
|
||||
resultData: { runData },
|
||||
executionData: executionData?.executionData,
|
||||
} as IRunExecutionData,
|
||||
workflowData: {
|
||||
settings: hooks.workflowData.settings,
|
||||
nodes: hooks.workflowData.nodes,
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns hook functions to push data to Editor-UI
|
||||
*/
|
||||
function hookFunctionsPush(
|
||||
hooks: ExecutionLifecycleHooks,
|
||||
{ pushRef, retryOf }: HooksSetupParameters,
|
||||
userId?: string,
|
||||
) {
|
||||
if (!pushRef) return;
|
||||
const logger = Container.get(Logger);
|
||||
const pushInstance = Container.get(Push);
|
||||
const redactionProxy = Container.get(ExecutionRedactionServiceProxy);
|
||||
const userRepository = Container.get(UserRepository);
|
||||
|
||||
// Lazy user resolution — resolved once, reused across all node events in this execution
|
||||
let resolvedUser: User | null | undefined; // undefined = not yet resolved
|
||||
async function getUser(): Promise<User | null> {
|
||||
if (resolvedUser !== undefined) return resolvedUser;
|
||||
resolvedUser = userId
|
||||
? await userRepository.findOne({ where: { id: userId }, relations: ['role'] })
|
||||
: null;
|
||||
return resolvedUser;
|
||||
}
|
||||
|
||||
hooks.addHandler('nodeExecuteBefore', function (nodeName, data) {
|
||||
const { executionId } = this;
|
||||
// Push data to session which started workflow before each
|
||||
@@ -206,7 +249,7 @@ function hookFunctionsPush(
|
||||
pushRef,
|
||||
);
|
||||
});
|
||||
hooks.addHandler('nodeExecuteAfter', function (nodeName, data) {
|
||||
hooks.addHandler('nodeExecuteAfter', async function (nodeName, data, executionData) {
|
||||
const { executionId } = this;
|
||||
// Push data to session which started workflow after each rendered node
|
||||
logger.debug(`Executing hook on node "${nodeName}" (hookFunctionsPush)`, {
|
||||
@@ -226,6 +269,22 @@ function hookFunctionsPush(
|
||||
pushRef,
|
||||
);
|
||||
|
||||
// Apply copy-on-write redaction before sending binary data.
|
||||
// Returns the original data if no redaction is needed (zero-copy),
|
||||
// or a structuredClone with redaction applied.
|
||||
let dataToSend = data;
|
||||
const user = await getUser();
|
||||
if (user) {
|
||||
const dummy = buildRedactableExecution(this, { [nodeName]: [data] }, executionData);
|
||||
const result = await redactionProxy.processExecution(dummy, {
|
||||
user,
|
||||
copyOnWrite: true,
|
||||
});
|
||||
if (result !== dummy) {
|
||||
dataToSend = result.data.resultData.runData[nodeName][0];
|
||||
}
|
||||
}
|
||||
|
||||
// We send the node execution data as a WS binary message to the FE. Not
|
||||
// because it's more efficient on the wire: the content is a JSON string
|
||||
// so both text and binary would end the same on the wire. The reason
|
||||
@@ -236,13 +295,13 @@ function hookFunctionsPush(
|
||||
pushInstance.send(
|
||||
{
|
||||
type: 'nodeExecuteAfterData',
|
||||
data: { executionId, nodeName, itemCountByConnectionType, data },
|
||||
data: { executionId, nodeName, itemCountByConnectionType, data: dataToSend },
|
||||
},
|
||||
pushRef,
|
||||
asBinary,
|
||||
);
|
||||
});
|
||||
hooks.addHandler('workflowExecuteBefore', function (_workflow, data) {
|
||||
hooks.addHandler('workflowExecuteBefore', async function (_workflow, data) {
|
||||
const { executionId } = this;
|
||||
const { id: workflowId, name: workflowName } = this.workflowData;
|
||||
logger.debug('Executing hook (hookFunctionsPush)', {
|
||||
@@ -250,6 +309,21 @@ function hookFunctionsPush(
|
||||
pushRef,
|
||||
workflowId,
|
||||
});
|
||||
|
||||
// Apply copy-on-write redaction to flattedRunData when retrying/resuming
|
||||
let runDataToStringify = data?.resultData.runData ?? {};
|
||||
const user = await getUser();
|
||||
if (user && data?.resultData.runData && Object.keys(data.resultData.runData).length > 0) {
|
||||
const dummy = buildRedactableExecution(this, data.resultData.runData, data);
|
||||
const result = await redactionProxy.processExecution(dummy, {
|
||||
user,
|
||||
copyOnWrite: true,
|
||||
});
|
||||
if (result !== dummy) {
|
||||
runDataToStringify = result.data.resultData.runData;
|
||||
}
|
||||
}
|
||||
|
||||
// Push data to session which started the workflow
|
||||
pushInstance.send(
|
||||
{
|
||||
@@ -261,9 +335,7 @@ function hookFunctionsPush(
|
||||
retryOf,
|
||||
workflowId,
|
||||
workflowName,
|
||||
flattedRunData: data?.resultData.runData
|
||||
? stringify(data.resultData.runData)
|
||||
: stringify({}),
|
||||
flattedRunData: stringify(runDataToStringify),
|
||||
},
|
||||
},
|
||||
pushRef,
|
||||
@@ -582,7 +654,7 @@ export function getLifecycleHooksForScalingWorker(
|
||||
hookFunctionsExternalHooks(hooks);
|
||||
|
||||
if (executionMode === 'manual' && Container.get(InstanceSettings).isWorker) {
|
||||
hookFunctionsPush(hooks, optionalParameters);
|
||||
hookFunctionsPush(hooks, optionalParameters, data.userId);
|
||||
}
|
||||
|
||||
Container.get(ModulesHooksRegistry).addHooks(hooks);
|
||||
@@ -675,7 +747,7 @@ export function getLifecycleHooksForRegularMain(
|
||||
hookFunctionsNodeEvents(hooks);
|
||||
hookFunctionsFinalizeExecutionStatus(hooks);
|
||||
hookFunctionsSave(hooks, optionalParameters);
|
||||
hookFunctionsPush(hooks, optionalParameters);
|
||||
hookFunctionsPush(hooks, optionalParameters, userId);
|
||||
hookFunctionsSaveProgress(hooks, optionalParameters);
|
||||
hookFunctionsStatistics(hooks);
|
||||
hookFunctionsExternalHooks(hooks);
|
||||
|
||||
@@ -6,6 +6,10 @@ export type ExecutionRedactionOptions = {
|
||||
user: User;
|
||||
ipAddress?: string;
|
||||
userAgent?: string;
|
||||
/** When true, the original execution is never mutated. If redaction is needed,
|
||||
* a structuredClone is created and returned. If no redaction is needed, the
|
||||
* original is returned as-is (caller can check referential equality). */
|
||||
copyOnWrite?: boolean;
|
||||
} & Pick<ExecutionRedactionQueryDto, 'redactExecutionData'>;
|
||||
|
||||
export interface ExecutionRedaction {
|
||||
|
||||
@@ -27,4 +27,7 @@ export interface RedactionContext {
|
||||
export interface IExecutionRedactionStrategy {
|
||||
readonly name: string;
|
||||
apply(execution: RedactableExecution, context: RedactionContext): Promise<void>;
|
||||
/** Returns true if apply() would mutate the execution data. Used by the
|
||||
* copy-on-write path to avoid unnecessary cloning. */
|
||||
wouldModify(execution: RedactableExecution, context: RedactionContext): boolean;
|
||||
}
|
||||
|
||||
@@ -48,13 +48,19 @@ export class ExecutionRedactionService implements ExecutionRedaction {
|
||||
|
||||
/**
|
||||
* Thin wrapper around `processExecutions` for single-execution callers.
|
||||
*
|
||||
* With `copyOnWrite: true`, the original execution is never mutated. Returns
|
||||
* either the original (if no redaction needed) or a structuredClone with
|
||||
* redaction applied. Callers can check referential equality to determine
|
||||
* whether redaction occurred.
|
||||
*/
|
||||
async processExecution(
|
||||
execution: RedactableExecution,
|
||||
options: ExecutionRedactionOptions,
|
||||
): Promise<RedactableExecution> {
|
||||
await this.processExecutions([execution], options);
|
||||
return execution;
|
||||
const executions = [execution];
|
||||
await this.processExecutions(executions, options);
|
||||
return executions[0];
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -113,7 +119,8 @@ export class ExecutionRedactionService implements ExecutionRedaction {
|
||||
// Unified pipeline execution. buildPipeline excludes FullItemRedactionStrategy on the
|
||||
// reveal path (redactExecutionData === false). NodeDefinedFieldRedactionStrategy
|
||||
// always runs — node-declared sensitive fields are never revealable.
|
||||
for (const execution of executions) {
|
||||
for (let i = 0; i < executions.length; i++) {
|
||||
const execution = executions[i];
|
||||
const hasDynCreds = this.hasDynamicCredentials(execution);
|
||||
const policyAllowsReveal = this.policyAllowsReveal(execution);
|
||||
// Dynamic credential executions can never be revealed regardless of permissions
|
||||
@@ -127,14 +134,23 @@ export class ExecutionRedactionService implements ExecutionRedaction {
|
||||
hasDynamicCredentials: hasDynCreds,
|
||||
};
|
||||
const pipeline = this.buildPipeline(execution, context, policyAllowsReveal, hasDynCreds);
|
||||
|
||||
let target = execution;
|
||||
if (options.copyOnWrite) {
|
||||
const needsClone = pipeline.some((s) => s.wouldModify(execution, context));
|
||||
if (!needsClone) continue;
|
||||
target = structuredClone(execution);
|
||||
executions[i] = target;
|
||||
}
|
||||
|
||||
for (const strategy of pipeline) {
|
||||
await strategy.apply(execution, context);
|
||||
await strategy.apply(target, context);
|
||||
}
|
||||
|
||||
// runtimeData.credentials contains encrypted credential context that
|
||||
// must never be exposed in API responses
|
||||
if (hasDynCreds && execution.data.executionData?.runtimeData) {
|
||||
delete execution.data.executionData.runtimeData.credentials;
|
||||
if (hasDynCreds && target.data.executionData?.runtimeData) {
|
||||
delete target.data.executionData.runtimeData.credentials;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,6 +17,11 @@ import type {
|
||||
export class FullItemRedactionStrategy implements IExecutionRedactionStrategy {
|
||||
readonly name = 'full-item-redaction';
|
||||
|
||||
wouldModify(_execution: RedactableExecution, _context: RedactionContext): boolean {
|
||||
// If this strategy is in the pipeline, it always modifies (clears all items).
|
||||
return true;
|
||||
}
|
||||
|
||||
async apply(execution: RedactableExecution, context: RedactionContext): Promise<void> {
|
||||
const runData = execution.data.resultData.runData;
|
||||
if (!runData) return;
|
||||
|
||||
@@ -19,6 +19,11 @@ export class NodeDefinedFieldRedactionStrategy implements IExecutionRedactionStr
|
||||
private readonly nodeTypes: NodeTypes,
|
||||
) {}
|
||||
|
||||
wouldModify(execution: RedactableExecution, _context: RedactionContext): boolean {
|
||||
const { sensitiveFields, unknownNodes } = this.buildSensitiveFieldsMap(execution);
|
||||
return sensitiveFields.size > 0 || unknownNodes.size > 0;
|
||||
}
|
||||
|
||||
async apply(execution: RedactableExecution, _context: RedactionContext): Promise<void> {
|
||||
const { sensitiveFields, unknownNodes } = this.buildSensitiveFieldsMap(execution);
|
||||
if (sensitiveFields.size === 0 && unknownNodes.size === 0) return;
|
||||
|
||||
Reference in New Issue
Block a user