Skip to content

Commit 3ee7227

Browse files
committed
simplify cancellation details tests, reduce flakiness
1 parent 9569714 commit 3ee7227

File tree

3 files changed

+78
-61
lines changed

3 files changed

+78
-61
lines changed

packages/test/src/activities/heartbeat-cancellation-details.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ export async function heartbeatCancellationDetailsActivity(
1111
// eslint-disable-next-line no-constant-condition
1212
while (true) {
1313
try {
14-
activity.heartbeat();
14+
activity.heartbeat('heartbeated');
1515
await activity.sleep(300);
1616
} catch (err) {
1717
if (err instanceof activity.CancelledFailure && catchErr) {

packages/test/src/helpers-integration.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,28 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId:
311311
}, 15000);
312312
}
313313

314+
// Helper function to check if an activity has heartbeated
315+
export async function hasActivityHeartbeat(
316+
handle: WorkflowHandle,
317+
activityId: string,
318+
expectedContent?: string
319+
): Promise<boolean> {
320+
const { raw } = await handle.describe();
321+
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
322+
const heartbeatData = activityInfo?.heartbeatDetails?.payloads?.[0]?.data;
323+
if (!heartbeatData) return false;
324+
325+
// If no expected content specified, just check that heartbeat data exists
326+
if (!expectedContent) return true;
327+
328+
try {
329+
const decoded = Buffer.from(heartbeatData).toString();
330+
return decoded.includes(expectedContent);
331+
} catch {
332+
return false;
333+
}
334+
}
335+
314336
export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
315337
return configurableHelpers(t, t.context.workflowBundle, testEnv);
316338
}

packages/test/src/test-integration-workflows.ts

Lines changed: 55 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { setTimeout as setTimeoutPromise } from 'timers/promises';
22
import { randomUUID } from 'crypto';
33
import { ExecutionContext } from 'ava';
44
import { firstValueFrom, Subject } from 'rxjs';
5-
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
5+
import { WorkflowFailedError } from '@temporalio/client';
66
import * as activity from '@temporalio/activity';
77
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
88
import { TestWorkflowEnvironment } from '@temporalio/testing';
@@ -27,6 +27,7 @@ import * as workflows from './workflows';
2727
import {
2828
Context,
2929
createLocalTestEnvironment,
30+
hasActivityHeartbeat,
3031
helpers,
3132
makeTestFunction,
3233
setActivityPauseState,
@@ -1449,6 +1450,55 @@ test('Workflow can return root workflow', async (t) => {
14491450
});
14501451
});
14511452

1453+
export async function heartbeatPauseWorkflowBasic(
1454+
activityId: string,
1455+
catchErr: boolean
1456+
): Promise<ActivityCancellationDetails | undefined> {
1457+
const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({
1458+
startToCloseTimeout: '5s',
1459+
activityId,
1460+
retry: {
1461+
maximumAttempts: 1,
1462+
},
1463+
heartbeatTimeout: '1s',
1464+
});
1465+
1466+
return await heartbeatCancellationDetailsActivity(catchErr);
1467+
}
1468+
1469+
test('Activity pause returns expected cancellation details', async (t) => {
1470+
const { createWorker, startWorkflow } = helpers(t);
1471+
1472+
const worker = await createWorker({
1473+
activities: {
1474+
heartbeatCancellationDetailsActivity,
1475+
},
1476+
});
1477+
1478+
await worker.runUntil(async () => {
1479+
const testActivityId = randomUUID();
1480+
const handle = await startWorkflow(heartbeatPauseWorkflowBasic, {
1481+
args: [testActivityId, true],
1482+
});
1483+
1484+
// Wait for activity to start heartbeating
1485+
await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'heartbeated'), 5000);
1486+
// Now pause the activity
1487+
await setActivityPauseState(handle, testActivityId, true);
1488+
// Get the result - should contain pause cancellation details
1489+
const result = await handle.result();
1490+
1491+
t.deepEqual(result, {
1492+
cancelRequested: false,
1493+
notFound: false,
1494+
paused: true,
1495+
timedOut: false,
1496+
workerShutdown: false,
1497+
reset: false,
1498+
});
1499+
});
1500+
});
1501+
14521502
export const activityStartedQuery = workflow.defineQuery<boolean, [number]>('activityStarted');
14531503
export const proceedSignal = workflow.defineSignal<[]>('proceed');
14541504

