Skip to content

Commit d042e80

Browse files
fix partition key
1 parent e19741b commit d042e80

File tree

3 files changed

+8
-8
lines changed

3 files changed

+8
-8
lines changed

aws_lambda_powertools/utilities/batch/base.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,12 +374,12 @@ def _collect_dynamodb_failures(self):
374374
def _collect_kafka_failures(self):
375375
failures = []
376376
for msg in self.fail_messages:
377-
# Kafka uses a composite identifier with topic-partition and offset
377+
# Kafka uses a composite identifier with partition and offset
378378
# Both data class and Pydantic model use the same field names
379379
failures.append(
380380
{
381381
"itemIdentifier": {
382-
"topic-partition": f"{msg.topic}-{msg.partition}",
382+
"partition": f"{msg.topic}-{msg.partition}",
383383
"offset": msg.offset,
384384
},
385385
},

aws_lambda_powertools/utilities/batch/types.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929

3030

3131
class KafkaItemIdentifier(TypedDict):
32-
"""Kafka uses a composite identifier with topic-partition and offset."""
32+
"""Kafka uses a composite identifier with partition and offset."""
3333

34-
topic_partition: str # Maps to "topic-partition" in the actual response
34+
partition: str
3535
offset: int
3636

3737

@@ -40,7 +40,7 @@ class PartialItemFailures(TypedDict):
4040
Represents a partial item failure response.
4141
4242
For SQS, Kinesis, and DynamoDB: itemIdentifier is a string (message_id or sequence_number)
43-
For Kafka: itemIdentifier is a KafkaItemIdentifier dict with topic-partition and offset
43+
For Kafka: itemIdentifier is a KafkaItemIdentifier dict with partition and offset
4444
"""
4545

4646
itemIdentifier: str | KafkaItemIdentifier

tests/functional/batch/required_dependencies/test_utilities_batch_kafka.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ def test_kafka_batch_processor_partial_failure(self, kafka_event_factory, kafka_
150150
# THEN - Kafka uses composite identifier
151151
assert len(result["batchItemFailures"]) == 1
152152
assert result["batchItemFailures"][0]["itemIdentifier"] == {
153-
"topic-partition": "mytopic-0",
153+
"partition": "mytopic-0",
154154
"offset": 1,
155155
}
156156

@@ -203,7 +203,7 @@ def test_kafka_batch_processor_multi_topic_partition(self, kafka_event_factory,
203203

204204
# THEN
205205
assert len(result["batchItemFailures"]) == 2
206-
topic_partitions = [f["itemIdentifier"]["topic-partition"] for f in result["batchItemFailures"]]
206+
topic_partitions = [f["itemIdentifier"]["partition"] for f in result["batchItemFailures"]]
207207
assert "topic1-0" in topic_partitions
208208
assert "topic2-1" in topic_partitions
209209

@@ -332,7 +332,7 @@ def test_async_kafka_batch_processor_partial_failure(self, kafka_event_factory,
332332
# THEN
333333
assert len(result["batchItemFailures"]) == 1
334334
assert result["batchItemFailures"][0]["itemIdentifier"] == {
335-
"topic-partition": "mytopic-0",
335+
"partition": "mytopic-0",
336336
"offset": 1,
337337
}
338338

0 commit comments

Comments
 (0)