Skip to content

Special behaviour for temporal prefixes #1644

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions packages/common/src/reserved.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
export const TEMPORAL_RESERVED_PREFIX = '__temporal_';
export const STACK_TRACE_RESERVED_PREFIX = '__stack_trace';
export const ENHANCED_STACK_TRACE_RESERVED_PREFIX = '__enhanced_stack_trace';

export const reservedPrefixes = [
TEMPORAL_RESERVED_PREFIX,
STACK_TRACE_RESERVED_PREFIX,
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
];

export class ReservedPrefixError extends Error {
constructor(type: string, name: string, prefix: string) {
super(`Cannot use ${type} name: '${name}', with reserved prefix: '${prefix}'`);
this.name = 'ReservedPrefixError';
}
}

export function throwIfReservedName(type: string, name: string): void {
const prefix = maybeGetReservedPrefix(name);
if (prefix) {
throw new ReservedPrefixError(type, name, prefix);
}
}

export function maybeGetReservedPrefix(name: string): string | undefined {
for (const prefix of reservedPrefixes) {
if (name.startsWith(prefix)) {
return prefix;
}
}
}
197 changes: 195 additions & 2 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import { setTimeout as setTimeoutPromise } from 'timers/promises';
import { randomUUID } from 'crypto';
import asyncRetry from 'async-retry';
import { ExecutionContext } from 'ava';
import { firstValueFrom, Subject } from 'rxjs';
import { WorkflowFailedError } from '@temporalio/client';
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
import * as activity from '@temporalio/activity';
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
import { TestWorkflowEnvironment } from '@temporalio/testing';
import { CancelReason } from '@temporalio/worker/lib/activity';
import * as workflow from '@temporalio/workflow';
import { defineQuery, defineSignal } from '@temporalio/workflow';
import {
condition,
defineQuery,
defineSignal,
defineUpdate,
setDefaultQueryHandler,
setDefaultSignalHandler,
setDefaultUpdateHandler,
setHandler,
} from '@temporalio/workflow';
import { SdkFlags } from '@temporalio/workflow/lib/flags';
import {
ActivityCancellationType,
Expand All @@ -19,6 +29,7 @@ import {
TypedSearchAttributes,
WorkflowExecutionAlreadyStartedError,
} from '@temporalio/common';
import { reservedPrefixes } from '@temporalio/common/lib/reserved';
import { signalSchedulingWorkflow } from './activities/helpers';
import { activityStartedSignal } from './workflows/definitions';
import * as workflows from './workflows';
Expand Down Expand Up @@ -1414,3 +1425,185 @@ test('Workflow can return root workflow', async (t) => {
t.deepEqual(result, 'empty test-root-workflow-length');
});
});

test('Cannot register activities using reserved prefixes', async (t) => {
const { createWorker } = helpers(t);

for (const prefix of reservedPrefixes) {
const activityName = prefix + '_test';
await t.throwsAsync(
createWorker({
activities: { [activityName]: () => {} },
}),
{
name: 'ReservedPrefixError',
message: `Cannot use activity name: '${activityName}', with reserved prefix: '${prefix}'`,
}
);
}
});

test('Cannot register task queues using reserved prefixes', async (t) => {
const { createWorker } = helpers(t);

for (const prefix of reservedPrefixes) {
const taskQueue = prefix + '_test';

await t.throwsAsync(
createWorker({
taskQueue,
}),
{
name: 'ReservedPrefixError',
message: `Cannot use task queue name: '${taskQueue}', with reserved prefix: '${prefix}'`,
}
);
}
});

interface HandlerError {
name: string;
message: string;
}

export async function workflowBadPrefixHandler(prefix: string): Promise<HandlerError[]> {
// Re-package errors, default payload converter has trouble converting native errors (no 'data' field).
const expectedErrors: HandlerError[] = [];
try {
setHandler(defineSignal(prefix + '_signal'), () => {});
} catch (e) {
if (e instanceof Error) {
expectedErrors.push({ name: e.name, message: e.message });
}
}
try {
setHandler(defineUpdate(prefix + '_update'), () => {});
} catch (e) {
if (e instanceof Error) {
expectedErrors.push({ name: e.name, message: e.message });
}
}
try {
setHandler(defineQuery(prefix + '_query'), () => {});
} catch (e) {
if (e instanceof Error) {
expectedErrors.push({ name: e.name, message: e.message });
}
}
return expectedErrors;
}

