Skip to content

Integrate driver matrix fixes #561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,5 +107,11 @@ public void applyTo(BlockHound.Builder builder) {
builder.allowBlockingCallsInside("io.netty.util.concurrent.GlobalEventExecutor", "addTask");
builder.allowBlockingCallsInside(
"io.netty.util.concurrent.SingleThreadEventExecutor", "addTask");

// Exceptions for scylla-java-driver-matrix

// Various parallelizable tests sometimes fail due to ConcurrentMap's put.
builder.allowBlockingCallsInside(
"com.datastax.oss.driver.shaded.guava.common.collect.MapMakerInternalMap", "put");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.junit.Test;

public class ZeroTokenNodesIT {

@Before
public void checkScyllaVersion() {
// minOSS = "6.2.0",
Expand All @@ -45,7 +44,7 @@ public void checkScyllaVersion() {
public void should_not_ignore_zero_token_peer_when_option_is_enabled() {
CqlSession session = null;
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(3).withIpPrefix("127.0.1.").build()) {
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(3).build()) {
ccmBridge.create();
ccmBridge.startWithArgs("--wait-for-binary-proto");
ccmBridge.addWithoutStart(4, "dc1");
Expand All @@ -64,8 +63,13 @@ public void should_not_ignore_zero_token_peer_when_option_is_enabled() {
Collection<Node> nodes = session.getMetadata().getNodes().values();
Set<String> toStrings =
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());

assertThat(toStrings)
.containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042");
.containsOnly(
String.format("/%s:9042", ccmBridge.getNodeIpAddress(1)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(2)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(3)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(4)));
} finally {
if (session != null) session.close();
}
Expand All @@ -75,7 +79,7 @@ public void should_not_ignore_zero_token_peer_when_option_is_enabled() {
public void should_not_discover_zero_token_DC_when_option_is_disabled() {
CqlSession session = null;
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).withIpPrefix("127.0.1.").build()) {
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).build()) {
ccmBridge.create();
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.updateNodeConfig(4, "join_ring", false);
Expand All @@ -98,11 +102,15 @@ public void should_not_discover_zero_token_DC_when_option_is_disabled() {
Objects.requireNonNull(rs.one()).getInetAddress("rpc_address");
landedOn.add(Objects.requireNonNull(broadcastRpcInetAddress).toString());
}
assertThat(landedOn).containsOnly("/127.0.1.1", "/127.0.1.2");
assertThat(landedOn)
.containsOnly("/" + ccmBridge.getNodeIpAddress(1), "/" + ccmBridge.getNodeIpAddress(2));
Collection<Node> nodes = session.getMetadata().getNodes().values();
Set<String> toStrings =
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042");
assertThat(toStrings)
.containsOnly(
String.format("/%s:9042", ccmBridge.getNodeIpAddress(1)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(2)));
} finally {
if (session != null) session.close();
}
Expand All @@ -112,7 +120,7 @@ public void should_not_discover_zero_token_DC_when_option_is_disabled() {
public void should_discover_zero_token_DC_when_option_is_enabled() {
CqlSession session = null;
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).withIpPrefix("127.0.1.").build()) {
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).build()) {
ccmBridge.create();
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.updateNodeConfig(4, "join_ring", false);
Expand All @@ -137,13 +145,18 @@ public void should_discover_zero_token_DC_when_option_is_enabled() {
landedOn.add(Objects.requireNonNull(broadcastRpcInetAddress).toString());
}
// LBP should still target local datacenter:
assertThat(landedOn).containsOnly("/127.0.1.1", "/127.0.1.2");
assertThat(landedOn)
.containsOnly("/" + ccmBridge.getNodeIpAddress(1), "/" + ccmBridge.getNodeIpAddress(2));
Collection<Node> nodes = session.getMetadata().getNodes().values();
Set<String> toStrings =
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
// Metadata should have all nodes:
assertThat(toStrings)
.containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042", "/127.0.1.4:9042");
.containsOnly(
String.format("/%s:9042", ccmBridge.getNodeIpAddress(1)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(2)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(3)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(4)));
} finally {
if (session != null) session.close();
}
Expand All @@ -153,7 +166,7 @@ public void should_discover_zero_token_DC_when_option_is_enabled() {
public void should_connect_to_zero_token_contact_point() {
CqlSession session = null;
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2).withIpPrefix("127.0.1.").build()) {
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2).build()) {
ccmBridge.create();
ccmBridge.startWithArgs("--wait-for-binary-proto");
ccmBridge.addWithoutStart(3, "dc1");
Expand All @@ -170,7 +183,11 @@ public void should_connect_to_zero_token_contact_point() {
Collection<Node> nodes = session.getMetadata().getNodes().values();
Set<String> toStrings =
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
assertThat(toStrings)
.containsOnly(
String.format("/%s:9042", ccmBridge.getNodeIpAddress(1)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(2)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(3)));
} finally {
if (session != null) session.close();
}
Expand All @@ -183,7 +200,7 @@ public void should_connect_to_zero_token_DC() {
// method.
CqlSession session = null;
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder();
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).withIpPrefix("127.0.1.").build()) {
try (CcmBridge ccmBridge = ccmBridgeBuilder.withNodes(2, 2).build()) {
ccmBridge.create();
ccmBridge.updateNodeConfig(3, "join_ring", false);
ccmBridge.updateNodeConfig(4, "join_ring", false);
Expand All @@ -208,12 +225,17 @@ public void should_connect_to_zero_token_DC() {
landedOn.add(Objects.requireNonNull(broadcastRpcInetAddress).toString());
}
// LBP should still target local datacenter:
assertThat(landedOn).containsOnly("/127.0.1.1", "/127.0.1.2");
assertThat(landedOn)
.containsOnly("/" + ccmBridge.getNodeIpAddress(1), "/" + ccmBridge.getNodeIpAddress(2));
Collection<Node> nodes = session.getMetadata().getNodes().values();
Set<String> toStrings =
nodes.stream().map(Node::getEndPoint).map(EndPoint::toString).collect(Collectors.toSet());
// Metadata should have valid ordinary peers plus zero-token contact point:
assertThat(toStrings).containsOnly("/127.0.1.1:9042", "/127.0.1.2:9042", "/127.0.1.3:9042");
assertThat(toStrings)
.containsOnly(
String.format("/%s:9042", ccmBridge.getNodeIpAddress(1)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(2)),
String.format("/%s:9042", ccmBridge.getNodeIpAddress(3)));
} finally {
if (session != null) session.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,13 @@ public void stopCapturingLogs() {
public void should_initialize_all_channels(boolean reuseAddress) {
Map<Pattern, Integer> expectedOccurences =
ImmutableMap.of(
Pattern.compile(
".*127\\.0\\.0\\.2:19042.*Reconnection attempt complete, 6/6 channels.*"),
1,
Pattern.compile(
".*127\\.0\\.0\\.1:19042.*Reconnection attempt complete, 6/6 channels.*"),
1,
Pattern.compile(".*\\.2:19042.*Reconnection attempt complete, 6/6 channels.*"), 1,
Pattern.compile(".*\\.1:19042.*Reconnection attempt complete, 6/6 channels.*"), 1,
Pattern.compile(".*Reconnection attempt complete.*"), 2,
Pattern.compile(".*127\\.0\\.0\\.1:19042.*New channel added \\[.*"), 5,
Pattern.compile(".*127\\.0\\.0\\.2:19042.*New channel added \\[.*"), 5,
Pattern.compile(".*127\\.0\\.0\\.1:19042\\] Trying to create 5 missing channels.*"), 1,
Pattern.compile(".*127\\.0\\.0\\.2:19042\\] Trying to create 5 missing channels.*"), 1);
Pattern.compile(".*\\.1:19042.*New channel added \\[.*"), 5,
Pattern.compile(".*\\.2:19042.*New channel added \\[.*"), 5,
Pattern.compile(".*\\.1:19042\\] Trying to create 5 missing channels.*"), 1,
Pattern.compile(".*\\.2:19042\\] Trying to create 5 missing channels.*"), 1);
DriverConfigLoader loader =
SessionUtils.configLoaderBuilder()
.withBoolean(DefaultDriverOption.SOCKET_REUSE_ADDRESS, reuseAddress)
Expand Down Expand Up @@ -204,20 +200,15 @@ public void should_not_struggle_to_fill_pools() {
int tolerance = 2; // Sometimes socket ends up already in use
Map<Pattern, Integer> expectedOccurences =
ImmutableMap.of(
Pattern.compile(
".*127\\.0\\.0\\.2:19042.*Reconnection attempt complete, 66/66 channels.*"),
Pattern.compile(".*\\.2:19042.*Reconnection attempt complete, 66/66 channels.*"),
1 * sessions,
Pattern.compile(
".*127\\.0\\.0\\.1:19042.*Reconnection attempt complete, 66/66 channels.*"),
Pattern.compile(".*\\.1:19042.*Reconnection attempt complete, 66/66 channels.*"),
1 * sessions,
Pattern.compile(".*Reconnection attempt complete.*"), 2 * sessions,
Pattern.compile(".*127\\.0\\.0\\.1:19042.*New channel added \\[.*"),
65 * sessions - tolerance,
Pattern.compile(".*127\\.0\\.0\\.2:19042.*New channel added \\[.*"),
65 * sessions - tolerance,
Pattern.compile(".*127\\.0\\.0\\.1:19042\\] Trying to create 65 missing channels.*"),
1 * sessions,
Pattern.compile(".*127\\.0\\.0\\.2:19042\\] Trying to create 65 missing channels.*"),
Pattern.compile(".*.1:19042.*New channel added \\[.*"), 65 * sessions - tolerance,
Pattern.compile(".*.2:19042.*New channel added \\[.*"), 65 * sessions - tolerance,
Pattern.compile(".*.1:19042\\] Trying to create 65 missing channels.*"), 1 * sessions,
Pattern.compile(".*.2:19042\\] Trying to create 65 missing channels.*"),
1 * sessions);
expectedOccurences.forEach(
(pattern, times) -> assertMatchesAtLeast(pattern, times, appender.list));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class MockResolverIT {

@Test
public void should_connect_with_mocked_hostname() {
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder().withNodes(1).withIpPrefix("127.0.1.");
CcmBridge.Builder ccmBridgeBuilder = CcmBridge.builder().withNodes(1);
try (CcmBridge ccmBridge = ccmBridgeBuilder.build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
Expand Down Expand Up @@ -258,7 +258,7 @@ public void cannot_reconnect_with_resolved_socket() {
CqlSession session;
Collection<Node> nodes;
Set<Node> filteredNodes;
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.0.1.").build()) {
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
Expand Down Expand Up @@ -319,8 +319,7 @@ public void cannot_reconnect_with_resolved_socket() {
LOG.warn(
"Launching another cluster until we lose resolved socket from metadata (run {}).",
counter);
try (CcmBridge ccmBridge =
CcmBridge.builder().withNodes(3).withIpPrefix("127.0." + counter + ".").build()) {
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
Expand Down Expand Up @@ -374,7 +373,7 @@ public void cannot_reconnect_with_resolved_socket() {
InetSocketAddress address = (InetSocketAddress) iterator.next().getEndPoint().resolve();
assertFalse(address.isUnresolved());
}
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).withIpPrefix("127.1.1.").build()) {
try (CcmBridge ccmBridge = CcmBridge.builder().withNodes(3).build()) {
MultimapHostResolverProvider.removeResolverEntries("test.cluster.fake");
MultimapHostResolverProvider.addResolverEntry(
"test.cluster.fake", ccmBridge.getNodeIpAddress(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.Version;
import com.datastax.oss.driver.api.core.metadata.EndPoint;
import com.datastax.oss.driver.api.testinfra.CassandraResourceRule;
import com.datastax.oss.driver.api.testinfra.CassandraSkip;
import com.datastax.oss.driver.api.testinfra.ScyllaRequirement;
import com.datastax.oss.driver.api.testinfra.ScyllaSkip;
import com.datastax.oss.driver.api.testinfra.requirement.BackendRequirementRule;
import com.datastax.oss.driver.api.testinfra.requirement.BackendType;
import com.datastax.oss.driver.internal.core.metadata.DefaultEndPoint;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.junit.AssumptionViolatedException;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
Expand Down Expand Up @@ -211,4 +216,10 @@ public ProtocolVersion getHighestProtocolVersion() {
return DefaultProtocolVersion.V3;
}
}

@Override
public Set<EndPoint> getContactPoints() {
return Collections.singleton(
new DefaultEndPoint(new InetSocketAddress(ccmBridge.getNodeIpAddress(1), 9042)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.exec.CommandLine;
import org.apache.commons.exec.DefaultExecutor;
Expand All @@ -62,6 +63,7 @@
public class CcmBridge implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(CcmBridge.class);
private static final AtomicInteger CLUSTER_ID = new AtomicInteger();

public static BackendType DISTRIBUTION =
BackendType.valueOf(
Expand Down Expand Up @@ -202,7 +204,14 @@ private CcmBridge(
} else {
this.nodes = nodes;
}
this.ipPrefix = ipPrefix;

if (ipPrefix == null || ipPrefix.isEmpty()) {
Integer clusterId = CLUSTER_ID.addAndGet(1);
this.ipPrefix = String.format("127.%d.%d.", clusterId / 255, (clusterId % 255) + 1);
} else {
this.ipPrefix = ipPrefix;
}

this.cassandraConfiguration = cassandraConfiguration;
this.dseConfiguration = dseConfiguration;
this.rawDseYaml = dseConfigurationRawYaml;
Expand Down Expand Up @@ -663,7 +672,7 @@ public static class Builder {
private final Map<String, Object> dseConfiguration = new LinkedHashMap<>();
private final List<String> dseRawYaml = new ArrayList<>();
private final List<String> jvmArgs = new ArrayList<>();
private String ipPrefix = "127.0.0.";
private String ipPrefix;
private final List<String> createOptions = new ArrayList<>();
private final List<String> dseWorkloads = new ArrayList<>();

Expand Down
Loading