Skip to content

Commit 341e247

Browse files
committed
MINOR: Cleanups in coordinator-common/group-coordinator
1 parent 2e3ddb2 commit 341e247

39 files changed

+188
-440
lines changed

coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorOperationExceptionHelper.java

Lines changed: 14 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -48,36 +48,25 @@ public static <IN, OUT> OUT handleOperationException(
4848
) {
4949
ApiError apiError = ApiError.fromThrowable(exception);
5050

51-
switch (apiError.error()) {
52-
case UNKNOWN_SERVER_ERROR:
51+
return switch (apiError.error()) {
52+
case UNKNOWN_SERVER_ERROR -> {
5353
log.error("Operation {} with {} hit an unexpected exception: {}.",
54-
operationName, operationInput, exception.getMessage(), exception);
55-
return handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
56-
57-
case NETWORK_EXCEPTION:
54+
operationName, operationInput, exception.getMessage(), exception);
55+
yield handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
56+
}
57+
case NETWORK_EXCEPTION ->
5858
// When committing offsets transactionally, we now verify the transaction with the
5959
// transaction coordinator. Verification can fail with `NETWORK_EXCEPTION`, a
6060
// retriable error which older clients may not expect and retry correctly. We
6161
// translate the error to `COORDINATOR_LOAD_IN_PROGRESS` because it causes clients
6262
// to retry the request without an unnecessary coordinator lookup.
63-
return handler.apply(Errors.COORDINATOR_LOAD_IN_PROGRESS, null);
64-
65-
case UNKNOWN_TOPIC_OR_PARTITION:
66-
case NOT_ENOUGH_REPLICAS:
67-
case REQUEST_TIMED_OUT:
68-
return handler.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
69-
70-
case NOT_LEADER_OR_FOLLOWER:
71-
case KAFKA_STORAGE_ERROR:
72-
return handler.apply(Errors.NOT_COORDINATOR, null);
73-
74-
case MESSAGE_TOO_LARGE:
75-
case RECORD_LIST_TOO_LARGE:
76-
case INVALID_FETCH_SIZE:
77-
return handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
78-
79-
default:
80-
return handler.apply(apiError.error(), apiError.message());
81-
}
63+
handler.apply(Errors.COORDINATOR_LOAD_IN_PROGRESS, null);
64+
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT ->
65+
handler.apply(Errors.COORDINATOR_NOT_AVAILABLE, null);
66+
case NOT_LEADER_OR_FOLLOWER, KAFKA_STORAGE_ERROR -> handler.apply(Errors.NOT_COORDINATOR, null);
67+
case MESSAGE_TOO_LARGE, RECORD_LIST_TOO_LARGE, INVALID_FETCH_SIZE ->
68+
handler.apply(Errors.UNKNOWN_SERVER_ERROR, null);
69+
default -> handler.apply(apiError.error(), apiError.message());
70+
};
8271
}
8372
}

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorExecutor.java

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,7 @@ CoordinatorResult<Void, T> execute() {
4949
}
5050
}
5151

