Skip to content

Commit 8170bc5

Browse files
committed
init impl for special behaviour for temporal prefixes. Default signal test needs to be fixed, need to add behaviour reserving prefixes from workflows, and waiting for default update to be merged to add behaviour preventing default update handler to be called with reserved names
1 parent 4155dad commit 8170bc5

File tree

9 files changed

+217
-9
lines changed

9 files changed

+217
-9
lines changed

packages/common/src/reserved.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
export const TEMPORAL_RESERVED_PREFIX = '__temporal_'
2+
export const STACK_TRACE_RESERVED_PREFIX = '__stack_trace'
3+
export const ENHANCED_STACK_TRACE_RESERVED_PREFIX = '__enhanced_stack_trace'
4+
5+
export const reservedPrefixes = [TEMPORAL_RESERVED_PREFIX, STACK_TRACE_RESERVED_PREFIX, ENHANCED_STACK_TRACE_RESERVED_PREFIX]
6+
7+
export function throwIfReservedName(type: string, name: string) {
8+
const prefix = isReservedName(name)
9+
if (prefix) {
10+
throw Error(`Cannot register ${type} name: '${name}', with reserved prefix: '${prefix}'`)
11+
}
12+
}
13+
14+
export function isReservedName(name: string): string | undefined {
15+
for (const prefix of reservedPrefixes) {
16+
if (name.startsWith(prefix)) {
17+
return prefix
18+
}
19+
}
20+
}

packages/test/src/test-integration-split-two.ts

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ import {
1414
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
1515
import { decode as payloadDecode, decodeFromPayloadsAtIndex } from '@temporalio/common/lib/internal-non-workflow';
1616

17-
import { condition, defineQuery, defineSignal, setDefaultQueryHandler, setHandler, sleep } from '@temporalio/workflow';
17+
import { condition, defineQuery, defineSignal, defineUpdate, setDefaultQueryHandler, setDefaultSignalHandler, setHandler, sleep } from '@temporalio/workflow';
1818
import { configurableHelpers, createTestWorkflowBundle } from './helpers-integration';
1919
import * as activities from './activities';
2020
import * as workflows from './workflows';
2121
import { makeTestFn, configMacro } from './helpers-integration-multi-codec';
22+
import { reservedPrefixes } from '@temporalio/common/src/reserved';
2223

2324
// Note: re-export shared workflows (or long workflows)
2425
// - review the files where these workflows are shared
@@ -751,3 +752,78 @@ test('default query handler is not used if requested query exists', configMacro,
751752
t.deepEqual(result, { name: definedQuery.name, args });
752753
});
753754
});
755+
756+
test('Cannot register activities using reserved prefixes', configMacro, async (t, config) => {
757+
const { createWorkerWithDefaults } = config;
758+
759+
for (const prefix of reservedPrefixes) {
760+
const activityName = prefix + "_test"
761+
await t.throwsAsync(createWorkerWithDefaults(t, {
762+
activities: { [activityName]: () => {} }
763+
}), { instanceOf: Error, message: `Cannot register activity name: '${activityName}', with reserved prefix: '${prefix}'`})
764+
}
765+
})
766+
767+
test('Cannot register task queues using reserved prefixes', configMacro, async (t, config) => {
768+
const { createWorkerWithDefaults } = config;
769+
770+
for (const prefix of reservedPrefixes) {
771+
const taskQueue = prefix + "_test"
772+
773+
await t.throwsAsync(createWorkerWithDefaults(t, {
774+
taskQueue,
775+
}), { instanceOf: Error, message: `Cannot register task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`})
776+
}
777+
})
778+
779+
interface HandlerError {
780+
name: string;
781+
message: string;
782+
}
783+
784+
export async function workflowBadPrefixHandler(prefix: string): Promise<HandlerError[]> {
785+
// Re-package errors, default payload converter has trouble converting native errors (no 'data' field).
786+
const expectedErrors: HandlerError[] = []
787+
try {
788+
setHandler(defineSignal(prefix + '_signal'), () => {})
789+
} catch (e) {
790+
if (e instanceof Error) {
791+
expectedErrors.push({ name: e.name, message: e.message })
792+
}
793+
}
794+
try {
795+
setHandler(defineUpdate(prefix + '_update'), () => {})
796+
} catch (e) {
797+
if (e instanceof Error) {
798+
expectedErrors.push({ name: e.name, message: e.message })
799+
}
800+
}
801+
try {
802+
setHandler(defineQuery(prefix + '_query'), () => {})
803+
} catch (e) {
804+
if (e instanceof Error) {
805+
expectedErrors.push({ name: e.name, message: e.message })
806+
}
807+
}
808+
return expectedErrors
809+
}
810+
811+
test('Workflow failure if define signals/updates/queries with reserved prefixes', configMacro, async (t, config) => {
812+
const { env, createWorkerWithDefaults } = config;
813+
const { executeWorkflow } = configurableHelpers(t, t.context.workflowBundle, env);
814+
const worker = await createWorkerWithDefaults(t);
815+
await worker.runUntil(async () => {
816+
const prefix = reservedPrefixes[0]
817+
// for (const prefix of reservedPrefixes) {
818+
const result = await executeWorkflow(workflowBadPrefixHandler, {
819+
args: [prefix]
820+
});
821+
console.log("result", result)
822+
t.deepEqual(result, [
823+
{ name: 'Error', message: `Cannot register signal name: '${prefix}_signal', with reserved prefix: '${prefix}'`},
824+
{ name: 'Error', message: `Cannot register update name: '${prefix}_update', with reserved prefix: '${prefix}'`},
825+
{ name: 'Error', message: `Cannot register query name: '${prefix}_query', with reserved prefix: '${prefix}'`},
826+
])
827+
// }
828+
})
829+
})

