Skip to content

Commit 75f6c99

Browse files
author
Zhen Li
authored
Merge pull request #298 from neo4j/1.1-support-member-stop-kill-CC-tests
Support stop/kill of cluster members in CC tests
2 parents fbdfd00 + 3d0fdca commit 75f6c99

File tree

5 files changed

+244
-29
lines changed

5 files changed

+244
-29
lines changed

driver/src/test/java/org/neo4j/driver/v1/integration/CausalClusteringIT.java

Lines changed: 64 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,11 @@
6262
import static org.junit.Assert.assertThat;
6363
import static org.junit.Assert.assertTrue;
6464
import static org.junit.Assert.fail;
65+
import static org.neo4j.driver.v1.Values.parameters;
6566

6667
public class CausalClusteringIT
6768
{
68-
private static final long DEFAULT_TIMEOUT_MS = 15_000;
69+
private static final long DEFAULT_TIMEOUT_MS = 120_000;
6970

7071
@Rule
7172
public final ClusterRule clusterRule = new ClusterRule();
@@ -293,6 +294,52 @@ public void beginTransactionThrowsForUnreachableBookmark()
293294
}
294295
}
295296

297+
@Test
298+
public void shouldHandleGracefulLeaderSwitch() throws Exception
299+
{
300+
Cluster cluster = clusterRule.getCluster();
301+
ClusterMember leader = cluster.leader();
302+
303+
try ( Driver driver = createDriver( leader.getRoutingUri() ) )
304+
{
305+
Session session1 = driver.session();
306+
Transaction tx1 = session1.beginTransaction();
307+
308+
// gracefully stop current leader to force re-election
309+
cluster.stop( leader );
310+
311+
tx1.run( "CREATE (person:Person {name: {name}, title: {title}})",
312+
parameters( "name", "Webber", "title", "Mr" ) );
313+
tx1.success();
314+
315+
closeAndExpectException( tx1, ClientException.class );
316+
closeAndExpectException( session1, ClientException.class );
317+
318+
String bookmark = inExpirableSession( driver, createSession(), new Function<Session,String>()
319+
{
320+
@Override
321+
public String apply( Session session )
322+
{
323+
try ( Transaction tx = session.beginTransaction() )
324+
{
325+
tx.run( "CREATE (person:Person {name: {name}, title: {title}})",
326+
parameters( "name", "Webber", "title", "Mr" ) );
327+
tx.success();
328+
}
329+
return session.lastBookmark();
330+
}
331+
} );
332+
333+
try ( Session session2 = driver.session( AccessMode.READ );
334+
Transaction tx2 = session2.beginTransaction( bookmark ) )
335+
{
336+
Record record = tx2.run( "MATCH (n:Person) RETURN COUNT(*) AS count" ).next();
337+
tx2.success();
338+
assertEquals( 1, record.get( "count" ).asInt() );
339+
}
340+
}
341+
}
342+
296343
private int executeWriteAndReadThroughBolt( ClusterMember member ) throws TimeoutException, InterruptedException
297344
{
298345
try ( Driver driver = createDriver( member.getRoutingUri() ) )
@@ -350,10 +397,12 @@ private <T> T inExpirableSession( Driver driver, Function<Driver,Session> acquir
350397
{
351398
return op.apply( session );
352399
}
353-
catch ( SessionExpiredException e )
400+
catch ( SessionExpiredException | ServiceUnavailableException e )
354401
{
355402
// role might have changed; try again;
356403
}
404+
405+
Thread.sleep( 500 );
357406
}
358407
while ( System.currentTimeMillis() < endTime );
359408

@@ -408,4 +457,17 @@ public Void call() throws Exception
408457
executor.shutdown();
409458
assertTrue( executor.awaitTermination( 1, TimeUnit.MINUTES ) );
410459
}
460+
461+
private static void closeAndExpectException( AutoCloseable closeable, Class<? extends Exception> exceptionClass )
462+
{
463+
try
464+
{
465+
closeable.close();
466+
fail( "Exception expected" );
467+
}
468+
catch ( Exception e )
469+
{
470+
assertThat( e, instanceOf( exceptionClass ) );
471+
}
472+
}
411473
}

driver/src/test/java/org/neo4j/driver/v1/util/cc/Cluster.java

