Skip to content

Commit

Permalink
Fixed issue where entries could not be read due to mismatch between t…
Browse files Browse the repository at this point in the history
…he ensemble on metadata and the actual written bookies.
  • Loading branch information
shustsud committed Jan 30, 2024
1 parent 1eceb5d commit 037b320
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2010,6 +2010,21 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, final Set<Integer>
pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex);
}
}
// Suppose that unset doesn't happen on the write set of an entry. In this
// case we don't need to resend the write request upon an ensemble change.
// We do need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// to change.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call sendAddSuccessCallback to cover this case.
sendAddSuccessCallbacks();
}

void registerOperationFailureOnBookie(BookieId bookie, long entryId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,7 @@ synchronized void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int b
}
// Suppose that unset doesn't happen on the write set of an entry. In this
// case we don't need to resend the write request upon an ensemble change.
// We do need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// to change.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call sendAddSuccessCallback when unsetting t cover this case.
if (!lh.distributionSchedule.hasEntry(entryId, bookieIndex)) {
lh.sendAddSuccessCallbacks();
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,7 @@ public void testConstructionZkDelay() throws Exception {
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri())
.setZkTimeout(20000);

CountDownLatch l = new CountDownLatch(1);
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l);
l.await();
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS);

BookKeeper bkc = new BookKeeper(conf);
bkc.createLedger(digestType, "testPasswd".getBytes()).close();
Expand All @@ -109,9 +107,7 @@ public void testConstructionNotConnectedExplicitZk() throws Exception {
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri())
.setZkTimeout(20000);

CountDownLatch l = new CountDownLatch(1);
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l);
l.await();
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS);

ZooKeeper zk = new ZooKeeper(
zkUtil.getZooKeeperConnectString(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand All @@ -47,6 +48,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -1539,7 +1541,142 @@ public void testLedgerMetadataTest() throws Exception {
lh.close();
}

@Test
public void testToDelayEnsembleReplacementAndRewriteEntry() throws Exception {
lh = bkc.createLedger(4, 2, digestType, ledgerPassword);

// Put Bookie0 to sleep.
List<BookieId> currentEnsemble = lh.getLedgerMetadata()
.getAllEnsembles().entrySet().iterator().next().getValue();
CountDownLatch bookie0Latch = new CountDownLatch(1);
sleepBookie(currentEnsemble.get(0), bookie0Latch);

// Write entry0,1,2,3 to Bookie.
int sendCount = 7;
CountDownLatch addCompleteLatch = new CountDownLatch(sendCount);
for (int count = 0; count < 4; count++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(count);
entry.position(0);
entries1.add(entry.array());

lh.asyncAddEntry(entry.array(), new AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
CountDownLatch addCompleteLatch = (CountDownLatch) ctx;
addCompleteLatch.countDown();
}
}, addCompleteLatch);
}

// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Waiting for successful write to Bookie0.
// entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending.
// entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending.
// entry: 3(Bookie3, Bookie0) -> Waiting for successful write to Bookie0.
Field fieldPendingAddOps = lh.getClass().getDeclaredField("pendingAddOps");
fieldPendingAddOps.setAccessible(true);
int completedCount;
do {
Thread.sleep(100);

completedCount = 0;
for (PendingAddOp pendingAddOp : (Queue<PendingAddOp>) fieldPendingAddOps.get(lh)) {
if (pendingAddOp.completed) {
completedCount++;
}
}
} while (completedCount != 2);

// Kill Bookie2,3 and start a new Bookie.
killBookie(currentEnsemble.get(2));
killBookie(currentEnsemble.get(3));
startNewBookie();

// Put ZK cluster to sleep to delay ensemble replacement.
CountDownLatch zkLatch = new CountDownLatch(1);
sleepZKCluster(zkLatch);

// Write entry4,5,6 to Bookie.
for (int count = 4; count < sendCount; count++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(count);
entry.position(0);
entries1.add(entry.array());

lh.asyncAddEntry(entry.array(), new AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
CountDownLatch addCompleteLatch = (CountDownLatch) ctx;
addCompleteLatch.countDown();
}
}, addCompleteLatch);
}

// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Waiting for successful write to Bookie0.
// entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending.
// entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending.
// entry: 3(Bookie3, Bookie0) -> Waiting for successful write to Bookie0.
// entry: 4(Bookie0, Bookie1) -> Waiting for successful write to Bookie0.
// entry: 5(Bookie1, Bookie2) -> Failed to write to Bookie2.
// entry: 6(Bookie2, Bookie3) -> Failed to write to Bookie2,3.
Field fieldChangingEnsemble = lh.getClass().getDeclaredField("changingEnsemble");
fieldChangingEnsemble.setAccessible(true);
boolean changingEnsemble;
do {
Thread.sleep(100);

changingEnsemble = (boolean) fieldChangingEnsemble.get(lh);
} while (!changingEnsemble);

