Skip to content

Commit 2175408

Browse files
fix(data-retention): re-mask offloaded large-value refs on resume + don't lock out granular saves
- Resume/run-from-block restore now hydrates → masks → re-stores large-value refs in restored blockStates (not just inline strings), so a value offloaded before the block-output stage was enabled can't warm raw PII into downstream blocks. Fails fast. - pii-large-values: add onFailure mode (throw on the execution path, scrub for logs) and redactLargeValueRefsInValue for arbitrary (non-RedactablePayload) values - Granular flag gate now rejects only NEW off→on granular enablement, so orgs that already configured granular stages can still save retention settings when the flag is off
1 parent e7b822a commit 2175408

4 files changed

Lines changed: 164 additions & 31 deletions

File tree

apps/sim/app/api/organizations/[id]/data-retention/route.ts

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,27 @@ function normalizeConfigured(
6565
}
6666
}
6767

68+
/**
69+
* Which granular stages (`input`/`blockOutputs`) are already enabled per rule
70+
* target (`workspaceId ?? ''` = the org default). Used to gate the
71+
* `pii-granular-redaction` flag on *new* enablement only: when the flag is off,
72+
* an org that already configured granular stages must still be able to re-save
73+
* unrelated settings (the UI re-sends the full PII snapshot every save), so we
74+
* reject only a stage transitioning off→on, never a preserved one.
75+
*/
76+
function granularStageEnablement(
77+
settings: OrganizationRetentionValues['piiRedaction']
78+
): Map<string, { input: boolean; blockOutputs: boolean }> {
79+
const map = new Map<string, { input: boolean; blockOutputs: boolean }>()
80+
for (const rule of settings?.rules ?? []) {
81+
map.set(rule.workspaceId ?? '', {
82+
input: rule.stages?.input?.enabled === true,
83+
blockOutputs: rule.stages?.blockOutputs?.enabled === true,
84+
})
85+
}
86+
return map
87+
}
88+
6889
/**
6990
* GET /api/organizations/[id]/data-retention
7091
* Returns the organization's data retention settings.
@@ -210,18 +231,28 @@ export const PUT = withRouteHandler(
210231
{ status: 403 }
211232
)
212233
}
213-
const enablesGranularStage = (body.piiRedaction?.rules ?? []).some(
214-
(rule) =>
215-
rule.stages?.input?.enabled === true || rule.stages?.blockOutputs?.enabled === true
216-
)
217-
if (!piiGranularRedactionEnabled && enablesGranularStage) {
218-
return NextResponse.json(
219-
{
220-
error:
221-
'Granular PII redaction (workflow input and block outputs) is not enabled for this organization',
222-
},
223-
{ status: 403 }
224-
)
234+
if (!piiGranularRedactionEnabled) {
235+
// Reject only a granular stage transitioning off→on; a body that merely
236+
// preserves already-enabled granular stages must still save (the UI
237+
// re-sends the full snapshot on every save), so existing orgs aren't
238+
// locked out of unrelated retention changes when the flag is off.
239+
const currentGranular = granularStageEnablement(current.piiRedaction)
240+
const newlyEnablesGranular = (body.piiRedaction?.rules ?? []).some((rule) => {
241+
const cur = currentGranular.get(rule.workspaceId ?? '')
242+
return (
243+
(rule.stages?.input?.enabled === true && !cur?.input) ||
244+
(rule.stages?.blockOutputs?.enabled === true && !cur?.blockOutputs)
245+
)
246+
})
247+
if (newlyEnablesGranular) {
248+
return NextResponse.json(
249+
{
250+
error:
251+
'Granular PII redaction (workflow input and block outputs) is not enabled for this organization',
252+
},
253+
{ status: 403 }
254+
)
255+
}
225256
}
226257
merged.piiRedaction = body.piiRedaction
227258
}

apps/sim/lib/logs/execution/pii-large-values.test.ts

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,11 @@ vi.mock('@/lib/guardrails/mask-client', () => ({
2828
maskPIIBatchViaHttp: mockMaskBatch,
2929
}))
3030

31-
import { redactLargeValueRefs } from '@/lib/logs/execution/pii-large-values'
31+
import {
32+
redactLargeValueRefs,
33+
redactLargeValueRefsInValue,
34+
} from '@/lib/logs/execution/pii-large-values'
35+
import { PiiRedactionError } from '@/lib/logs/execution/pii-redaction'
3236

3337
const REF = {
3438
__simLargeValueRef: true,
@@ -95,3 +99,48 @@ describe('redactLargeValueRefs', () => {
9599
expect(mockMaterializeRef).not.toHaveBeenCalled()
96100
})
97101
})
102+
103+
describe('redactLargeValueRefsInValue (arbitrary blockStates)', () => {
104+
beforeEach(() => {
105+
vi.clearAllMocks()
106+
mockMaskBatch.mockImplementation(async (texts: string[]) => texts.map((t) => `MASKED(${t})`))
107+
mockCompact.mockImplementation(async (value: unknown) => value)
108+
})
109+
110+
it('hydrates + re-stores a ref nested in a non-RedactablePayload shape', async () => {
111+
mockMaterializeRef.mockResolvedValue({ note: 'contact bob' })
112+
const blockStates = { 'block-1': { output: REF }, 'block-2': { output: { plain: 'hi' } } }
113+
114+
const result = await redactLargeValueRefsInValue(blockStates, {
115+
entityTypes: ['PERSON'],
116+
language: 'en',
117+
store: STORE,
118+
})
119+
120+
expect((result as any)['block-1'].output).toEqual({ note: 'MASKED(contact bob)' })
121+
expect((result as any)['block-2'].output).toEqual({ plain: 'hi' })
122+
})
123+
124+
it('throws PiiRedactionError on failure when onFailure is throw (aborts resume, no marker)', async () => {
125+
mockMaterializeRef.mockResolvedValue(undefined)
126+
127+
await expect(
128+
redactLargeValueRefsInValue(
129+
{ 'block-1': { output: REF } },
130+
{ entityTypes: [], language: 'en', store: STORE, onFailure: 'throw' }
131+
)
132+
).rejects.toBeInstanceOf(PiiRedactionError)
133+
})
134+
135+
it('rethrows a re-store failure as PiiRedactionError under throw mode', async () => {
136+
mockMaterializeRef.mockResolvedValue({ note: 'secret@x.com' })
137+
mockCompact.mockRejectedValueOnce(new Error('s3 down'))
138+
139+
await expect(
140+
redactLargeValueRefsInValue(
141+
{ 'block-1': { output: REF } },
142+
{ entityTypes: [], language: 'en', store: STORE, onFailure: 'throw' }
143+
)
144+
).rejects.toBeInstanceOf(PiiRedactionError)
145+
})
146+
})

apps/sim/lib/logs/execution/pii-large-values.ts

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { compactExecutionPayload } from '@/lib/execution/payloads/serializer'
88
import type { LargeValueStoreContext } from '@/lib/execution/payloads/store'
99
import { materializeLargeValueRef } from '@/lib/execution/payloads/store'
1010
import {
11+
PiiRedactionError,
12+
type PiiRedactionFailureMode,
1113
REDACTION_FAILED_MARKER,
1214
type RedactablePayload,
1315
redactObjectStrings,
@@ -21,6 +23,13 @@ export interface RedactLargeValueRefsOptions {
2123
language: string
2224
/** Storage scope for materializing and re-storing the masked values. */
2325
store: LargeValueStoreContext
26+
/**
27+
* How to handle a ref that can't be materialized/masked/re-stored. Defaults to
28+
* `'scrub'` (marker) — safe for the logs path. The execution-altering restore
29+
* path passes `'throw'` so an unmaskable restored value aborts the run rather
30+
* than feeding a marker (or leaking raw bytes) downstream.
31+
*/
32+
onFailure?: PiiRedactionFailureMode
2433
}
2534

