Skip to content

(WIP) MINOR: Migrate EligibleLeaderReplicasIntegrationTest to use new test infra #20199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
* limitations under the License.
*/
package kafka.server.integration;
import kafka.integration.KafkaServerTestHarness;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import kafka.utils.Logging;
import kafka.utils.TestUtils;

import org.apache.kafka.clients.CommonClientConfigs;
Expand All @@ -41,16 +38,20 @@
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ReplicationConfigs;
import org.apache.kafka.server.config.ServerConfigs;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.TestInfo;

import java.io.File;
Expand All @@ -65,68 +66,43 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.HashMap;

import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarness implements Logging {
public class EligibleLeaderReplicasIntegrationTest {
private String bootstrapServer;
private String testTopicName;
private Admin adminClient;

@Override
public MetadataVersion metadataVersion() {
return MetadataVersion.IBP_4_0_IV1;
}
private final ClusterInstance clusterInstance;

@Override
public Seq<KafkaConfig> generateConfigs() {
List<Properties> brokerConfigs = new ArrayList<>();
brokerConfigs.addAll(scala.collection.JavaConverters.seqAsJavaList(TestUtils.createBrokerConfigs(
5, // The tests require 4 brokers to host the partition. However, we need the 5th broker to handle the admin client requests.
true,
true,
scala.Option.<SecurityProtocol>empty(),
scala.Option.<File>empty(),
scala.Option.<Properties>empty(),
true,
false,
false,
false,
new HashMap<>(),
1,
false,
1,
(short) 4,
0,
false
)));
List<KafkaConfig> configs = new ArrayList<>();
for (Properties props : brokerConfigs) {
configs.add(KafkaConfig.fromProps(props));
}
return JavaConverters.asScalaBuffer(configs).toSeq();
}
@ClusterTest(
types = {Type.KRAFT},
metadataVersion = MetadataVersion.IBP_4_0_IV1,
brokers = 5,
serverProperties = {
@ClusterConfigProperty(key = ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, value = "true"),
@ClusterConfigProperty(key = ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, value = "4"),
}
)

@BeforeEach
@Override
public void setUp(TestInfo info) {
super.setUp(info);
// create adminClient
Properties props = new Properties();
bootstrapServer = bootstrapServers(listenerName());
//bootstrapServer = bootstrapServers(listenerName());
bootstrapServer = clusterInstance.bootstrapServers(listenerName());
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
adminClient = Admin.create(props);
adminClient.updateFeatures(
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()
Map.of(EligibleLeaderReplicasVersion.FEATURE_NAME,
new FeatureUpdate(EligibleLeaderReplicasVersion.ELRV_1.featureLevel(), FeatureUpdate.UpgradeType.UPGRADE)),
new UpdateFeaturesOptions()
);
testTopicName = String.format("%s-%s", info.getTestMethod().get().getName(), "ELR-test");
}
Expand All @@ -136,11 +112,12 @@ public void close() throws Exception {
if (adminClient != null) adminClient.close();
}

@Test
public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionException, InterruptedException {
@ClusterTest
public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Seq<KafkaBroker> brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq();
TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000);

ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
Expand Down Expand Up @@ -180,8 +157,8 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc
producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
waitUntilOneMessageIsConsumed(consumer);

killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
clusterInstance.shutdownBroker(initialReplicas.get(0).id());
clusterInstance.shutdownBroker(initialReplicas.get(1).id());

waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 2 && elrSize == 1;
Expand All @@ -193,15 +170,15 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc
assertEquals(0, consumer.poll(Duration.ofSeconds(1L)).count());

// Restore the min ISR and the previous log should be visible.
startBroker(initialReplicas.get(1).id());
startBroker(initialReplicas.get(0).id());
clusterInstance.startBroker(initialReplicas.get(1).id());
clusterInstance.startBroker(initialReplicas.get(0).id());
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 4 && elrSize == 0;
});

waitUntilOneMessageIsConsumed(consumer);
} finally {
restartDeadBrokers(false);
clusterInstance.restartDeadBrokers();
if (consumer != null) consumer.close();
if (producer != null) producer.close();
}
Expand All @@ -222,11 +199,12 @@ void waitUntilOneMessageIsConsumed(Consumer consumer) {
);
}