Lines changed: 142 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,13 @@
4343
public class Cluster
4444
{
4545
private static final String ADMIN_USER = "neo4j";
46-
private static final int STARTUP_TIMEOUT_SECONDS = 60;
46+
private static final int STARTUP_TIMEOUT_SECONDS = 90;
47+
private static final int ONLINE_MEMBERS_CHECK_SLEEP_MS = 500;
4748

4849
private final Path path;
4950
private final String password;
5051
private final Set<ClusterMember> members;
52+
private final Set<ClusterMember> offlineMembers;
5153

5254
public Cluster( Path path, String password )
5355
{
@@ -59,11 +61,12 @@ private Cluster( Path path, String password, Set<ClusterMember> members )
5961
this.path = path;
6062
this.password = password;
6163
this.members = members;
64+
this.offlineMembers = new HashSet<>();
6265
}
6366

6467
Cluster withMembers( Set<ClusterMember> newMembers ) throws ClusterUnavailableException
6568
{
66-
waitForMembers( newMembers, password );
69+
waitForMembersToBeOnline( newMembers, password );
6770
return new Cluster( path, password, newMembers );
6871
}
6972

@@ -126,6 +129,35 @@ public Set<ClusterMember> readReplicas()
126129
return membersWithRole( ClusterMemberRole.READ_REPLICA );
127130
}
128131

132+
public void start( ClusterMember member )
133+
{
134+
startNoWait( member );
135+
waitForMembersToBeOnline();
136+
}
137+
138+
public void startOfflineMembers()
139+
{
140+
for ( ClusterMember member : offlineMembers )
141+
{
142+
startNoWait( member );
143+
}
144+
waitForMembersToBeOnline();
145+
}
146+
147+
public void stop( ClusterMember member )
148+
{
149+
removeOfflineMember( member );
150+
SharedCluster.stop( member );
151+
waitForMembersToBeOnline();
152+
}
153+
154+
public void kill( ClusterMember member )
155+
{
156+
removeOfflineMember( member );
157+
SharedCluster.kill( member );
158+
waitForMembersToBeOnline();
159+
}
160+
129161
@Override
130162
public String toString()
131163
{
@@ -135,6 +167,30 @@ public String toString()
135167
"}";
136168
}
137169

170+
private void addOfflineMember( ClusterMember member )
171+
{
172+
if ( !offlineMembers.remove( member ) )
173+
{
174+
throw new IllegalArgumentException( "Cluster member is not offline: " + member );
175+
}
176+
members.add( member );
177+
}
178+
179+
private void removeOfflineMember( ClusterMember member )
180+
{
181+
if ( !members.remove( member ) )
182+
{
183+
throw new IllegalArgumentException( "Unknown cluster member " + member );
184+
}
185+
offlineMembers.add( member );
186+
}
187+
188+
private void startNoWait( ClusterMember member )
189+
{
190+
addOfflineMember( member );
191+
SharedCluster.start( member );
192+
}
193+
138194
private Set<ClusterMember> membersWithRole( ClusterMemberRole role )
139195
{
140196
Set<ClusterMember> membersWithRole = new HashSet<>();
@@ -166,41 +222,57 @@ private Set<ClusterMember> membersWithRole( ClusterMemberRole role )
166222
return membersWithRole;
167223
}
168224

169-
private static Set<ClusterMember> waitForMembers( Set<ClusterMember> members, String password )
225+
private void waitForMembersToBeOnline()
226+
{
227+
try
228+
{
229+
waitForMembersToBeOnline( members, password );
230+
}
231+
catch ( ClusterUnavailableException e )
232+
{
233+
throw new RuntimeException( e );
234+
}
235+
}
236+
237+
private static void waitForMembersToBeOnline( Set<ClusterMember> members, String password )
170238
throws ClusterUnavailableException
171239
{
172240
if ( members.isEmpty() )
173241
{
174242
throw new IllegalArgumentException( "No members to wait for" );
175243
}
176244

177-
Set<ClusterMember> offlineMembers = new HashSet<>( members );
245+
Set<URI> expectedOnlineUris = extractBoltUris( members );
246+
Set<URI> actualOnlineUris = Collections.emptySet();
247+
178248
long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis( STARTUP_TIMEOUT_SECONDS );
249+
Throwable error = null;
179250

180-
try ( Driver driver = createDriver( members, password ) )
251+
while ( !expectedOnlineUris.equals( actualOnlineUris ) )
181252
{
182-
while ( !offlineMembers.isEmpty() )
253+
sleep( ONLINE_MEMBERS_CHECK_SLEEP_MS );
254+
assertDeadlineNotReached( deadline, expectedOnlineUris, actualOnlineUris, error );
255+
256+
try ( Driver driver = createDriver( members, password );
257+
Session session = driver.session( AccessMode.READ ) )
183258
{
184-
assertDeadlineNotReached( deadline );
259+
List<Record> records = findClusterOverview( session );
260+
actualOnlineUris = extractBoltUris( records );
261+
}
262+
catch ( Throwable t )
263+
{
264+
t.printStackTrace();
185265

186-
try ( Session session = driver.session( AccessMode.READ ) )
266+
if ( error == null )
187267
{
188-
List<Record> records = findClusterOverview( session );
189-
for ( Record record : records )
190-
{
191-
URI boltUri = extractBoltUri( record );
192-
193-
ClusterMember member = findByBoltUri( boltUri, offlineMembers );
194-
if ( member != null )
195-
{
196-
offlineMembers.remove( member );
197-
}
198-
}
268+
error = t;
269+
}
270+
else
271+
{
272+
error.addSuppressed( t );
199273
}
200274
}
201275
}
202-
203-
return members;
204276
}
205277