2635
/**
@@ -51,6 +60,19 @@ export async function redactLargeValueRefs(
5160
return result
5261
}
5362

63+
/**
64+
* Hydrate, mask, and re-store offloaded large values inside an arbitrary value
65+
* (e.g. resumed snapshot `blockStates`) — the same walk as
66+
* {@link redactLargeValueRefs}, but not bound to the {@link RedactablePayload}
67+
* key set. Structure is preserved; only refs/manifests are replaced.
68+
*/
69+
export async function redactLargeValueRefsInValue<T>(
70+
value: T,
71+
options: RedactLargeValueRefsOptions
72+
): Promise<T> {
73+
return (await redactValueRefs(value, options)) as T
74+
}
75+
5476
/** Sync-collect every ref/manifest in `value`, then async-replace each, then sync-substitute. */
5577
async function redactValueRefs(
5678
value: unknown,
@@ -119,11 +141,26 @@ async function maskAndReStore(
119141
const masked = await redactObjectStrings(nested, {
120142
entityTypes: options.entityTypes,
121143
language: options.language,
122-
onFailure: 'scrub',
144+
onFailure: options.onFailure ?? 'scrub',
123145
})
124146
return compactExecutionPayload(masked, { ...options.store, requireDurable: true })
125147
}
126148

149+
/** Rethrow (as {@link PiiRedactionError}) or scrub-to-marker, per `onFailure`. */
150+
function onRefFailure(
151+
error: unknown,
152+
options: RedactLargeValueRefsOptions,
153+
context: string
154+
): never | string {
155+
if ((options.onFailure ?? 'scrub') === 'throw') {
156+
throw error instanceof PiiRedactionError
157+
? error
158+
: new PiiRedactionError(`${context}: ${getErrorMessage(error)}`)
159+
}
160+
logger.error(`${context}; scrubbing`, { error: getErrorMessage(error) })
161+
return REDACTION_FAILED_MARKER
162+
}
163+
127164
async function redactRef(
128165
ref: LargeValueRef,
129166
options: RedactLargeValueRefsOptions
@@ -133,11 +170,16 @@ async function redactRef(
133170
...options.store,
134171
trackReference: false,
135172
})
136-
if (materialized === undefined) return REDACTION_FAILED_MARKER
173+
if (materialized === undefined) {
174+
return onRefFailure(
175+
new Error('large value could not be materialized'),
176+
options,
177+
'Failed to redact large value ref'
178+
)
179+
}
137180
return await maskAndReStore(materialized, options)
138181
} catch (error) {
139-
logger.error('Failed to redact large value ref; scrubbing', { error: getErrorMessage(error) })
140-
return REDACTION_FAILED_MARKER
182+
return onRefFailure(error, options, 'Failed to redact large value ref')
141183
}
142184
}
143185

