Skip to content

Commit 93adaea

Browse files
authored
KAFKA-19523: Gracefully handle error while building remoteLogAuxState (#20201)
Improve the error handling while building the remote-log-auxiliary state when a follower node with an empty disk begins to synchronise with the leader. If the topic has remote storage enabled, then the ReplicaFetcherThread attempts to build the remote-log-auxiliary state. Note that building the remote-log-auxiliary state is invoked only when the leader-log-start-offset is non-zero and the leader-log-start-offset is not equal to the leader-local-log-start-offset. When the LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' first, followed by the RemoteLogManager#onLeadershipChange call. As a result, when the ReplicaFetcherThread initiates RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not have been initialized yet, and a retriable exception is thrown. Introduced RetriableRemoteStorageException to gracefully handle the error. After the patch: ``` [2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Could not build remote log auxiliary state for orange-1 due to error: RemoteLogManager is not ready for partition: orange-1 (kafka.server.ReplicaFetcherThread) [2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Could not build remote log auxiliary state for orange-0 due to error: RemoteLogManager is not ready for partition: orange-0 (kafka.server.ReplicaFetcherThread) ``` Reviewers: Luke Chen <[email protected]>, Satish Duggana <[email protected]>
1 parent 0086f24 commit 93adaea

File tree

7 files changed

+110
-2
lines changed

7 files changed

+110
-2
lines changed

core/src/main/java/kafka/server/TierStateMachine.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
3636
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
3737
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
38+
import org.apache.kafka.server.log.remote.storage.RemoteStorageNotReadyException;
3839
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
3940
import org.apache.kafka.storage.internals.log.EpochEntry;
4041
import org.apache.kafka.storage.internals.log.LogFileUtils;
@@ -230,6 +231,10 @@ private Long buildRemoteLogAuxState(TopicPartition topicPartition,
230231
}
231232
}
232233

234+
if (!rlm.isPartitionReady(topicPartition)) {
235+
throw new RemoteStorageNotReadyException("RemoteLogManager is not ready for partition: " + topicPartition);
236+
}
237+
233238
RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
234239
.orElseThrow(() -> buildRemoteStorageException(topicPartition, targetEpoch, currentLeaderEpoch,
235240
leaderLocalLogStartOffset, leaderLogStartOffset));

