Skip to content

Commit 9facf85

Browse files
authored
feat(worker): Expose Poller Automation (#1704)
1 parent 94aad7d commit 9facf85

File tree

7 files changed

+262
-20
lines changed

7 files changed

+262
-20
lines changed

packages/core-bridge/src/worker.rs

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -410,8 +410,9 @@ mod config {
410410
ResourceBasedSlotsOptions, ResourceBasedSlotsOptionsBuilder, ResourceSlotOptions,
411411
SlotSupplierOptions as CoreSlotSupplierOptions, TunerHolder, TunerHolderOptionsBuilder,
412412
api::worker::{
413-
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior, SlotKind, WorkerConfig,
414-
WorkerConfigBuilder, WorkerConfigBuilderError, WorkflowSlotKind,
413+
ActivitySlotKind, LocalActivitySlotKind, PollerBehavior as CorePollerBehavior,
414+
SlotKind, WorkerConfig, WorkerConfigBuilder, WorkerConfigBuilderError,
415+
WorkflowSlotKind,
415416
},
416417
};
417418

@@ -428,8 +429,8 @@ mod config {
428429
namespace: String,
429430
tuner: WorkerTuner,
430431
non_sticky_to_sticky_poll_ratio: f32,
431-
max_concurrent_workflow_task_polls: usize,
432-
max_concurrent_activity_task_polls: usize,
432+
workflow_task_poller_behavior: PollerBehavior,
433+
activity_task_poller_behavior: PollerBehavior,
433434
enable_non_local_activities: bool,
434435
sticky_queue_schedule_to_start_timeout: Duration,
435436
max_cached_workflows: usize,
@@ -440,6 +441,18 @@ mod config {
440441
shutdown_grace_time: Option<Duration>,
441442
}
442443

444+
#[derive(TryFromJs)]
445+
pub enum PollerBehavior {
446+
SimpleMaximum {
447+
maximum: usize,
448+
},
449+
Autoscaling {
450+
minimum: usize,
451+
maximum: usize,
452+
initial: usize,
453+
},
454+
}
455+
443456
impl BridgeWorkerOptions {
444457
pub(crate) fn into_core_config(self) -> Result<WorkerConfig, WorkerConfigBuilderError> {
445458
// Set all other options
@@ -452,12 +465,8 @@ mod config {
452465
.namespace(self.namespace)
453466
.tuner(self.tuner.into_core_config()?)
454467
.nonsticky_to_sticky_poll_ratio(self.non_sticky_to_sticky_poll_ratio)
455-
.workflow_task_poller_behavior(PollerBehavior::SimpleMaximum(
456-
self.max_concurrent_workflow_task_polls,
457-
))
458-
.activity_task_poller_behavior(PollerBehavior::SimpleMaximum(
459-
self.max_concurrent_activity_task_polls,
460-
))
468+
.workflow_task_poller_behavior(self.workflow_task_poller_behavior)
469+
.activity_task_poller_behavior(self.activity_task_poller_behavior)
461470
.no_remote_activities(!self.enable_non_local_activities)
462471
.sticky_queue_schedule_to_start_timeout(self.sticky_queue_schedule_to_start_timeout)
463472
.max_cached_workflows(self.max_cached_workflows)
@@ -470,6 +479,25 @@ mod config {
470479
}
471480
}
472481

482+
impl From<PollerBehavior> for CorePollerBehavior {
483+
fn from(val: PollerBehavior) -> Self {
484+
match val {
485+
PollerBehavior::SimpleMaximum { maximum } => {
486+
CorePollerBehavior::SimpleMaximum(maximum)
487+
}
488+
PollerBehavior::Autoscaling {
489+
minimum,
490+
maximum,
491+
initial,
492+
} => CorePollerBehavior::Autoscaling {
493+
minimum,
494+
maximum,
495+
initial,
496+
},
497+
}
498+
}
499+
}
500+
473501
#[derive(TryFromJs)]
474502
#[allow(clippy::struct_field_names)]
475503
pub(super) struct WorkerTuner {

packages/core-bridge/ts/native.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,8 @@ export interface WorkerOptions {
179179
namespace: string;
180180
tuner: WorkerTunerOptions;
181181
nonStickyToStickyPollRatio: number;
182-
maxConcurrentWorkflowTaskPolls: number;
183-
maxConcurrentActivityTaskPolls: number;
182+
workflowTaskPollerBehavior: PollerBehavior;
183+
activityTaskPollerBehavior: PollerBehavior;
184184
enableNonLocalActivities: boolean;
185185
stickyQueueScheduleToStartTimeout: number;
186186
maxCachedWorkflows: number;
@@ -191,6 +191,18 @@ export interface WorkerOptions {
191191
shutdownGraceTime: number;
192192
}
193193

194+
export type PollerBehavior =
195+
| {
196+
type: 'simple-maximum';
197+
maximum: number;
198+
}
199+
| {
200+
type: 'autoscaling';
201+
minimum: number;
202+
maximum: number;
203+
initial: number;
204+
};
205+
194206
////////////////////////////////////////////////////////////////////////////////////////////////////
195207
// Worker Tuner
196208
////////////////////////////////////////////////////////////////////////////////////////////////////

packages/test/src/test-bridge.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -277,8 +277,16 @@ const GenericConfigs = {
277277
},
278278
},
279279
nonStickyToStickyPollRatio: 0.5,
280-
maxConcurrentWorkflowTaskPolls: 1,
281-
maxConcurrentActivityTaskPolls: 1,
280+
workflowTaskPollerBehavior: {
281+
type: 'simple-maximum',
282+
maximum: 1,
283+
},
284+
activityTaskPollerBehavior: {
285+
type: 'autoscaling',
286+
minimum: 1,
287+
initial: 5,
288+
maximum: 100,
289+
},
282290
enableNonLocalActivities: false,
283291
stickyQueueScheduleToStartTimeout: 1000,
284292
maxCachedWorkflows: 1000,
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import { v4 as uuid } from 'uuid';
2+
import fetch from 'node-fetch';
3+
import test from 'ava';
4+
import { Runtime, Worker } from '@temporalio/worker';
5+
import { getRandomPort, TestWorkflowEnvironment } from './helpers';
6+
import * as activities from './activities';
7+
import * as workflows from './workflows';
8+
9+
test.serial('Can run autoscaling polling worker', async (t) => {
10+
const port = await getRandomPort();
11+
Runtime.install({
12+
telemetryOptions: {
13+
metrics: {
14+
prometheus: {
15+
bindAddress: `127.0.0.1:${port}`,
16+
},
17+
},
18+
},
19+
});
20+
const localEnv = await TestWorkflowEnvironment.createLocal();
21+
22+
try {
23+
const taskQueue = `autoscale-pollers-${uuid()}`;
24+
const worker = await Worker.create({
25+
workflowsPath: require.resolve('./workflows'),
26+
activities,
27+
connection: localEnv.nativeConnection,
28+
taskQueue,
29+
workflowTaskPollerBehavior: {
30+
type: 'autoscaling',
31+
initial: 2,
32+
},
33+
activityTaskPollerBehavior: {
34+
type: 'autoscaling',
35+
initial: 2,
36+
},
37+
});
38+
const workerPromise = worker.run();
39+
40+
// Give pollers a beat to start
41+
await new Promise((resolve) => setTimeout(resolve, 300));
42+
43+
const resp = await fetch(`http://127.0.0.1:${port}/metrics`);
44+
const metricsText = await resp.text();
45+
const metricsLines = metricsText.split('\n');
46+
47+
const matches = metricsLines.filter((l) => l.includes('temporal_num_pollers'));
48+
const activity_pollers = matches.filter((l) => l.includes('activity_task'));
49+
t.is(activity_pollers.length, 1, 'Should have exactly one activity poller metric');
50+
t.true(activity_pollers[0].endsWith('2'), 'Activity poller count should be 2');
51+
const workflow_pollers = matches.filter((l) => l.includes('workflow_task'));
52+
t.is(workflow_pollers.length, 2, 'Should have exactly two workflow poller metrics (sticky and non-sticky)');
53+
54+
// There's sticky & non-sticky pollers, and they may have a count of 1 or 2 depending on
55+
// initialization timing.
56+
t.true(
57+
workflow_pollers[0].endsWith('2') || workflow_pollers[0].endsWith('1'),
58+
'First workflow poller count should be 1 or 2'
59+
);
60+
t.true(
61+
workflow_pollers[1].endsWith('2') || workflow_pollers[1].endsWith('1'),
62+
'Second workflow poller count should be 1 or 2'
63+
);
64+
65+
const workflowPromises = Array(20)
66+
.fill(0)
67+
.map(async (_) => {
68+
const handle = await localEnv.client.workflow.start(workflows.waitOnSignalThenActivity, {
69+
taskQueue,
70+
workflowId: `resource-based-${uuid()}`,
71+
});
72+
await handle.signal('my-signal', 'finish');
73+
return handle.result();
74+
});
75+
76+
await Promise.all(workflowPromises);
77+
worker.shutdown();
78+
await workerPromise;
79+
t.pass();
80+
} finally {
81+
await localEnv.teardown();
82+
}
83+
});

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,4 @@ export * from './wait-on-user';
9191
export * from './workflow-cancellation-scenarios';
9292
export * from './upsert-and-read-memo';
9393
export * from './updates-ordering';
94+
export * from './wait-on-signal-then-activity';
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import * as wf from '@temporalio/workflow';
2+
import type * as activities from '../activities';
3+
4+
const { echo } = wf.proxyActivities<typeof activities>({ startToCloseTimeout: '5s' });
5+
export const mySignal = wf.defineSignal<[string]>('my-signal');
6+
export async function waitOnSignalThenActivity(): Promise<void> {
7+
let lastSignal = '<none>';
8+
9+
wf.setHandler(mySignal, (value: string) => {
10+
lastSignal = value;
11+
});
12+
13+
await wf.condition(() => lastSignal === 'finish');
14+
await echo('hi');
15+
}

0 commit comments

Comments
 (0)