Skip to content

Commit 66fb73e

Browse files
dreamorosisvozza
andauthored
docs(batch): clarify ordering/async processing (#4081)
Co-authored-by: Stefano Vozza <[email protected]>
1 parent a448c9f commit 66fb73e

File tree

6 files changed

+21
-80
lines changed

6 files changed

+21
-80
lines changed

docs/features/batch.md

Lines changed: 9 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -49,16 +49,15 @@ journey
4949
Records expired: 1: Failure
5050
```
5151

52-
This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:
52+
This behavior changes when you enable the [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/services-sqs-errorhandling.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:
5353

54-
<!-- markdownlint-disable MD013 -->
5554
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
5655
* [**Kinesis data streams**](#kinesis-and-dynamodb-streams) and [**DynamoDB streams**](#kinesis-and-dynamodb-streams). Single reported failure will use its sequence number as the stream checkpoint. Multiple reported failures will use the lowest sequence number as checkpoint.
5756

5857
<!-- HTML tags are required in admonition content thus increasing line length beyond our limits -->
5958
<!-- markdownlint-disable MD013 -->
6059
???+ warning "Warning: This utility lowers the chance of processing records more than once; it does not guarantee it"
61-
We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.
60+
We recommend implementing processing logic in an [idempotent manner](./idempotency.md){target="_blank"} whenever possible.
6261

6362
You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.
6463

@@ -72,7 +71,7 @@ Install the library in your project
7271
npm i @aws-lambda-powertools/batch
7372
```
7473

75-
For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed.
74+
For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, so that the response from the Batch Processing utility can inform the service which records failed to be processed.
7675

7776
Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
7877

@@ -108,8 +107,8 @@ Processing batches from SQS works in three stages:
108107
2. Define your function to handle each batch record, and use the `SQSRecord` type annotation for autocompletion
109108
3. Use **`processPartialResponse`** to kick off processing
110109

111-
???+ info
112-
This code example optionally uses Logger for completion.
110+
!!! note
111+
By default, the batch processor will process messages in parallel, which does not guarantee the order of processing. If you need to process messages in order, set the [`processInParallel` option to `false`](#sequential-async-processing), or use [`SqsFifoPartialProcessor` for SQS FIFO queues](#fifo-queues).
113112

114113
=== "index.ts"
115114

@@ -147,30 +146,18 @@ By default, we will stop processing at the first failure and mark unprocessed me
147146

148147
Enable the `skipGroupOnError` option for seamless processing of messages from various group IDs. This setup ensures that messages from a failed group ID are sent back to SQS, enabling uninterrupted processing of messages from the subsequent group ID.
149148

150-
=== "Recommended"
149+
=== "index.ts"
151150

152151
```typescript hl_lines="1-4 8 20"
153152
--8<-- "examples/snippets/batch/gettingStartedSQSFifo.ts"
154153
```
155-
156-
1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)
157-
158-
=== "Async processing"
159-
160-
```typescript hl_lines="1-4 8 20"
161-
--8<-- "examples/snippets/batch/gettingStartedSQSFifoAsync.ts"
162-
```
163154

164-
=== "Enabling skipGroupOnError flag"
155+
=== "with `skipGroupOnError`"
165156

166157
```typescript hl_lines="1-4 13 30"
167158
--8<-- "examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts"
168159
```
169160

170-
!!! Note
171-
Note that `SqsFifoPartialProcessor` is synchronous using `processPartialResponseSync`.
172-
If you need asynchronous processing while preserving the order of messages in the queue, use `SqsFifoPartialProcessorAsync` with `processPartialResponse`.
173-
174161
### Processing messages from Kinesis
175162

176163
Processing batches from Kinesis works in three stages:
@@ -179,9 +166,6 @@ Processing batches from Kinesis works in three stages:
179166
2. Define your function to handle each batch record, and use the `KinesisStreamRecord` type annotation for autocompletion
180167
3. Use **`processPartialResponse`** to kick off processing
181168

182-
???+ info
183-
This code example optionally uses Logger for completion.
184-
185169
=== "index.ts"
186170

187171
```typescript hl_lines="1-5 9 12 19-21"
@@ -407,32 +391,6 @@ sequenceDiagram
407391
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
408392
</center>
409393

410-
### Async or sync processing
411-
412-
There are two processors you can use with this utility:
413-
414-
* **`BatchProcessor`** and **`processPartialResponse`** – Processes messages asynchronously
415-
* **`BatchProcessorSync`** and **`processPartialResponseSync`** – Processes messages synchronously
416-
417-
In most cases your function will be `async` returning a `Promise`. Therefore, the `BatchProcessor` is the default processor handling your batch records asynchronously.
418-
There are use cases where you need to process the batch records synchronously. For example, when you need to process multiple records at the same time without conflicting with one another.
419-
For such cases we recommend to use the `BatchProcessorSync` and `processPartialResponseSync` functions.
420-
421-
!!! info "Note that you need match your processing function with the right batch processor"
422-
*If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse`
423-
* If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync`
424-
425-
The difference between the two processors is in how they handle record processing:
426-
427-
* **`BatchProcessor`**: By default, it processes records in parallel using `Promise.all()`. However, it also offers an [option](#sequential-async-processing) to process records sequentially, preserving the order.
428-
* **`BatchProcessorSync`**: Always processes records sequentially, ensuring the order is preserved by looping through each record one by one.
429-
430-
???+ question "When is this useful?"
431-
432-
For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.
433-
434-
The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
435-
436394
## Advanced
437395

438396
### Accessing processed messages
@@ -492,6 +450,8 @@ By default, the `BatchProcessor` processes records in parallel using `Promise.al
492450

493451
!!! important "If the `processInParallel` option is not provided, the `BatchProcessor` will process records in parallel."
494452

453+
When processing records from SQS FIFO queues, we recommend using the [`SqsFifoPartialProcessor`](#fifo-queues) class, which guarantees ordering of records and implements a short-circuit mechanism to skip processing records from a different message group ID.
454+
495455
```typescript hl_lines="8 17" title="Sequential async processing"
496456
--8<-- "examples/snippets/batch/sequentialAsyncProcessing.ts"
497457
```
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import {
2-
SqsFifoPartialProcessor,
3-
processPartialResponseSync,
2+
SqsFifoPartialProcessorAsync,
3+
processPartialResponse,
44
} from '@aws-lambda-powertools/batch';
55
import { Logger } from '@aws-lambda-powertools/logger';
66
import type { SQSHandler, SQSRecord } from 'aws-lambda';
77

8-
const processor = new SqsFifoPartialProcessor(); // (1)!
8+
const processor = new SqsFifoPartialProcessorAsync();
99
const logger = new Logger();
1010

11-
const recordHandler = (record: SQSRecord): void => {
11+
const recordHandler = async (record: SQSRecord): Promise<void> => {
1212
const payload = record.body;
1313
if (payload) {
1414
const item = JSON.parse(payload);
@@ -17,6 +17,6 @@ const recordHandler = (record: SQSRecord): void => {
1717
};
1818

1919
export const handler: SQSHandler = async (event, context) =>
20-
processPartialResponseSync(event, recordHandler, processor, {
20+
processPartialResponse(event, recordHandler, processor, {
2121
context,
2222
});

examples/snippets/batch/gettingStartedSQSFifoAsync.ts

Lines changed: 0 additions & 22 deletions
This file was deleted.

examples/snippets/batch/gettingStartedSQSFifoSkipGroupOnError.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
2-
SqsFifoPartialProcessor,
3-
processPartialResponseSync,
2+
SqsFifoPartialProcessorAsync,
3+
processPartialResponse,
44
} from '@aws-lambda-powertools/batch';
55
import { Logger } from '@aws-lambda-powertools/logger';
66
import type {
@@ -10,7 +10,7 @@ import type {
1010
SQSRecord,
1111
} from 'aws-lambda';
1212

13-
const processor = new SqsFifoPartialProcessor();
13+
const processor = new SqsFifoPartialProcessorAsync();
1414
const logger = new Logger();
1515

1616
const recordHandler = (record: SQSRecord): void => {
@@ -25,7 +25,7 @@ export const handler = async (
2525
event: SQSEvent,
2626
context: Context
2727
): Promise<SQSBatchResponse> => {
28-
return processPartialResponseSync(event, recordHandler, processor, {
28+
return processPartialResponse(event, recordHandler, processor, {
2929
context,
3030
skipGroupOnError: true,
3131
});

packages/batch/src/BatchProcessorSync.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ import type { BaseRecord, FailureResponse, SuccessResponse } from './types.js';
8080
* ```
8181
*
8282
* @param eventType The type of event to process (SQS, Kinesis, DynamoDB)
83+
* @deprecated Use {@link BasePartialBatchProcessor} instead, this class is deprecated and will be removed in the next major version.
8384
*/
8485
class BatchProcessorSync extends BasePartialBatchProcessor {
8586
/**

packages/batch/src/processPartialResponseSync.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import type {
77
} from './types.js';
88

99
/**
10+
* @deprecated Use {@link processPartialResponse} instead, this function is deprecated and will be removed in the next major version.
11+
*
1012
* Higher level function to process a batch of records synchronously
1113
* and handle partial failure cases.
1214
*

0 commit comments

Comments
 (0)