52-
public static class ExecutorResult<T> {
53-
public final String key;
54-
public final CoordinatorResult<Void, T> result;
55-
52+
public record ExecutorResult<T>(String key, CoordinatorResult<Void, T> result) {
5653
public ExecutorResult(
5754
String key,
5855
CoordinatorResult<Void, T> result
@@ -61,24 +58,6 @@ public ExecutorResult(
6158
this.result = Objects.requireNonNull(result);
6259
}
6360

64-
@Override
65-
public boolean equals(Object o) {
66-
if (this == o) return true;
67-
if (o == null || getClass() != o.getClass()) return false;
68-
69-
ExecutorResult<?> that = (ExecutorResult<?>) o;
70-
71-
if (!Objects.equals(key, that.key)) return false;
72-
return Objects.equals(result, that.result);
73-
}
74-
75-
@Override
76-
public int hashCode() {
77-
int result = key.hashCode();
78-
result = 31 * result + this.result.hashCode();
79-
return result;
80-
}
81-
8261
@Override
8362
public String toString() {
8463
return "ExecutorResult(" +

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorShard.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
* A simple Coordinator implementation that stores the records into a set.
3232
*/
3333
public class MockCoordinatorShard implements CoordinatorShard<String> {
34-
static record RecordAndMetadata(
34+
record RecordAndMetadata(
3535
long offset,
3636
long producerId,
3737
short producerEpoch,

coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.Objects;
2726
import java.util.PriorityQueue;
2827
import java.util.concurrent.TimeUnit;
2928

@@ -36,54 +35,13 @@ public class MockCoordinatorTimer<T, U> implements CoordinatorTimer<T, U> {
3635
/**
3736
* Represents a scheduled timeout.
3837
*/
39-
public static class ScheduledTimeout<T, U> {
40-
public final String key;
41-
public final long deadlineMs;
42-
public final TimeoutOperation<T, U> operation;
43-
44-
public ScheduledTimeout(
45-
String key,
46-
long deadlineMs,
47-
TimeoutOperation<T, U> operation
48-
) {
49-
this.key = key;
50-
this.deadlineMs = deadlineMs;
51-
this.operation = operation;
52-
}
38+
public record ScheduledTimeout<T, U>(String key, long deadlineMs, TimeoutOperation<T, U> operation) {
5339
}
5440

5541
/**
5642
* Represents an expired timeout.
5743
*/
58-
public static class ExpiredTimeout<T, U> {
59-
public final String key;
60-
public final CoordinatorResult<T, U> result;
61-
62-
public ExpiredTimeout(
63-
String key,
64-
CoordinatorResult<T, U> result
65-
) {
66-
this.key = key;
67-
this.result = result;
68-
}
69-
70-
@Override
71-
public boolean equals(Object o) {
72-
if (this == o) return true;
73-
if (o == null || getClass() != o.getClass()) return false;
74-
75-
ExpiredTimeout<?, ?> that = (ExpiredTimeout<?, ?>) o;
76-
77-
if (!Objects.equals(key, that.key)) return false;
78-
return Objects.equals(result, that.result);
79-
}
80-
81-
@Override
82-
public int hashCode() {
83-
int result1 = key != null ? key.hashCode() : 0;
84-
result1 = 31 * result1 + (result != null ? result.hashCode() : 0);
85-
return result1;
86-
}
44+
public record ExpiredTimeout<T, U>(String key, CoordinatorResult<T, U> result) {
8745
}
8846

8947
private final Time time;

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -550,7 +550,7 @@ protected List<ConsumerGroupPartitionAssignor> consumerGroupAssignors(
550550
}
551551
} else if (object instanceof Class<?> klass) {
552552
Object o = Utils.newInstance((Class<?>) klass);
553-
if (!ConsumerGroupPartitionAssignor.class.isInstance(o)) {
553+
if (!(o instanceof ConsumerGroupPartitionAssignor)) {
554554
throw new KafkaException(klass + " is not an instance of " + ConsumerGroupPartitionAssignor.class.getName());
555555
}
556556
assignor = (ConsumerGroupPartitionAssignor) o;

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorRecordHelpers.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ public static CoordinatorRecord newConsumerGroupRegularExpressionRecord(
327327
String regex,
328328
ResolvedRegularExpression resolvedRegularExpression
329329
) {
330-
List<String> topics = new ArrayList<>(resolvedRegularExpression.topics);
330+
List<String> topics = new ArrayList<>(resolvedRegularExpression.topics());
331331
Collections.sort(topics);
332332

333333
return CoordinatorRecord.record(
@@ -337,8 +337,8 @@ public static CoordinatorRecord newConsumerGroupRegularExpressionRecord(
337337
new ApiMessageAndVersion(
338338
new ConsumerGroupRegularExpressionValue()
339339
.setTopics(topics)
340-
.setVersion(resolvedRegularExpression.version)
341-
.setTimestamp(resolvedRegularExpression.timestamp),
340+
.setVersion(resolvedRegularExpression.version())
341+
.setTimestamp(resolvedRegularExpression.timestamp()),
342342
(short) 0
343343
)
344344
);

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java

Lines changed: 21 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1266,7 +1266,7 @@ public CompletableFuture<AlterShareGroupOffsetsResponseData> alterShareGroupOffs
12661266
"share-group-offsets-alter",
12671267
request,
12681268
exception,
1269-
(error, message) -> AlterShareGroupOffsetsRequest.getErrorResponseData(error, message),
1269+
AlterShareGroupOffsetsRequest::getErrorResponseData,
12701270
log
12711271
));
12721272
}
@@ -1856,7 +1856,7 @@ public CompletableFuture<DeleteShareGroupOffsetsResponseData> deleteShareGroupOf
18561856
"initiate-delete-share-group-offsets",
18571857
groupId,
18581858
exception,
1859-
(error, message) -> DeleteShareGroupOffsetsRequest.getErrorDeleteResponseData(error, message),
1859+
DeleteShareGroupOffsetsRequest::getErrorDeleteResponseData,
18601860
log
18611861
));
18621862
}
@@ -2298,27 +2298,23 @@ private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchExcept
22982298
) {
22992299
ApiError apiError = ApiError.fromThrowable(exception);
23002300

2301-
switch (apiError.error()) {
2302-
case UNKNOWN_TOPIC_OR_PARTITION:
2303-
case NOT_ENOUGH_REPLICAS:
2304-
case REQUEST_TIMED_OUT:
2305-
// Remap REQUEST_TIMED_OUT to NOT_COORDINATOR, since consumers on versions prior
2306-
// to 3.9 do not expect the error and won't retry the request. NOT_COORDINATOR
2307-
// additionally triggers coordinator re-lookup, which is necessary if the client is
2308-
// talking to a zombie coordinator.
2309-
//
2310-
// While handleOperationException does remap UNKNOWN_TOPIC_OR_PARTITION,
2311-
// NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to COORDINATOR_NOT_AVAILABLE,
2312-
// COORDINATOR_NOT_AVAILABLE is also not handled by consumers on versions prior to
2313-
// 3.9.
2314-
return OffsetFetchResponse.groupError(
2315-
request,
2316-
Errors.NOT_COORDINATOR,
2317-
context.requestVersion()
2318-
);
2319-
2320-
default:
2321-
return handleOperationException(
2301+
return switch (apiError.error()) {
2302+
case UNKNOWN_TOPIC_OR_PARTITION, NOT_ENOUGH_REPLICAS, REQUEST_TIMED_OUT ->
2303+
// Remap REQUEST_TIMED_OUT to NOT_COORDINATOR, since consumers on versions prior
2304+
// to 3.9 do not expect the error and won't retry the request. NOT_COORDINATOR
2305+
// additionally triggers coordinator re-lookup, which is necessary if the client is
2306+
// talking to a zombie coordinator.
2307+
//
2308+
// While handleOperationException does remap UNKNOWN_TOPIC_OR_PARTITION,
2309+
// NOT_ENOUGH_REPLICAS and REQUEST_TIMED_OUT to COORDINATOR_NOT_AVAILABLE,
2310+
// COORDINATOR_NOT_AVAILABLE is also not handled by consumers on versions prior to
2311+
// 3.9.
2312+
OffsetFetchResponse.groupError(
2313+
request,
2314+
Errors.NOT_COORDINATOR,
2315+
context.requestVersion()
2316+
);
2317+
default -> handleOperationException(
23222318
operationName,
23232319
request,
23242320
exception,
@@ -2328,8 +2324,8 @@ private OffsetFetchResponseData.OffsetFetchResponseGroup handleOffsetFetchExcept
23282324
context.requestVersion()
23292325
),
23302326
log
2331-
);
2332-
}
2327+
);
2328+
};
23332329
}
23342330

23352331
private static void requireNonNull(Object obj, String msg) {

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3327,14 +3327,14 @@ private CoordinatorResult<Void, CoordinatorRecord> handleRegularExpressionsResul
33273327
.resolvedRegularExpression(regex)
33283328
.orElse(ResolvedRegularExpression.EMPTY);
33293329

3330-
if (!oldResolvedRegularExpression.topics.equals(newResolvedRegularExpression.topics)) {
3330+
if (!oldResolvedRegularExpression.topics().equals(newResolvedRegularExpression.topics())) {
33313331
bumpGroupEpoch = true;
33323332

3333-
oldResolvedRegularExpression.topics.forEach(topicName ->
3333+
oldResolvedRegularExpression.topics().forEach(topicName ->
33343334
subscribedTopicNames.compute(topicName, SubscriptionCount::decRegexCount)
33353335
);
33363336

3337-
newResolvedRegularExpression.topics.forEach(topicName ->
3337+
newResolvedRegularExpression.topics().forEach(topicName ->
33383338
subscribedTopicNames.compute(topicName, SubscriptionCount::incRegexCount)
33393339
);
33403340
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetExpirationConditionImpl.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,12 @@
1818

1919
import java.util.function.Function;
2020

21-
public class OffsetExpirationConditionImpl implements OffsetExpirationCondition {
22-
23-
/**
24-
* Given an offset and metadata, obtain the base timestamp that should be used
25-
* as the start of the offsets retention period.
26-
*/
27-
private final Function<OffsetAndMetadata, Long> baseTimestamp;
28-
29-
public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimestamp) {
30-
this.baseTimestamp = baseTimestamp;
31-
}
21+
/**
22+
* @param baseTimestamp Given an offset and metadata, obtain the base timestamp that should be used
23+
* as the start of the offsets retention period.
24+
*/
25+
public record OffsetExpirationConditionImpl(
26+
Function<OffsetAndMetadata, Long> baseTimestamp) implements OffsetExpirationCondition {
3227

3328
/**
3429
* Determine whether an offset is expired. Older versions have an expire timestamp per partition. If this
@@ -39,7 +34,6 @@ public OffsetExpirationConditionImpl(Function<OffsetAndMetadata, Long> baseTimes
3934
* @param offset The offset and metadata.
4035
* @param currentTimestampMs The current timestamp.
4136
* @param offsetsRetentionMs The offsets retention in milliseconds.
42-
*
4337
* @return Whether the given offset is expired or not.
4438
*/
4539
@Override
@@ -56,6 +50,7 @@ public boolean isOffsetExpired(OffsetAndMetadata offset, long currentTimestampMs
5650
/**
5751
* @return The base timestamp.
5852
*/
53+
@Override
5954
public Function<OffsetAndMetadata, Long> baseTimestamp() {
6055
return this.baseTimestamp;
6156
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ public OffsetMetadataManager build() {
201201

202202
/**
203203
* Tracks open transactions (producer ids) by group id, topic name and partition id.
204-
* It is the responsiblity of the caller to update {@link #pendingTransactionalOffsets}.
204+
* It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
205205
*/
206206
private class OpenTransactions {
207207
/**

0 commit comments

Comments
 (0)