
Commit 79eb6a5

feat(control-plane): add support for handling multiple events in a single invocation
Currently we restrict the `scale-up` Lambda to handling a single event at a time. In very busy environments this can become a bottleneck: calls to the GitHub and AWS APIs happen on every invocation, and they can take long enough that we cannot process job-queued events as fast as they arrive. In our environment we also use a pool, and we have typically responded to the resulting alerts (SQS queue length growing) by expanding the pool. That helps because the Lambda more often finds that no scale-up is needed and can exit earlier, so we get through the queue faster, but it makes the environment much less responsive to changes in usage patterns.

At its core, this Lambda's task is to work out how many instances are needed and construct a single EC2 `CreateFleet` call to create them. That job can be batched: we can take any number of events, calculate the difference between our current state and the number of queued jobs, cap it at the maximum, and issue one call.

The thing to be careful about is partial failure, where EC2 creates some of the instances we wanted but not all of them. Lambda has a configurable function response type which can be set to `ReportBatchItemFailures`. In this mode we return a list of failed messages from our handler and only those are retried. We make use of this to give back as many events as we failed to process.

Now that we are potentially processing multiple events in a single invocation, one thing to optimise for is not recreating GitHub API clients. We need one client for the app itself, which we use to look up installation IDs, and then one client for each installation relevant to the batch of events being processed. A client is created the first time we see an event for a given installation and reused after that.
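The per-installation client reuse described above can be sketched as below. This is a hedged illustration only: `GitHubClient`, `getInstallationClient`, and the factory are hypothetical names for this sketch, not the actual module's API.

```typescript
// Hypothetical sketch of per-installation client reuse; names are
// illustrative, not the real auth module's API.
type GitHubClient = { installationId: number };

const installationClients = new Map<number, GitHubClient>();

function getInstallationClient(
  installationId: number,
  createClient: (id: number) => GitHubClient,
): GitHubClient {
  // Create a client the first time we see an event for this installation,
  // then reuse it for every later event in the batch.
  let client = installationClients.get(installationId);
  if (client === undefined) {
    client = createClient(installationId);
    installationClients.set(installationId, client);
  }
  return client;
}

// Two events for the same installation share one client.
let created = 0;
const factory = (id: number): GitHubClient => {
  created += 1;
  return { installationId: id };
};
const first = getInstallationClient(42, factory);
const second = getInstallationClient(42, factory);
console.log(created, first === second); // prints "1 true"
```

The point of the cache is that the expensive step (authenticating as the installation) happens at most once per installation per invocation, however many events the batch contains.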
We also remove the same `batch_size = 1` constraint from the `job-retry` Lambda and make the batch size configurable, defaulting to AWS's SQS default of 10 when unset. This Lambda retries events that previously failed. Here, however, instead of reporting failures for retry we keep the pre-existing fault-tolerant behaviour: errors are logged but deliberately do not trigger message retries, which avoids infinite loops from persistent GitHub API issues or malformed events. Tests are added for all of this.
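A minimal sketch of the partial-failure idea from the scale-up side, assuming messages arrive sorted ascending by retry count and that the surplus is taken from the high-retry end of the batch (both are assumptions of this sketch; `rejectedMessageIds` is a hypothetical helper, not the real scale-up code):

```typescript
// Hedged sketch, not the actual scale-up code: if EC2 creates fewer
// instances than we have messages, report the surplus back as SQS batch
// item failures so only those messages are retried.
interface QueuedMessage {
  messageId: string;
  retryCounter?: number;
}

function rejectedMessageIds(messages: QueuedMessage[], instancesCreated: number): string[] {
  // Messages are assumed sorted ascending by retry count, so the tail we
  // hand back is stable across invocations for a persistent failure.
  const shortfall = Math.max(0, messages.length - instancesCreated);
  return messages.slice(messages.length - shortfall).map((m) => m.messageId);
}

const batch: QueuedMessage[] = [
  { messageId: 'a' },
  { messageId: 'b', retryCounter: 1 },
  { messageId: 'c', retryCounter: 4 },
];
// Only two of three requested instances came up: one message goes back.
const failures = rejectedMessageIds(batch, 2).map((id) => ({ itemIdentifier: id }));
console.log(JSON.stringify(failures)); // prints [{"itemIdentifier":"c"}]
```

The `{ itemIdentifier }` objects match the `batchItemFailures` shape Lambda expects when `ReportBatchItemFailures` is enabled on the event source mapping.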
1 parent f2dd8a4 commit 79eb6a5

22 files changed, +1468 −376 lines changed


README.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -155,6 +155,8 @@ Join our discord community via [this invite link](https://discord.gg/bxgXW8jJGh)
 | <a name="input_key_name"></a> [key\_name](#input\_key\_name) | Key pair name | `string` | `null` | no |
 | <a name="input_kms_key_arn"></a> [kms\_key\_arn](#input\_kms\_key\_arn) | Optional CMK Key ARN to be used for Parameter Store. This key must be in the current account. | `string` | `null` | no |
 | <a name="input_lambda_architecture"></a> [lambda\_architecture](#input\_lambda\_architecture) | AWS Lambda architecture. Lambda functions using Graviton processors ('arm64') tend to have better price/performance than 'x86\_64' functions. | `string` | `"arm64"` | no |
+| <a name="input_lambda_event_source_mapping_batch_size"></a> [lambda\_event\_source\_mapping\_batch\_size](#input\_lambda\_event\_source\_mapping\_batch\_size) | Maximum number of records to pass to the lambda function in a single batch for the event source mapping. When not set, the AWS default of 10 events will be used. | `number` | `10` | no |
+| <a name="input_lambda_event_source_mapping_maximum_batching_window_in_seconds"></a> [lambda\_event\_source\_mapping\_maximum\_batching\_window\_in\_seconds](#input\_lambda\_event\_source\_mapping\_maximum\_batching\_window\_in\_seconds) | Maximum amount of time to gather records before invoking the lambda function, in seconds. AWS requires this to be greater than 0 if batch\_size is greater than 10. Defaults to 0. | `number` | `0` | no |
 | <a name="input_lambda_principals"></a> [lambda\_principals](#input\_lambda\_principals) | (Optional) add extra principals to the role created for execution of the lambda, e.g. for local testing. | <pre>list(object({<br/> type = string<br/> identifiers = list(string)<br/> }))</pre> | `[]` | no |
 | <a name="input_lambda_runtime"></a> [lambda\_runtime](#input\_lambda\_runtime) | AWS Lambda runtime. | `string` | `"nodejs22.x"` | no |
 | <a name="input_lambda_s3_bucket"></a> [lambda\_s3\_bucket](#input\_lambda\_s3\_bucket) | S3 bucket from which to specify lambda functions. This is an alternative to providing local files directly. | `string` | `null` | no |
```

lambdas/functions/control-plane/src/lambda.test.ts

Lines changed: 147 additions & 24 deletions
```diff
@@ -70,19 +70,33 @@ vi.mock('@aws-github-runner/aws-powertools-util');
 vi.mock('@aws-github-runner/aws-ssm-util');
 
 describe('Test scale up lambda wrapper.', () => {
-  it('Do not handle multiple record sets.', async () => {
-    await testInvalidRecords([sqsRecord, sqsRecord]);
+  it('Do not handle empty record sets.', async () => {
+    const sqsEventMultipleRecords: SQSEvent = {
+      Records: [],
+    };
+
+    await expect(scaleUpHandler(sqsEventMultipleRecords, context)).resolves.not.toThrow();
   });
 
-  it('Do not handle empty record sets.', async () => {
-    await testInvalidRecords([]);
+  it('Ignores non-sqs event sources.', async () => {
+    const record = {
+      ...sqsRecord,
+      eventSource: 'aws:non-sqs',
+    };
+
+    const sqsEventMultipleRecordsNonSQS: SQSEvent = {
+      Records: [record],
+    };
+
+    await expect(scaleUpHandler(sqsEventMultipleRecordsNonSQS, context)).resolves.not.toThrow();
+    expect(scaleUp).toHaveBeenCalledWith([]);
   });
 
   it('Scale without error should resolve.', async () => {
     const mock = vi.fn(scaleUp);
     mock.mockImplementation(() => {
       return new Promise((resolve) => {
-        resolve();
+        resolve([]);
       });
     });
     await expect(scaleUpHandler(sqsEvent, context)).resolves.not.toThrow();
@@ -104,28 +118,137 @@ describe('Test scale up lambda wrapper.', () => {
     vi.mocked(scaleUp).mockImplementation(mock);
     await expect(scaleUpHandler(sqsEvent, context)).rejects.toThrow(error);
   });
-});
 
-async function testInvalidRecords(sqsRecords: SQSRecord[]) {
-  const mock = vi.fn(scaleUp);
-  const logWarnSpy = vi.spyOn(logger, 'warn');
-  mock.mockImplementation(() => {
-    return new Promise((resolve) => {
-      resolve();
+  describe('Batch processing', () => {
+    beforeEach(() => {
+      vi.clearAllMocks();
+    });
+
+    const createMultipleRecords = (count: number, eventSource = 'aws:sqs'): SQSRecord[] => {
+      return Array.from({ length: count }, (_, i) => ({
+        ...sqsRecord,
+        eventSource,
+        messageId: `message-${i}`,
+        body: JSON.stringify({
+          ...body,
+          id: i + 1,
+        }),
+      }));
+    };
+
+    it('Should handle multiple SQS records in a single invocation', async () => {
+      const records = createMultipleRecords(3);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.resolve([]));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await expect(scaleUpHandler(multiRecordEvent, context)).resolves.not.toThrow();
+      expect(scaleUp).toHaveBeenCalledWith(
+        expect.arrayContaining([
+          expect.objectContaining({ messageId: 'message-0' }),
+          expect.objectContaining({ messageId: 'message-1' }),
+          expect.objectContaining({ messageId: 'message-2' }),
+        ]),
+      );
+    });
+
+    it('Should return batch item failures for rejected messages', async () => {
+      const records = createMultipleRecords(3);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.resolve(['message-1', 'message-2']));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      const result = await scaleUpHandler(multiRecordEvent, context);
+      expect(result).toEqual({
+        batchItemFailures: [{ itemIdentifier: 'message-1' }, { itemIdentifier: 'message-2' }],
+      });
+    });
+
+    it('Should filter out non-SQS event sources', async () => {
+      const sqsRecords = createMultipleRecords(2, 'aws:sqs');
+      const nonSqsRecords = createMultipleRecords(1, 'aws:sns');
+      const mixedEvent: SQSEvent = {
+        Records: [...sqsRecords, ...nonSqsRecords],
+      };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.resolve([]));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await scaleUpHandler(mixedEvent, context);
+      expect(scaleUp).toHaveBeenCalledWith(
+        expect.arrayContaining([
+          expect.objectContaining({ messageId: 'message-0' }),
+          expect.objectContaining({ messageId: 'message-1' }),
+        ]),
+      );
+      expect(scaleUp).not.toHaveBeenCalledWith(
+        expect.arrayContaining([expect.objectContaining({ messageId: 'message-2' })]),
+      );
+    });
+
+    it('Should sort messages by retry count', async () => {
+      const records = [
+        {
+          ...sqsRecord,
+          messageId: 'high-retry',
+          body: JSON.stringify({ ...body, retryCounter: 5 }),
+        },
+        {
+          ...sqsRecord,
+          messageId: 'low-retry',
+          body: JSON.stringify({ ...body, retryCounter: 1 }),
+        },
+        {
+          ...sqsRecord,
+          messageId: 'no-retry',
+          body: JSON.stringify({ ...body }),
+        },
+      ];
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation((messages) => {
+        // Verify messages are sorted by retry count (ascending)
+        expect(messages[0].messageId).toBe('no-retry');
+        expect(messages[1].messageId).toBe('low-retry');
+        expect(messages[2].messageId).toBe('high-retry');
+        return Promise.resolve([]);
+      });
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await scaleUpHandler(multiRecordEvent, context);
+    });
+
+    it('Should return all failed messages when scaleUp throws non-ScaleError', async () => {
+      const records = createMultipleRecords(2);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.reject(new Error('Generic error')));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      const result = await scaleUpHandler(multiRecordEvent, context);
+      expect(result).toEqual({ batchItemFailures: [] });
+    });
+
+    it('Should throw when scaleUp throws ScaleError', async () => {
+      const records = createMultipleRecords(2);
+      const multiRecordEvent: SQSEvent = { Records: records };
+
+      const error = new ScaleError('Critical scaling error');
+      const mock = vi.fn(scaleUp);
+      mock.mockImplementation(() => Promise.reject(error));
+      vi.mocked(scaleUp).mockImplementation(mock);
+
+      await expect(scaleUpHandler(multiRecordEvent, context)).rejects.toThrow(error);
     });
   });
-  const sqsEventMultipleRecords: SQSEvent = {
-    Records: sqsRecords,
-  };
-
-  await expect(scaleUpHandler(sqsEventMultipleRecords, context)).resolves.not.toThrow();
-
-  expect(logWarnSpy).toHaveBeenCalledWith(
-    expect.stringContaining(
-      'Event ignored, only one record at the time can be handled, ensure the lambda batch size is set to 1.',
-    ),
-  );
-}
+});
 
 describe('Test scale down lambda wrapper.', () => {
   it('Scaling down no error.', async () => {
```

lambdas/functions/control-plane/src/lambda.ts

Lines changed: 50 additions & 12 deletions
```diff
@@ -1,34 +1,72 @@
 import middy from '@middy/core';
 import { logger, setContext } from '@aws-github-runner/aws-powertools-util';
 import { captureLambdaHandler, tracer } from '@aws-github-runner/aws-powertools-util';
-import { Context, SQSEvent } from 'aws-lambda';
+import { Context, type SQSBatchItemFailure, type SQSBatchResponse, SQSEvent } from 'aws-lambda';
 
 import { PoolEvent, adjust } from './pool/pool';
 import ScaleError from './scale-runners/ScaleError';
 import { scaleDown } from './scale-runners/scale-down';
-import { scaleUp } from './scale-runners/scale-up';
+import { type ActionRequestMessage, type ActionRequestMessageSQS, scaleUp } from './scale-runners/scale-up';
 import { SSMCleanupOptions, cleanSSMTokens } from './scale-runners/ssm-housekeeper';
 import { checkAndRetryJob } from './scale-runners/job-retry';
 
-export async function scaleUpHandler(event: SQSEvent, context: Context): Promise<void> {
+export async function scaleUpHandler(event: SQSEvent, context: Context): Promise<SQSBatchResponse> {
   setContext(context, 'lambda.ts');
   logger.logEventIfEnabled(event);
 
-  if (event.Records.length !== 1) {
-    logger.warn('Event ignored, only one record at the time can be handled, ensure the lambda batch size is set to 1.');
-    return Promise.resolve();
+  // Group the messages by their event source. We're only interested in
+  // `aws:sqs`-originated messages.
+  const groupedEvents = new Map<string, ActionRequestMessageSQS[]>();
+  for (const { body, eventSource, messageId } of event.Records) {
+    const group = groupedEvents.get(eventSource) || [];
+    const payload = JSON.parse(body) as ActionRequestMessage;
+
+    if (group.length === 0) {
+      groupedEvents.set(eventSource, group);
+    }
+
+    groupedEvents.get(eventSource)?.push({
+      ...payload,
+      messageId,
+    });
+  }
+
+  for (const [eventSource, messages] of groupedEvents.entries()) {
+    if (eventSource === 'aws:sqs') {
+      continue;
+    }
+
+    logger.warn('Ignoring non-sqs event source', { eventSource, messages });
   }
 
+  const sqsMessages = groupedEvents.get('aws:sqs') ?? [];
+
+  // Sort messages by their retry count, so that we retry the same messages if
+  // there's a persistent failure. This should cause messages to be dropped
+  // quicker than if we retried in an arbitrary order.
+  sqsMessages.sort((l, r) => {
+    return (l.retryCounter ?? 0) - (r.retryCounter ?? 0);
+  });
+
+  const batchItemFailures: SQSBatchItemFailure[] = [];
+
   try {
-    await scaleUp(event.Records[0].eventSource, JSON.parse(event.Records[0].body));
-    return Promise.resolve();
+    const rejectedMessageIds = await scaleUp(sqsMessages);
+
+    for (const messageId of rejectedMessageIds) {
+      batchItemFailures.push({
+        itemIdentifier: messageId,
+      });
+    }
+
+    return { batchItemFailures };
   } catch (e) {
     if (e instanceof ScaleError) {
-      return Promise.reject(e);
-    } else {
-      logger.warn(`Ignoring error: ${e}`);
-      return Promise.resolve();
+      throw e;
     }
+
+    logger.warn(`Will retry error: ${e}`);
+    return { batchItemFailures };
   }
 }
```

lambdas/functions/control-plane/src/local.ts

Lines changed: 32 additions & 10 deletions
```diff
@@ -1,21 +1,21 @@
 import { logger } from '@aws-github-runner/aws-powertools-util';
 
-import { ActionRequestMessage, scaleUp } from './scale-runners/scale-up';
+import { scaleUpHandler } from './lambda';
+import { Context, SQSEvent } from 'aws-lambda';
 
-const sqsEvent = {
+const sqsEvent: SQSEvent = {
   Records: [
     {
       messageId: 'e8d74d08-644e-42ca-bf82-a67daa6c4dad',
       receiptHandle:
-        // eslint-disable-next-line max-len
         'AQEBCpLYzDEKq4aKSJyFQCkJduSKZef8SJVOperbYyNhXqqnpFG5k74WygVAJ4O0+9nybRyeOFThvITOaS21/jeHiI5fgaM9YKuI0oGYeWCIzPQsluW5CMDmtvqv1aA8sXQ5n2x0L9MJkzgdIHTC3YWBFLQ2AxSveOyIHwW+cHLIFCAcZlOaaf0YtaLfGHGkAC4IfycmaijV8NSlzYgDuxrC9sIsWJ0bSvk5iT4ru/R4+0cjm7qZtGlc04k9xk5Fu6A+wRxMaIyiFRY+Ya19ykcevQldidmEjEWvN6CRToLgclk=',
-      body: {
+      body: JSON.stringify({
         repositoryName: 'self-hosted',
         repositoryOwner: 'test-runners',
         eventType: 'workflow_job',
         id: 987654,
         installationId: 123456789,
-      },
+      }),
       attributes: {
         ApproximateReceiveCount: '1',
         SentTimestamp: '1626450047230',
@@ -34,12 +34,34 @@ const sqsEvent = {
   ],
 };
 
+const context: Context = {
+  awsRequestId: '1',
+  callbackWaitsForEmptyEventLoop: false,
+  functionName: '',
+  functionVersion: '',
+  getRemainingTimeInMillis: () => 0,
+  invokedFunctionArn: '',
+  logGroupName: '',
+  logStreamName: '',
+  memoryLimitInMB: '',
+  done: () => {
+    return;
+  },
+  fail: () => {
+    return;
+  },
+  succeed: () => {
+    return;
+  },
+};
+
 export function run(): void {
-  scaleUp(sqsEvent.Records[0].eventSource, sqsEvent.Records[0].body as ActionRequestMessage)
-    .then()
-    .catch((e) => {
-      logger.error(e);
-    });
+  try {
+    scaleUpHandler(sqsEvent, context);
+  } catch (e: unknown) {
+    const message = e instanceof Error ? e.message : `${e}`;
+    logger.error(message, e instanceof Error ? { error: e } : {});
+  }
 }
 
 run();
```