206278
private static Driver createDriver( Set<ClusterMember> members, String password )
@@ -243,15 +315,48 @@ private static boolean isCoreMember( Session session )
243315
return role != ClusterMemberRole.READ_REPLICA;
244316
}
245317

246-
private static void assertDeadlineNotReached( long deadline ) throws ClusterUnavailableException
318+
private static void assertDeadlineNotReached( long deadline, Set<URI> expectedUris, Set<URI> actualUris,
319+
Throwable error ) throws ClusterUnavailableException
247320
{
248321
if ( System.currentTimeMillis() > deadline )
249322
{
250-
throw new ClusterUnavailableException(
251-
"Cluster did not become available in " + STARTUP_TIMEOUT_SECONDS + " seconds" );
323+
String baseMessage = "Cluster did not become available in " + STARTUP_TIMEOUT_SECONDS + " seconds.\n";
324+
String errorMessage = error == null ? "" : "There were errors checking cluster members.\n";
325+
String expectedUrisMessage = "Expected online URIs: " + expectedUris + "\n";
326+
String actualUrisMessage = "Actual last seen online URIs: " + actualUris + "\n";
327+
String message = baseMessage + errorMessage + expectedUrisMessage + actualUrisMessage;
328+
329+
ClusterUnavailableException clusterUnavailable = new ClusterUnavailableException( message );
330+
331+
if ( error != null )
332+
{
333+
clusterUnavailable.addSuppressed( error );
334+
}
335+
336+
throw clusterUnavailable;
252337
}
253338
}
254339

340+
private static Set<URI> extractBoltUris( Set<ClusterMember> members )
341+
{
342+
Set<URI> uris = new HashSet<>();
343+
for ( ClusterMember member : members )
344+
{
345+
uris.add( member.getBoltUri() );
346+
}
347+
return uris;
348+
}
349+
350+
private static Set<URI> extractBoltUris( List<Record> records )
351+
{
352+
Set<URI> uris = new HashSet<>();
353+
for ( Record record : records )
354+
{
355+
uris.add( extractBoltUri( record ) );
356+
}
357+
return uris;
358+
}
359+
255360
private static URI extractBoltUri( Record record )
256361
{
257362
List<Object> addresses = record.get( "addresses" ).asList();
@@ -307,4 +412,17 @@ private static ClusterMember randomOf( Set<ClusterMember> members )
307412
}
308413
throw new AssertionError();
309414
}
415+
416+
private static void sleep( int millis )
417+
{
418+
try
419+
{
420+
Thread.sleep( millis );
421+
}
422+
catch ( InterruptedException e )
423+
{
424+
Thread.currentThread().interrupt();
425+
throw new RuntimeException( e );
426+
}
427+
}
310428
}

driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterControl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,31 @@ static String startCluster( Path path )
6262
return executeCommand( "neoctrl-cluster", "start", path.toString() );
6363
}
6464

65+
static String startClusterMember( Path path )
66+
{
67+
return executeCommand( "neoctrl-start", path.toString() );
68+
}
69+
6570
static void stopCluster( Path path )
6671
{
6772
executeCommand( "neoctrl-cluster", "stop", path.toString() );
6873
}
6974

75+
static void stopClusterMember( Path path )
76+
{
77+
executeCommand( "neoctrl-stop", path.toString() );
78+
}
79+
7080
static void killCluster( Path path )
7181
{
7282
executeCommand( "neoctrl-cluster", "stop", "--kill", path.toString() );
7383
}
7484

85+
static void killClusterMember( Path path )
86+
{
87+
executeCommand( "neoctrl-stop", "--kill", path.toString() );
88+
}
89+
7590
private static String executeCommand( String... command )
7691
{
7792
try

driver/src/test/java/org/neo4j/driver/v1/util/cc/ClusterRule.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public class ClusterRule extends ExternalResource
4040
private static final int INITIAL_PORT = 20_000;
4141

4242
// todo: neo4j version should come from environment variables
43-
private static final String NEO4J_VERSION = "3.1.0-RC1";
43+
private static final String NEO4J_VERSION = "3.1.0";
4444
// todo: should be possible to configure (dynamically add/remove) cores and read replicas
4545
private static final int CORE_COUNT = 3;
4646
private static final int READ_REPLICA_COUNT = 2;
@@ -96,7 +96,9 @@ protected void before() throws Throwable
9696
@Override
9797
protected void after()
9898
{
99-
getCluster().deleteData();
99+
Cluster cluster = getCluster();
100+
cluster.startOfflineMembers();
101+
cluster.deleteData();
100102
}
101103

102104
private static void addShutdownHookToStopCluster()

0 commit comments

Comments
 (0)