packages/test/src/test-workflows.ts

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { parseWorkflowCode } from '@temporalio/worker/lib/worker';
2323
import * as activityFunctions from './activities';
2424
import { cleanStackTrace, REUSE_V8_CONTEXT, u8 } from './helpers';
2525
import { ProcessedSignal } from './workflows';
26+
import { reservedPrefixes } from '@temporalio/common/src/reserved';
2627

2728
export interface Context {
2829
workflow: VMWorkflow | ReusableVMWorkflow;
@@ -2528,3 +2529,77 @@ test('Signals/Updates/Activities/Timers - Trace promises completion order - 1.11
25282529
);
25292530
}
25302531
});
2532+
2533+
test('Default query handler fail activations with reserved names - workflowWithDefaultHandlers', async (t) => {
2534+
const { workflowType } = t.context;
2535+
2536+
await activate(
2537+
t,
2538+
makeActivation(
2539+
undefined,
2540+
makeInitializeWorkflowJob(workflowType),
2541+
),
2542+
);
2543+
2544+
for (const prefix of reservedPrefixes) {
2545+
const completion = await activate(
2546+
t,
2547+
makeActivation(
2548+
undefined,
2549+
makeQueryWorkflowJob("1", prefix + '_query')
2550+
),
2551+
);
2552+
2553+
compareCompletion(
2554+
t,
2555+
completion,
2556+
{
2557+
failed: {
2558+
failure: {
2559+
...completion.failed?.failure,
2560+
// We only care about the error message.
2561+
message: `Cannot register query name: \'${prefix}_query\', with reserved prefix: \'${prefix}\'`,
2562+
}
2563+
}
2564+
}
2565+
);
2566+
}
2567+
});
2568+
2569+
test('Default signal handler fail activations with reserved names - workflowWithDefaultHandlers', async (t) => {
2570+
const { workflowType } = t.context;
2571+
2572+
await activate(
2573+
t,
2574+
makeActivation(
2575+
undefined,
2576+
makeInitializeWorkflowJob(workflowType),
2577+
),
2578+
);
2579+
2580+
for (const prefix of reservedPrefixes) {
2581+
const job = makeSignalWorkflowJob(prefix + '_signal', []);
2582+
2583+
const completion = await activate(
2584+
t,
2585+
makeActivation(
2586+
undefined,
2587+
job
2588+
),
2589+
);
2590+
2591+
compareCompletion(
2592+
t,
2593+
completion,
2594+
{
2595+
failed: {
2596+
failure: {
2597+
...completion.failed?.failure,
2598+
// We only care about the error message.
2599+
message: `Cannot register signal name: \'${prefix}_signal\', with reserved prefix: \'${prefix}\'`,
2600+
}
2601+
}
2602+
}
2603+
);
2604+
}
2605+
});

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,3 +89,4 @@ export * from './upsert-and-read-search-attributes';
8989
export * from './wait-on-user';
9090
export * from './workflow-cancellation-scenarios';
9191
export * from './upsert-and-read-memo';
92+
export * from './workflow-with-default-handlers';
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { condition, defineSignal, setDefaultQueryHandler, setDefaultSignalHandler, setHandler } from "@temporalio/workflow";
2+
3+
export async function workflowWithDefaultHandlers() {
4+
let complete = true;
5+
setDefaultQueryHandler(() => {});
6+
setDefaultSignalHandler(() => {});
7+
setHandler(defineSignal('completeSignal'), () => {});
8+
9+
condition(() => complete);
10+
}