@@ -1487,79 +1537,24 @@ export async function heartbeatPauseWorkflow(
14871537
proceed = true;
14881538
});
14891539

1490-
activity1Started = true;
14911540
const promise1 = heartbeatCancellationDetailsActivity(catchErr);
1541+
activity1Started = true;
14921542

14931543
// Wait for the test to pause activity 1 and signal us to continue
14941544
await workflow.condition(() => proceed);
14951545
proceed = false; // reset for next step
14961546

1497-
activity2Started = true;
14981547
const promise2 = heartbeatCancellationDetailsActivity2(catchErr);
1548+
activity2Started = true;
14991549

15001550
// Wait for the test to pause activity 2 and signal us to continue
15011551
await workflow.condition(() => proceed);
15021552

15031553
return Promise.all([promise1, promise2]);
15041554
}
15051555

1506-
test('Activity pause returns expected cancellation details', async (t) => {
1507-
const { createWorker, startWorkflow } = helpers(t);
1508-
const worker = await createWorker({
1509-
activities: {
1510-
heartbeatCancellationDetailsActivity,
1511-
heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity,
1512-
},
1513-
});
1514-
1515-
await worker.runUntil(async () => {
1516-
const testActivityId = randomUUID();
1517-
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
1518-
1519-
await waitUntil(async () => handle.query(activityStartedQuery, 1), 5000);
1520-
await setActivityPauseState(handle, testActivityId, true);
1521-
await handle.signal(proceedSignal);
1522-
1523-
await waitUntil(async () => handle.query(activityStartedQuery, 2), 5000);
1524-
await setActivityPauseState(handle, `${testActivityId}-2`, true);
1525-
await handle.signal(proceedSignal);
1526-
1527-
const result = await handle.result();
1528-
t.deepEqual(result[0], {
1529-
cancelRequested: false,
1530-
notFound: false,
1531-
paused: true,
1532-
timedOut: false,
1533-
workerShutdown: false,
1534-
reset: false,
1535-
});
1536-
t.deepEqual(result[1], {
1537-
cancelRequested: false,
1538-
notFound: false,
1539-
paused: true,
1540-
timedOut: false,
1541-
workerShutdown: false,
1542-
reset: false,
1543-
});
1544-
});
1545-
});
1546-
15471556
test('Activity can pause and unpause', async (t) => {
15481557
const { createWorker, startWorkflow } = helpers(t);
1549-
async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string): Promise<boolean> {
1550-
const { raw } = await handle.describe();
1551-
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
1552-
if (!activityInfo) return false;
1553-
1554-
if (activityInfo.heartbeatDetails?.payloads) {
1555-
for (const payload of activityInfo.heartbeatDetails.payloads) {
1556-
if (payload.data && payload.data.length > 0) {
1557-
return true;
1558-
}
1559-
}
1560-
}
1561-
return false;
1562-
}
15631558

15641559
const worker = await createWorker({
15651560
activities: {
@@ -1574,13 +1569,13 @@ test('Activity can pause and unpause', async (t) => {
15741569

15751570
await waitUntil(async () => handle.query(activityStartedQuery, 1), 5000);
15761571
await setActivityPauseState(handle, testActivityId, true);
1577-
await waitUntil(async () => checkHeartbeatDetailsExist(handle, testActivityId), 5000);
1572+
await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 5000);
15781573
await setActivityPauseState(handle, testActivityId, false);
15791574
await handle.signal(proceedSignal);
15801575

15811576
await waitUntil(async () => handle.query(activityStartedQuery, 2), 5000);
15821577
await setActivityPauseState(handle, `${testActivityId}-2`, true);
1583-
await waitUntil(async () => checkHeartbeatDetailsExist(handle, `${testActivityId}-2`), 5000);
1578+
await waitUntil(async () => hasActivityHeartbeat(handle, `${testActivityId}-2`, 'finally-complete'), 5000);
15841579
await setActivityPauseState(handle, `${testActivityId}-2`, false);
15851580
await handle.signal(proceedSignal);
15861581

0 commit comments

Comments
 (0)