core/src/main/scala/kafka/server/AbstractFetcherThread.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import org.apache.kafka.server.LeaderEndPoint
3636
import org.apache.kafka.server.ResultWithPartitions
3737
import org.apache.kafka.server.ReplicaState
3838
import org.apache.kafka.server.PartitionFetchState
39+
import org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
3940
import org.apache.kafka.server.metrics.KafkaMetricsGroup
4041
import org.apache.kafka.server.util.ShutdownableThread
4142
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -796,7 +797,8 @@ abstract class AbstractFetcherThread(name: String,
796797
onPartitionFenced(topicPartition, leaderEpochInRequest)
797798
case e@(_: UnknownTopicOrPartitionException |
798799
_: UnknownLeaderEpochException |
799-
_: NotLeaderOrFollowerException) =>
800+
_: NotLeaderOrFollowerException |
801+
_: RetriableRemoteStorageException) =>
800802
info(s"Could not build remote log auxiliary state for $topicPartition due to error: ${e.getMessage}")
801803
false
802804
case e: Throwable =>

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3717,6 +3717,7 @@ class ReplicaManagerTest {
37173717
val storageManager = mock(classOf[RemoteStorageManager])
37183718
when(storageManager.fetchIndex(any(), any())).thenReturn(new ByteArrayInputStream("0".getBytes()))
37193719
when(remoteLogManager.storageManager()).thenReturn(storageManager)
3720+
when(remoteLogManager.isPartitionReady(any())).thenReturn(true)
37203721

37213722
val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, shouldMockLog = true, remoteLogManager = Some(remoteLogManager), buildRemoteLogAuxState = true)
37223723
try {
@@ -3775,7 +3776,7 @@ class ReplicaManagerTest {
37753776
replicaManager.applyDelta(delta, leaderMetadataImage)
37763777

37773778
// Replicas fetch from the leader periodically, therefore we check that the metric value is increasing
3778-
// We expect failedBuildRemoteLogAuxStateRate to increase because there is no remoteLogSegmentMetadata
3779+
// We expect failedBuildRemoteLogAuxStateRate to increase because the RemoteLogManager is not ready for tp0
37793780
// when attempting to build log aux state
37803781
TestUtils.waitUntilTrue(() => brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count > 0,
37813782
"Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.log.remote.storage;
18+
19+
/**
20+
* This exception is thrown when a remote storage operation cannot proceed because the remote storage is not ready.
21+
* This may occur in situations where the remote storage (or) metadata layer is initializing, unreachable,
22+
* or temporarily unavailable.
23+
* <p>
24+
* Instances of this exception indicate that the error is retriable, and the operation might
25+
* succeed if attempted again when the remote storage (or) metadata layer becomes operational.
26+
*/
27+
public class RemoteStorageNotReadyException extends RetriableRemoteStorageException {
28+
29+
public RemoteStorageNotReadyException(String message) {
30+
super(message);
31+
}
32+
33+
public RemoteStorageNotReadyException(String message, Throwable cause) {
34+
super(message, cause);
35+
}
36+
37+
public RemoteStorageNotReadyException(Throwable cause) {
38+
super(cause);
39+
}
40+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.log.remote.storage;
18+
19+
/**
20+
* Represents an exception that indicates a retriable error occurred during remote storage operations.
21+
* This exception is thrown when an operation against a remote storage system has failed due to transient
22+
* or temporary issues, and the operation has a reasonable chance of succeeding if retried.
23+
*/
24+
public class RetriableRemoteStorageException extends RemoteStorageException {
25+
26+
private static final long serialVersionUID = 1L;
27+
28+
public RetriableRemoteStorageException(String message) {
29+
super(message);
30+
}
31+
32+
public RetriableRemoteStorageException(String message, Throwable cause) {
33+
super(message, cause);
34+
}
35+
36+
public RetriableRemoteStorageException(Throwable cause) {
37+
super(cause);
38+
}
39+
}

storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,15 @@ RLMTask rlmCopyTask(TopicIdPartition topicIdPartition) {
788788
return null;
789789
}
790790

791+
public boolean isPartitionReady(TopicPartition partition) {
792+
Uuid uuid = topicIdByPartitionMap.get(partition);
793+
if (uuid == null) {
794+
return false;
795+
}
796+
TopicIdPartition topicIdPartition = new TopicIdPartition(uuid, partition);
797+
return remoteLogMetadataManagerPlugin.get().isReady(topicIdPartition);
798+
}
799+
791800
abstract class RLMTask extends CancellableRunnable {
792801

793802
protected final TopicIdPartition topicIdPartition;

storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3726,6 +3726,18 @@ long findLogStartOffset(TopicIdPartition topicIdPartition, UnifiedLog log) {
37263726
verifyNoMoreInteractions(remoteStorageManager);
37273727
}
37283728

3729+
@Test
3730+
public void testIsPartitionReady() throws InterruptedException {
3731+
assertFalse(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition()));
3732+
remoteLogManager.onLeadershipChange(
3733+
Set.of(mockPartition(leaderTopicIdPartition)),
3734+
Set.of(mockPartition(followerTopicIdPartition)),
3735+
topicIds
3736+
);
3737+
assertTrue(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition()));
3738+
assertTrue(remoteLogManager.isPartitionReady(followerTopicIdPartition.topicPartition()));
3739+
}
3740+
37293741
@Test
37303742
public void testMonitorableRemoteLogStorageManager() throws IOException {
37313743
Properties props = new Properties();

0 commit comments

Comments
 (0)