@Test
public void testElrMemberCanBeElected() throws ExecutionException, InterruptedException {
@ClusterTest
public void testElrMemberCanBeElected(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Seq<KafkaBroker> brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq();
TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000);

ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
Expand All @@ -244,15 +222,15 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(0, topicPartitionInfo.lastKnownElr().size());

killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
killBroker(initialReplicas.get(2).id());
clusterInstance.shutdownBroker(initialReplicas.get(0).id());
clusterInstance.shutdownBroker(initialReplicas.get(1).id());
clusterInstance.shutdownBroker(initialReplicas.get(2).id());

waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 1 && elrSize == 2;
});

killBroker(initialReplicas.get(3).id());
clusterInstance.shutdownBroker(initialReplicas.get(3).id());

waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 3;
Expand All @@ -270,7 +248,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx
int expectLeader = topicPartitionInfo.elr().stream()
.filter(node -> node.id() != expectLastKnownLeader).toList().get(0).id();

startBroker(expectLeader);
clusterInstance.startBroker(expectLeader);
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 1 && elrSize == 2;
});
Expand All @@ -282,7 +260,7 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx

// Start another 2 brokers and the ELR fields should be cleaned.
topicPartitionInfo.replicas().stream().filter(node -> node.id() != expectLeader).limit(2)
.forEach(node -> startBroker(node.id()));
.forEach(node -> clusterInstance.startBroker(node.id()));

waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 3 && elrSize == 0;
Expand All @@ -293,15 +271,16 @@ public void testElrMemberCanBeElected() throws ExecutionException, InterruptedEx
assertEquals(0, topicPartitionInfo.lastKnownElr().size(), topicPartitionInfo.toString());
assertEquals(expectLeader, topicPartitionInfo.leader().id(), topicPartitionInfo.toString());
} finally {
restartDeadBrokers(false);
clusterInstance.restartDeadBrokers();
}
}

@Test
public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionException, InterruptedException {
@ClusterTest
public void testElrMemberShouldBeKickOutWhenUncleanShutdown(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Seq<KafkaBroker> brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq();
TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000);

ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
Expand All @@ -319,10 +298,10 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(0, topicPartitionInfo.lastKnownElr().size());

killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
killBroker(initialReplicas.get(2).id());
killBroker(initialReplicas.get(3).id());
clusterInstance.shutdownBroker(initialReplicas.get(0).id());
clusterInstance.shutdownBroker(initialReplicas.get(1).id());
clusterInstance.shutdownBroker(initialReplicas.get(2).id());
clusterInstance.shutdownBroker(initialReplicas.get(3).id());

waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 3;
Expand All @@ -331,17 +310,16 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx
.allTopicNames().get().get(testTopicName).partitions().get(0);

int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id();
KafkaBroker broker = brokers().find(b -> {
return b.config().brokerId() == brokerToBeUncleanShutdown;
}).get();
KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown).findFirst()
.orElseThrow(() -> new RuntimeException("No broker found"));
Seq<File> dirs = broker.logManager().liveLogDirs();
assertEquals(1, dirs.size());
CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString());
assertTrue(handler.exists());
assertDoesNotThrow(() -> handler.delete());

// After remove the clean shutdown file, the broker should report unclean shutdown during restart.
startBroker(brokerToBeUncleanShutdown);
clusterInstance.startBroker(brokerToBeUncleanShutdown);
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 2;
});
Expand All @@ -350,18 +328,19 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx
assertNull(topicPartitionInfo.leader());
assertEquals(1, topicPartitionInfo.lastKnownElr().size());
} finally {
restartDeadBrokers(false);
clusterInstance.restartDeadBrokers();
}
}