test('Workflow failure if define signals/updates/queries with reserved prefixes', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker();
await worker.runUntil(async () => {
for (const prefix of reservedPrefixes) {
const result = await executeWorkflow(workflowBadPrefixHandler, {
args: [prefix],
});
t.deepEqual(result, [
{
name: 'ReservedPrefixError',
message: `Cannot use signal name: '${prefix}_signal', with reserved prefix: '${prefix}'`,
},
{
name: 'ReservedPrefixError',
message: `Cannot use update name: '${prefix}_update', with reserved prefix: '${prefix}'`,
},
{
name: 'ReservedPrefixError',
message: `Cannot use query name: '${prefix}_query', with reserved prefix: '${prefix}'`,
},
]);
}
});
});

export const wfReadyQuery = defineQuery<boolean>('wf-ready');
export async function workflowWithDefaultHandlers(): Promise<void> {
let unblocked = false;
setHandler(defineSignal('unblock'), () => {
unblocked = true;
});

setDefaultQueryHandler(() => {});
setDefaultSignalHandler(() => {});
setDefaultUpdateHandler(() => {});
setHandler(wfReadyQuery, () => true);

await condition(() => unblocked);
}

test('Default handlers fail given reserved prefix', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();

const assertWftFailure = async (handle: WorkflowHandle, errMsg: string) => {
await asyncRetry(
async () => {
const history = await handle.fetchHistory();
const wftFailedEvent = history.events?.findLast((ev) => ev.workflowTaskFailedEventAttributes);
if (wftFailedEvent === undefined) {
throw new Error('No WFT failed event found');
}
const { failure } = wftFailedEvent.workflowTaskFailedEventAttributes ?? {};
if (!failure) {
return t.fail('Expected failure in workflowTaskFailedEventAttributes');
}
t.is(failure.message, errMsg);
},
{ minTimeout: 300, factor: 1, retries: 10 }
);
};

await worker.runUntil(async () => {
for (const prefix of reservedPrefixes) {
// Reserved query
let handle = await startWorkflow(workflowWithDefaultHandlers);
await asyncRetry(async () => {
if (!(await handle.query(wfReadyQuery))) {
throw new Error('Workflow not ready yet');
}
});
const queryName = `${prefix}_query`;
await t.throwsAsync(
handle.query(queryName),
{
// ReservedPrefixError transforms to a QueryNotRegisteredError on the way back from server
name: 'QueryNotRegisteredError',
message: `Cannot use query name: '${queryName}', with reserved prefix: '${prefix}'`,
},
`Query ${queryName} should fail`
);
await handle.terminate();

// Reserved signal
handle = await startWorkflow(workflowWithDefaultHandlers);
await asyncRetry(async () => {
if (!(await handle.query(wfReadyQuery))) {
throw new Error('Workflow not ready yet');
}
});
const signalName = `${prefix}_signal`;
await handle.signal(signalName);
await assertWftFailure(handle, `Cannot use signal name: '${signalName}', with reserved prefix: '${prefix}'`);
await handle.terminate();

// Reserved update
handle = await startWorkflow(workflowWithDefaultHandlers);
await asyncRetry(async () => {
if (!(await handle.query(wfReadyQuery))) {
throw new Error('Workflow not ready yet');
}
});
const updateName = `${prefix}_update`;
handle.executeUpdate(updateName).catch(() => {
// Expect failure. The error caught here is a WorkflowNotFound because
// the workflow will have already failed, so the update cannot go through.
// We assert on the expected failure below.
});
await assertWftFailure(handle, `Cannot use update name: '${updateName}', with reserved prefix: '${prefix}'`);
await handle.terminate();
}
});
});
4 changes: 4 additions & 0 deletions packages/worker/src/worker-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { loadDataConverter } from '@temporalio/common/lib/internal-non-workflow'
import { LoggerSinks } from '@temporalio/workflow';
import { Context } from '@temporalio/activity';
import { native } from '@temporalio/core-bridge';
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
import { ActivityInboundLogInterceptor } from './activity-log-interceptor';
import { NativeConnection } from './connection';
import { CompiledWorkerInterceptors, WorkerInterceptors } from './interceptors';
Expand Down Expand Up @@ -953,6 +954,9 @@ export function compileWorkerOptions(
}

const activities = new Map(Object.entries(opts.activities ?? {}).filter(([_, v]) => typeof v === 'function'));
for (const activityName of activities.keys()) {
throwIfReservedName('activity', activityName);
}
const tuner = asNativeTuner(opts.tuner, logger);