// Bookie0 is wake up, write to Bookie0 is successful.
//
// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending.
// entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending.
// entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending.
// entry: 3(Bookie3, Bookie0) -> Writing to Bookie3,0 was successful, but its completion is pending.
// entry: 4(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending.
// entry: 5(Bookie1, Bookie2) -> Failed to write to Bookie2.
// entry: 6(Bookie2, Bookie3) -> Failed to write to Bookie2,3.
bookie0Latch.countDown();
do {
Thread.sleep(100);

completedCount = 0;
for (PendingAddOp pendingAddOp : (Queue<PendingAddOp>) fieldPendingAddOps.get(lh)) {
if (pendingAddOp.completed) {
completedCount++;
}
}
} while (completedCount != 5);

// ZK cluster is wake up, then ensemble replacement is completed.
//
// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Entry write is completed.
// entry: 1(Bookie1, Bookie4) -> Write to Bookie4.
// entry: 2(Bookie4, Bookie5) -> Write to Bookie4,5.
// entry: 3(Bookie5, Bookie0) -> Write to Bookie5.
// entry: 4(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending.
// entry: 5(Bookie1, Bookie4) -> Write to Bookie4.
// entry: 6(Bookie4, Bookie5) -> Write to Bookie4,5.
zkLatch.countDown();

// Waiting for all Entry writes to complete.
addCompleteLatch.await();

readEntries(lh, entries1, sendCount);
lh.close();
}

private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
readEntries(lh, entries, numEntriesToWrite);
}

private void readEntries(LedgerHandle lh, List<byte[]> entries, int numEntriesToWrite)
throws InterruptedException, BKException {
ls = lh.readEntries(0, numEntriesToWrite - 1);
int index = 0;
while (ls.hasMoreElements()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,11 @@ protected void stopZKCluster() throws Exception {
zkUtil.killCluster();
}

protected void sleepZKCluster(final CountDownLatch l)
throws InterruptedException, IOException {
zkUtil.sleepCluster(l);
}

/**
* Start cluster. Also, starts the auto recovery process for each bookie, if
* isAutoRecoveryEnabled is true.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ public interface ZooKeeperCluster {

void killCluster() throws Exception;

void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l)
void sleepCluster(int time, TimeUnit timeUnit)
throws InterruptedException, IOException;

void sleepCluster(CountDownLatch l)
throws InterruptedException, IOException;

default void expireSession(ZooKeeper zk) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,12 @@ public void killCluster() throws Exception {
}

@Override
public void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) throws InterruptedException, IOException {
public void sleepCluster(int time, TimeUnit timeUnit) throws InterruptedException, IOException {
throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil");
}

@Override
public void sleepCluster(CountDownLatch l) throws InterruptedException, IOException {
throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,19 +140,19 @@ public void restartCluster() throws Exception {

@Override
public void sleepCluster(final int time,
final TimeUnit timeUnit,
final CountDownLatch l)
final TimeUnit timeUnit)
throws InterruptedException, IOException {
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
if (t.getName().contains("SyncThread:0")) {
final CountDownLatch suspendLatch = new CountDownLatch(1);
Thread sleeper = new Thread() {
@SuppressWarnings("deprecation")
public void run() {
try {
t.suspend();
l.countDown();
suspendLatch.countDown();
timeUnit.sleep(time);
t.resume();
} catch (Exception e) {
Expand All @@ -161,6 +161,36 @@ public void run() {
}
};
sleeper.start();
suspendLatch.await();
return;
}
}
throw new IOException("ZooKeeper thread not found");
}

@Override
public void sleepCluster(final CountDownLatch l)
throws InterruptedException, IOException {
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
if (t.getName().contains("SyncThread:0")) {
final CountDownLatch suspendLatch = new CountDownLatch(1);
Thread sleeper = new Thread() {
@SuppressWarnings("deprecation")
public void run() {
try {
t.suspend();
suspendLatch.countDown();
l.await();
t.resume();
} catch (Exception e) {
LOG.error("Error suspending thread", e);
}
}
};
sleeper.start();
suspendLatch.await();
return;
}
}
Expand Down

0 comments on commit 037b320

Please sign in to comment.