@@ -149,9 +191,6 @@ async function redactManifest(
149191
const materialized = await materializeLargeArrayManifest(manifest, { ...options.store })
150192
return await maskAndReStore(materialized, options)
151193
} catch (error) {
152-
logger.error('Failed to redact large array manifest; scrubbing', {
153-
error: getErrorMessage(error),
154-
})
155-
return REDACTION_FAILED_MARKER
194+
return onRefFailure(error, options, 'Failed to redact large array manifest')
156195
}
157196
}

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { clearExecutionCancellation } from '@/lib/execution/cancellation'
1818
import { warmLargeValueRefs } from '@/lib/execution/payloads/hydration'
1919
import { parseLargeExecutionValue } from '@/lib/execution/payloads/large-execution-value'
2020
import type { LoggingSession } from '@/lib/logs/execution/logging-session'
21+
import { redactLargeValueRefsInValue } from '@/lib/logs/execution/pii-large-values'
2122
import { redactObjectStrings } from '@/lib/logs/execution/pii-redaction'
2223
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
2324
import {
@@ -675,25 +676,38 @@ export async function executeWorkflowCore(
675676
// predate the blockOutputs stage being enabled, re-mask them so downstream
676677
// blocks can't read unredacted PII from restored snapshot state. Masking is
677678
// idempotent, so outputs already masked in the original run are unaffected.
678-
// Limitation: this walks inline strings only — values offloaded to
679-
// large-value storage are still refs here and are not re-masked. In the
680-
// normal flow that is safe (a run with the stage on masks before offload);
681-
// the gap is the narrow case of a run that offloaded a large value while
682-
// the stage was OFF and is resumed after the stage is turned ON.
679+
//
680+
// Two disjoint passes cover the whole state: `redactLargeValueRefsInValue`
681+
// hydrates → masks → re-stores any value offloaded to large-value storage
682+
// (>8MB refs the string walk treats as opaque), then `redactObjectStrings`
683+
// masks the remaining inline string leaves. Both fail-fast (`throw`), so an
684+
// unmaskable restored value aborts the resume rather than warming raw PII
685+
// into `blockStates` for downstream blocks.
683686
const blockOutputOpts = {
684687
entityTypes: piiRedaction.blockOutputs.entityTypes,
685688
language: piiRedaction.blockOutputs.language,
686689
onFailure: 'throw' as const,
687690
}
691+
const largeRefOpts = {
692+
...blockOutputOpts,
693+
store: {
694+
workspaceId: providedWorkspaceId,
695+
workflowId,
696+
executionId,
697+
userId: userId ?? undefined,
698+
},
699+
}
688700
if (snapshot.state?.blockStates) {
689-
snapshot.state.blockStates = await redactObjectStrings(
690-
snapshot.state.blockStates,
691-
blockOutputOpts
692-
)
701+
const hydrated = await redactLargeValueRefsInValue(snapshot.state.blockStates, largeRefOpts)
702+
snapshot.state.blockStates = await redactObjectStrings(hydrated, blockOutputOpts)
693703
}
694704
if (runFromBlock?.sourceSnapshot?.blockStates) {
695-
runFromBlock.sourceSnapshot.blockStates = await redactObjectStrings(
705+
const hydrated = await redactLargeValueRefsInValue(
696706
runFromBlock.sourceSnapshot.blockStates,
707+
largeRefOpts
708+
)
709+
runFromBlock.sourceSnapshot.blockStates = await redactObjectStrings(
710+
hydrated,
697711
blockOutputOpts
698712
)
699713
}

0 commit comments

Comments
 (0)