return {
Expand Down
2 changes: 2 additions & 0 deletions packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ import { workflowLogAttributes } from '@temporalio/workflow/lib/logs';
import { native } from '@temporalio/core-bridge';
import { coresdk, temporal } from '@temporalio/proto';
import { type SinkCall, type WorkflowInfo } from '@temporalio/workflow';
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
import { Activity, CancelReason, activityLogAttributes } from './activity';
import { extractNativeClient, extractReferenceHolders, InternalNativeConnection, NativeConnection } from './connection';
import { ActivityExecuteInput } from './interceptors';
Expand Down Expand Up @@ -467,6 +468,7 @@ export class Worker {
* This method initiates a connection to the server and will throw (asynchronously) on connection failure.
*/
public static async create(options: WorkerOptions): Promise<Worker> {
throwIfReservedName('task queue', options.taskQueue);
const runtime = Runtime.instance();
const logger = LoggerWithComposedMetadata.compose(runtime.logger, {
sdkComponent: SdkComponent.worker,
Expand Down
39 changes: 32 additions & 7 deletions packages/workflow/src/internals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ import {
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { makeProtoEnumConverters } from '@temporalio/common/lib/internal-workflow';
import type { coresdk, temporal } from '@temporalio/proto';
import {
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
ReservedPrefixError,
STACK_TRACE_RESERVED_PREFIX,
maybeGetReservedPrefix,
} from '@temporalio/common/lib/reserved';
import { alea, RNG } from './alea';
import { RootCancellationScope } from './cancellation-scope';
import { UpdateScope } from './update-scope';
Expand Down Expand Up @@ -260,7 +266,7 @@ export class Activator implements ActivationHandler {
*/
public readonly queryHandlers = new Map<string, WorkflowQueryAnnotatedType>([
[
'__stack_trace',
STACK_TRACE_RESERVED_PREFIX,
{
handler: () => {
return this.getStackTraces()
Expand All @@ -271,7 +277,7 @@ export class Activator implements ActivationHandler {
},
],
[
'__enhanced_stack_trace',
ENHANCED_STACK_TRACE_RESERVED_PREFIX,
{
handler: (): EnhancedStackTrace => {
const { sourceMap } = this;
Expand Down Expand Up @@ -679,11 +685,17 @@ export class Activator implements ActivationHandler {
throw new TypeError('Missing query activation attributes');
}

const execute = composeInterceptors(
this.interceptors.inbound,
'handleQuery',
this.queryWorkflowNextHandler.bind(this)
);
const reservedPrefix = maybeGetReservedPrefix(queryType);
if (reservedPrefix) {
// Must have (internal) query handler for reserved query.
if (!this.queryHandlers.has(queryType)) {
throw new ReservedPrefixError('query', queryType, reservedPrefix);
}
}

// Skip interceptors if it is an internal query
const interceptors = reservedPrefix ? [] : this.interceptors.inbound;
const execute = composeInterceptors(interceptors, 'handleQuery', this.queryWorkflowNextHandler.bind(this));
execute({
queryName: queryType,
args: arrayFromPayloads(this.payloadConverter, activation.arguments),
Expand All @@ -706,6 +718,11 @@ export class Activator implements ActivationHandler {
if (!protocolInstanceId) {
throw new TypeError('Missing activation update protocolInstanceId');
}
const reservedPrefix = maybeGetReservedPrefix(name);
if (reservedPrefix && !this.updateHandlers.get(name)) {
// Must have (internal) update handler for reserved update.
throw new ReservedPrefixError('update', name, reservedPrefix);
}

const entry =
this.updateHandlers.get(name) ??
Expand Down Expand Up @@ -859,6 +876,14 @@ export class Activator implements ActivationHandler {
throw new TypeError('Missing activation signalName');
}

const reservedPrefix = maybeGetReservedPrefix(signalName);
if (reservedPrefix) {
if (!this.signalHandlers.has(signalName)) {
// Must have (internal) signal handler for reserved signal.
throw new ReservedPrefixError('signal', signalName, reservedPrefix);
}
}

if (!this.signalHandlers.has(signalName) && !this.defaultSignalHandler) {
this.bufferedSignals.push(activation);
return;
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { versioningIntentToProto } from '@temporalio/common/lib/versioning-inten
import { Duration, msOptionalToTs, msToNumber, msToTs, requiredTsToMs } from '@temporalio/common/lib/time';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { temporal } from '@temporalio/proto';
import { throwIfReservedName } from '@temporalio/common/lib/reserved';
import { CancellationScope, registerSleepImplementation } from './cancellation-scope';
import { UpdateScope } from './update-scope';
import {
Expand Down Expand Up @@ -1272,6 +1273,8 @@ export function setHandler<
options?: QueryHandlerOptions | SignalHandlerOptions | UpdateHandlerOptions<Args>
): void {
const activator = assertInWorkflowContext('Workflow.setHandler(...) may only be used from a Workflow Execution.');
// Cannot register handler for reserved names
throwIfReservedName(def.type, def.name);
const description = options?.description;
if (def.type === 'update') {
if (typeof handler === 'function') {
Expand Down
Loading