Skip to content

Commit 41c342a

Browse files
authored
GH-3178: PartitionOffset support for SeekPosition
* Fixes #3178 * add support for custom SeekPosition via `@PartitionOffset` * align `@PartitionOffset` to `TopicPartitionOffset` * add unit test for `@PartitionOffset.SeekPosition` * add unit test for SpEL partitions to Integer[] and Integer * address PR review.
1 parent df8b7a1 commit 41c342a

File tree

6 files changed

+293
-56
lines changed

6 files changed

+293
-56
lines changed

spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/listener-annotation.adoc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,25 @@ public void listen(ConsumerRecord<?, ?> record) {
150150

151151
The initial offset will be applied to all 6 partitions.
152152

153+
Since 3.2, `@PartitionOffset` support `SeekPosition.END`, `SeekPosition.BEGINNING`, `SeekPosition.TIMESTAMP`, `seekPosition` match `SeekPosition` enum name:
154+
155+
[source, java]
156+
----
157+
@KafkaListener(id = "seekPositionTime", topicPartitions = {
158+
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
159+
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
160+
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
161+
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
162+
})
163+
})
164+
public void listen(ConsumerRecord<?, ?> record) {
165+
...
166+
}
167+
----
168+
169+
If seekPosition set `END` or `BEGINNING` will ignore `initialOffset` and `relativeToCurrent`.
170+
If seekPosition set `TIMESTAMP`, `initialOffset` means timestamp.
171+
153172
[[manual-acknowledgment]]
154173
== Manual Acknowledgment
155174

spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ Provides new class `EndpointHandlerMultiMethod` to handler multi method for retr
7575
`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.
7676
See xref:kafka/seek.adoc#seek[Seek API Docs] for more details.
7777

78+
[[x32-annotation-partition-offset-seek-position]]
79+
=== @PartitionOffset support for SeekPosition
80+
Adding `seekPosition` property to `@PartitionOffset` support for `TopicPartitionOffset.SeekPosition`.
81+
See xref:kafka/receiving-messages/listener-annotation.adoc#manual-assignment[manual-assignment] for more details.
82+
7883
[[x32-topic-partition-offset-constructor]]
7984
=== New constructor in TopicPartitionOffset that accepts a function to compute the offset to seek to
8085
`TopicPartitionOffset` has a new constructor that takes a user-provided function to compute the offset to seek to.

spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

Lines changed: 57 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@
9797
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
9898
import org.springframework.kafka.retrytopic.RetryTopicSchedulerWrapper;
9999
import org.springframework.kafka.support.TopicPartitionOffset;
100+
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
100101
import org.springframework.lang.Nullable;
101102
import org.springframework.messaging.converter.GenericMessageConverter;
102103
import org.springframework.messaging.converter.SmartMessageConverter;
@@ -827,10 +828,8 @@ private String getEndpointGroupId(KafkaListener kafkaListener, @Nullable String
827828
private TopicPartitionOffset[] resolveTopicPartitions(KafkaListener kafkaListener) {
828829
TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
829830
List<TopicPartitionOffset> result = new ArrayList<>();
830-
if (topicPartitions.length > 0) {
831-
for (TopicPartition topicPartition : topicPartitions) {
832-
result.addAll(resolveTopicPartitionsList(topicPartition));
833-
}
831+
for (TopicPartition topicPartition : topicPartitions) {
832+
result.addAll(resolveTopicPartitionsList(topicPartition));
834833
}
835834
return result.toArray(new TopicPartitionOffset[0]);
836835
}
@@ -877,7 +876,7 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
877876
() -> "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
878877
List<TopicPartitionOffset> result = new ArrayList<>();
879878
for (String partition : partitions) {
880-
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result, null, false, false);
879+
resolvePartitionAsInteger((String) topic, resolveExpression(partition), result);
881880
}
882881
if (partitionOffsets.length == 1 && resolveExpression(partitionOffsets[0].partition()).equals("*")) {
883882
result.forEach(tpo -> {
@@ -890,7 +889,8 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
890889
Assert.isTrue(!partitionOffset.partition().equals("*"), () ->
891890
"Partition wildcard '*' is only allowed in a single @PartitionOffset in " + result);
892891
resolvePartitionAsInteger((String) topic, resolveExpression(partitionOffset.partition()), result,
893-
resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true);
892+
resolveInitialOffset(topic, partitionOffset), isRelative(topic, partitionOffset), true,
893+
resolveExpression(partitionOffset.seekPosition()));
894894
}
895895
}
896896
Assert.isTrue(!result.isEmpty(), () -> "At least one partition required for " + topic);
@@ -899,11 +899,11 @@ private List<TopicPartitionOffset> resolveTopicPartitionsList(TopicPartition top
899899

900900
private Long resolveInitialOffset(Object topic, PartitionOffset partitionOffset) {
901901
Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
902-
Long initialOffset;
902+
long initialOffset;
903903
if (initialOffsetValue instanceof String str) {
904904
Assert.state(StringUtils.hasText(str),
905905
() -> "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
906-
initialOffset = Long.valueOf(str);
906+
initialOffset = Long.parseLong(str);
907907
}
908908
else if (initialOffsetValue instanceof Long lng) {
909909
initialOffset = lng;
@@ -954,20 +954,33 @@ else if (resolvedValue instanceof Iterable) {
954954
}
955955
}
956956

957+
private void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result) {
958+
resolvePartitionAsInteger(topic, resolvedValue, result, null, false, false, null);
959+
}
960+
957961
@SuppressWarnings(UNCHECKED)
958-
private void resolvePartitionAsInteger(String topic, Object resolvedValue,
959-
List<TopicPartitionOffset> result, @Nullable Long offset, boolean isRelative, boolean checkDups) {
962+
private void resolvePartitionAsInteger(String topic, Object resolvedValue, List<TopicPartitionOffset> result,
963+
@Nullable Long offset, boolean isRelative, boolean checkDups, @Nullable Object seekPosition) {
960964

961965
if (resolvedValue instanceof String[] strArr) {
962966
for (Object object : strArr) {
963-
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
967+
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups, seekPosition);
964968
}
969+
return;
965970
}
966-
else if (resolvedValue instanceof String str) {
971+
else if (resolvedValue instanceof Iterable) {
972+
for (Object object : (Iterable<Object>) resolvedValue) {
973+
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups, seekPosition);
974+
}
975+
return;
976+
}
977+
978+
TopicPartitionOffset.SeekPosition tpoSp = resloveTopicPartitionOffsetSeekPosition(seekPosition);
979+
if (resolvedValue instanceof String str) {
967980
Assert.state(StringUtils.hasText(str),
968981
() -> "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
969982
List<TopicPartitionOffset> collected = parsePartitions(str)
970-
.map(part -> new TopicPartitionOffset(topic, part, offset, isRelative))
983+
.map(part -> createTopicPartitionOffset(topic, part, offset, isRelative, tpoSp))
971984
.toList();
972985
if (checkDups) {
973986
collected.forEach(tpo -> {
@@ -980,23 +993,47 @@ else if (resolvedValue instanceof String str) {
980993
}
981994
else if (resolvedValue instanceof Integer[] intArr) {
982995
for (Integer partition : intArr) {
983-
result.add(new TopicPartitionOffset(topic, partition));
996+
result.add(createTopicPartitionOffset(topic, partition, offset, isRelative, tpoSp));
984997
}
985998
}
986999
else if (resolvedValue instanceof Integer intgr) {
987-
result.add(new TopicPartitionOffset(topic, intgr));
988-
}
989-
else if (resolvedValue instanceof Iterable) {
990-
for (Object object : (Iterable<Object>) resolvedValue) {
991-
resolvePartitionAsInteger(topic, object, result, offset, isRelative, checkDups);
992-
}
1000+
result.add(createTopicPartitionOffset(topic, intgr, offset, isRelative, tpoSp));
9931001
}
9941002
else {
9951003
throw new IllegalArgumentException(String.format(
9961004
"@KafKaListener for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
9971005
}
9981006
}
9991007

1008+
@Nullable
1009+
private TopicPartitionOffset.SeekPosition resloveTopicPartitionOffsetSeekPosition(@Nullable Object seekPosition) {
1010+
TopicPartitionOffset.SeekPosition resloveTpoSp = null;
1011+
if (seekPosition instanceof String seekPositionName) {
1012+
String capitalLetterSeekPositionName = seekPositionName.trim().toUpperCase();
1013+
if (SeekPosition.BEGINNING.name().equals(capitalLetterSeekPositionName)) {
1014+
resloveTpoSp = SeekPosition.BEGINNING;
1015+
}
1016+
else if (SeekPosition.END.name().equals(capitalLetterSeekPositionName)) {
1017+
resloveTpoSp = SeekPosition.END;
1018+
}
1019+
else if (SeekPosition.TIMESTAMP.name().equals(capitalLetterSeekPositionName)) {
1020+
resloveTpoSp = SeekPosition.TIMESTAMP;
1021+
}
1022+
}
1023+
return resloveTpoSp;
1024+
}
1025+
1026+
private TopicPartitionOffset createTopicPartitionOffset(String topic, int partition, @Nullable Long offset,
1027+
boolean isRelative, @Nullable SeekPosition seekPosition) {
1028+
1029+
if (seekPosition != null) {
1030+
return new TopicPartitionOffset(topic, partition, offset, seekPosition);
1031+
}
1032+
else {
1033+
return new TopicPartitionOffset(topic, partition, offset, isRelative);
1034+
}
1035+
}
1036+
10001037
private String resolveExpressionAsString(String value, String attribute) {
10011038
Object resolved = resolveExpression(value);
10021039
if (resolved instanceof String str) {

spring-kafka/src/main/java/org/springframework/kafka/annotation/PartitionOffset.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,11 +20,14 @@
2020
import java.lang.annotation.RetentionPolicy;
2121
import java.lang.annotation.Target;
2222

23+
import org.springframework.kafka.support.TopicPartitionOffset.SeekPosition;
24+
2325
/**
2426
* Used to add partition/initial offset information to a {@code KafkaListener}.
2527
*
2628
* @author Artem Bilan
2729
* @author Gary Russell
30+
* @author Wang Zhiyang
2831
*/
2932
@Target({})
3033
@Retention(RetentionPolicy.RUNTIME)
@@ -60,4 +63,17 @@
6063
*/
6164
String relativeToCurrent() default "false";
6265

66+
/**
67+
* Position to seek on partition assignment. By default, seek by offset.
68+
* Set {@link SeekPosition} seek position enum name to specify "special"
69+
* seeks, no restrictions on capitalization. If seekPosition set 'BEGINNING'
70+
* or 'END', ignore {@code relativeToCurrent} and {@code initialOffset}.
71+
* If seekPosition set 'TIMESTAMP', initialOffset means time stamp, ignore
72+
* {@code relativeToCurrent}.
73+
* @return special seeks.
74+
* @since 3.2
75+
* @see SeekPosition
76+
*/
77+
String seekPosition() default "";
78+
6379
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3103,6 +3103,7 @@ private void initPartitionsIfNeeded() {
31033103
.filter(e -> SeekPosition.TIMESTAMP.equals(e.getValue().seekPosition))
31043104
.collect(Collectors.toMap(Entry::getKey, entry -> entry.getValue().offset));
31053105
if (!times.isEmpty()) {
3106+
times.forEach((key, value) -> partitions.remove(key));
31063107
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes = this.consumer.offsetsForTimes(times);
31073108
offsetsForTimes.forEach((tp, off) -> {
31083109
if (off == null) {
@@ -3117,7 +3118,7 @@ private void initPartitionsIfNeeded() {
31173118
if (this.consumerSeekAwareListener != null) {
31183119
this.consumerSeekAwareListener.onPartitionsAssigned(this.definedPartitions.keySet().stream()
31193120
.map(tp -> new SimpleEntry<>(tp, this.consumer.position(tp)))
3120-
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue())),
3121+
.collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue)),
31213122
this.seekCallback);
31223123
}
31233124
}

0 commit comments

Comments
 (0)