packages/worker/src/worker-options.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { InjectedSinks } from './sinks';
1818
import { MiB } from './utils';
1919
import { WorkflowBundleWithSourceMap } from './workflow/bundler';
2020
import { asNativeTuner, WorkerTuner } from './worker-tuner';
21+
import { throwIfReservedName } from '@temporalio/common/src/reserved';
2122

2223
export type { WebpackConfiguration };
2324

@@ -822,6 +823,9 @@ export function compileWorkerOptions(rawOpts: WorkerOptions, logger: Logger): Co
822823
}
823824

824825
const activities = new Map(Object.entries(opts.activities ?? {}).filter(([_, v]) => typeof v === 'function'));
826+
for (const activityName of activities.keys()) {
827+
throwIfReservedName('activity', activityName)
828+
}
825829
const tuner = asNativeTuner(opts.tuner, logger);
826830

827831
return {

packages/worker/src/worker.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ import { ThreadedVMWorkflowCreator } from './workflow/threaded-vm';
8888
import { VMWorkflowCreator } from './workflow/vm';
8989
import { WorkflowBundleWithSourceMapAndFilename } from './workflow/workflow-worker-thread/input';
9090
import { CombinedWorkerRunError, GracefulShutdownPeriodExpiredError, PromiseCompletionTimeoutError } from './errors';
91+
import { throwIfReservedName } from '@temporalio/common/src/reserved';
9192

9293
export { DataConverter, defaultPayloadConverter };
9394

@@ -462,6 +463,7 @@ export class Worker {
462463
* This method initiates a connection to the server and will throw (asynchronously) on connection failure.
463464
*/
464465
public static async create(options: WorkerOptions): Promise<Worker> {
466+
throwIfReservedName('task queue', options.taskQueue)
465467
const logger = withMetadata(Runtime.instance().logger, {
466468
sdkComponent: SdkComponent.worker,
467469
taskQueue: options.taskQueue ?? 'default',

packages/workflow/src/internals.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ import { untrackPromise } from './stack-helpers';
4848
import pkg from './pkg';
4949
import { SdkFlag, assertValidFlag } from './flags';
5050
import { executeWithLifecycleLogging, log } from './logs';
51+
import { ENHANCED_STACK_TRACE_RESERVED_PREFIX, isReservedName, throwIfReservedName } from '@temporalio/common/src/reserved';
52+
import { STACK_TRACE_RESERVED_PREFIX } from '../../common/src/reserved';
5153

5254
const StartChildWorkflowExecutionFailedCause = {
5355
WORKFLOW_ALREADY_EXISTS: 'WORKFLOW_ALREADY_EXISTS',
@@ -249,7 +251,7 @@ export class Activator implements ActivationHandler {
249251
*/
250252
public readonly queryHandlers = new Map<string, WorkflowQueryAnnotatedType>([
251253
[
252-
'__stack_trace',
254+
STACK_TRACE_RESERVED_PREFIX,
253255
{
254256
handler: () => {
255257
return this.getStackTraces()
@@ -260,7 +262,7 @@ export class Activator implements ActivationHandler {
260262
},
261263
],
262264
[
263-
'__enhanced_stack_trace',
265+
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
264266
{
265267
handler: (): EnhancedStackTrace => {
266268
const { sourceMap } = this;
@@ -619,6 +621,8 @@ export class Activator implements ActivationHandler {
619621
protected queryWorkflowNextHandler({ queryName, args }: QueryInput): Promise<unknown> {
620622
let fn = this.queryHandlers.get(queryName)?.handler;
621623
if (fn === undefined && this.defaultQueryHandler !== undefined) {
624+
// Do not call default query handler with reserved query name.
625+
throwIfReservedName('query', queryName)
622626
fn = this.defaultQueryHandler.bind(this, queryName);
623627
}
624628
// No handler or default registered, fail.
@@ -649,17 +653,28 @@ export class Activator implements ActivationHandler {
649653
throw new TypeError('Missing query activation attributes');
650654
}
651655

656+
const queryInput = {
657+
queryName: queryType,
658+
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
659+
queryId,
660+
headers: headers ?? {},
661+
}
662+
663+
// Skip interceptors if this is an internal query.
664+
if (isReservedName(queryType)) {
665+
this.queryWorkflowNextHandler(queryInput).then(
666+
(result) => this.completeQuery(queryId, result),
667+
(reason) => this.failQuery(queryId, reason)
668+
);
669+
return
670+
}
671+
652672
const execute = composeInterceptors(
653673
this.interceptors.inbound,
654674
'handleQuery',
655675
this.queryWorkflowNextHandler.bind(this)
656676
);
657-
execute({
658-
queryName: queryType,
659-
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
660-
queryId,
661-
headers: headers ?? {},
662-
}).then(
677+
execute(queryInput).then(
663678
(result) => this.completeQuery(queryId, result),
664679
(reason) => this.failQuery(queryId, reason)
665680
);
@@ -797,6 +812,8 @@ export class Activator implements ActivationHandler {
797812
if (fn) {
798813
return await fn(...args);
799814
} else if (this.defaultSignalHandler) {
815+
// Do not call default signal handler with reserved signal name.
816+
throwIfReservedName('signal', signalName)
800817
return await this.defaultSignalHandler(signalName, ...args);
801818
} else {
802819
throw new IllegalStateError(`No registered signal handler for signal: ${signalName}`);

packages/workflow/src/workflow.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ import { LocalActivityDoBackoff } from './errors';
5656
import { assertInWorkflowContext, getActivator, maybeGetActivator } from './global-attributes';
5757
import { untrackPromise } from './stack-helpers';
5858
import { ChildWorkflowHandle, ExternalWorkflowHandle } from './workflow-handle';
59+
import { throwIfReservedName } from '@temporalio/common/src/reserved';
5960

6061
// Avoid a circular dependency
6162
registerSleepImplementation(sleep);
@@ -1258,6 +1259,8 @@ export function setHandler<
12581259
options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions<Args>
12591260
): void {
12601261
const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.');
1262+
// Cannot register handler for reserved names
1263+
throwIfReservedName(def.type, def.name)
12611264
const description = options?.description;
12621265
if (def.type === 'update') {
12631266
if (typeof handler === 'function') {

0 commit comments

Comments
 (0)