/*
This test is only valid for KIP-966 part 1. When the unclean recovery is implemented, it should be removed.
*/
@Test
public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionException, InterruptedException {
@ClusterTest
public void testLastKnownLeaderShouldBeElectedIfEmptyElr(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
adminClient.createTopics(
List.of(new NewTopic(testTopicName, 1, (short) 4))).all().get();
TestUtils.waitForPartitionMetadata(brokers(), testTopicName, 0, 1000);
Seq<KafkaBroker> brokerSeq = scala.collection.JavaConverters.asScalaBuffer(new ArrayList<>(clusterInstance.brokers().values())).toSeq();
TestUtils.waitForPartitionMetadata(brokerSeq, testTopicName, 0, 1000);

ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, testTopicName);
Collection<AlterConfigOp> ops = new ArrayList<>();
Expand All @@ -379,10 +358,10 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep
assertEquals(0, topicPartitionInfo.elr().size());
assertEquals(0, topicPartitionInfo.lastKnownElr().size());

killBroker(initialReplicas.get(0).id());
killBroker(initialReplicas.get(1).id());
killBroker(initialReplicas.get(2).id());
killBroker(initialReplicas.get(3).id());
clusterInstance.shutdownBroker(initialReplicas.get(0).id());
clusterInstance.shutdownBroker(initialReplicas.get(1).id());
clusterInstance.shutdownBroker(initialReplicas.get(2).id());
clusterInstance.shutdownBroker(initialReplicas.get(3).id());

waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 3;
Expand All @@ -392,20 +371,18 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep
int lastKnownLeader = topicPartitionInfo.lastKnownElr().get(0).id();

Set<Integer> initialReplicaSet = initialReplicas.stream().map(node -> node.id()).collect(Collectors.toSet());
brokers().foreach(broker -> {
for (KafkaBroker broker : clusterInstance.brokers().values()) {
if (initialReplicaSet.contains(broker.config().brokerId())) {
Seq<File> dirs = broker.logManager().liveLogDirs();
assertEquals(1, dirs.size());
CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString());
assertDoesNotThrow(() -> handler.delete());
}
return true;
});

}

// After remove the clean shutdown file, the broker should report unclean shutdown during restart.
topicPartitionInfo.replicas().forEach(replica -> {
if (replica.id() != lastKnownLeader) startBroker(replica.id());
if (replica.id() != lastKnownLeader) clusterInstance.startBroker(replica.id());
});
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize == 0 && elrSize == 1;
Expand All @@ -416,7 +393,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep
assertEquals(1, topicPartitionInfo.lastKnownElr().size());

// Now if the last known leader goes through unclean shutdown, it will still be elected.
startBroker(lastKnownLeader);
clusterInstance.startBroker(lastKnownLeader);
waitForIsrAndElr((isrSize, elrSize) -> {
return isrSize > 0 && elrSize == 0;
});
Expand All @@ -436,7 +413,7 @@ public void testLastKnownLeaderShouldBeElectedIfEmptyElr() throws ExecutionExcep
DEFAULT_MAX_WAIT_MS, 100L
);
} finally {
restartDeadBrokers(false);
clusterInstance.restartDeadBrokers();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import scala.collection.Seq;
import scala.jdk.javaapi.CollectionConverters;
import scala.collection.JavaConverters;

import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
import static org.apache.kafka.clients.consumer.GroupProtocol.CONSUMER;
Expand Down Expand Up @@ -444,6 +446,21 @@ default List<Integer> boundPorts() {
.map(KafkaBroker::socketServer)
.map(s -> s.boundPort(clientListener()))
.toList();
}

/**
 * Restarts every broker in the cluster that is currently shut down.
 * Brokers that are already running are left untouched.
 */
default void restartDeadBrokers() {
    brokers().forEach((id, broker) -> {
        // Only shut-down brokers need a restart.
        if (broker.isShutdown()) {
            startBroker(id);
        }
    });
}

/**
 * Returns the bootstrap-server string for the given listener, delegating to
 * {@code kafka.utils.TestUtils.bootstrapServers}.
 *
 * Fixes two defects in the original: the method was declared to return
 * {@code String} but had no return statement, and it called a nonexistent
 * {@code asScala()} on {@code java.util.ArrayList} — the Java-to-Scala
 * conversion must go through {@code scala.jdk.javaapi.CollectionConverters}.
 *
 * @param listenerName the listener whose advertised endpoints to list
 * @return comma-separated host:port bootstrap string for all brokers
 */
default String bootstrapServers(ListenerName listenerName) {
    Seq<KafkaBroker> brokerSeq =
        CollectionConverters.asScala(new ArrayList<>(brokers().values())).toSeq();
    return kafka.utils.TestUtils.bootstrapServers(brokerSeq, listenerName);
}
}