Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1652,6 +1652,8 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c

factory.close(this);
STATE_UPDATER.set(this, State.Closed);
executor.execute(() ->
clearNotInitiatedPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger is closed")));
cancelScheduledTasks();

LedgerHandle lh = currentLedger;
Expand Down Expand Up @@ -1751,20 +1753,32 @@ public void operationComplete(Void v, Stat stat) {
log.debug("[{}] Updating of ledgers list after create complete. version={}", name, stat);
}
ledgersStat = stat;
synchronized (ManagedLedgerImpl.this) {
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
currentLedgerTimeoutTriggered = new AtomicBoolean();
currentLedgerEntries = 0;
currentLedgerSize = 0;
updateLedgersIdsComplete(originalCurrentLedger);
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
// make sure that pendingAddEntries' operations are executed in the same thread
// to avoid potential concurrent issues
State state = STATE_UPDATER.get(ManagedLedgerImpl.this);
if (state == State.Closed || state.isFenced()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The metadataMutex is acquired in updateLedgersListAfterRollover() and must always be released. In operationComplete(), the "Closed/Fenced" branch skips the ledger update but fails to call metadataMutex.unlock(), causing a lock leak that may deadlock subsequent metadata operations. Ensure unlock() is invoked in all branches, for example, by using a finally block.

if (log.isDebugEnabled()) {
log.debug("[{}] Skipping ledger update after create complete because ledger is "
+ "closed or fenced", name);
}
} else {
executor.execute(() -> {
synchronized (ManagedLedgerImpl.this) {
LedgerHandle originalCurrentLedger = currentLedger;
ledgers.put(lh.getId(), newLedger);
currentLedger = lh;
currentLedgerTimeoutTriggered = new AtomicBoolean();
currentLedgerEntries = 0;
currentLedgerSize = 0;
updateLedgersIdsComplete(originalCurrentLedger);
mbean.addLedgerSwitchLatencySample(System.currentTimeMillis()
- lastLedgerCreationInitiationTimestamp, TimeUnit.MILLISECONDS);
// May need to update the cursor position
maybeUpdateCursorBeforeTrimmingConsumedLedger();
}
metadataMutex.unlock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please re-check STATE_UPDATER inside the executor task and bail out if Closed/Fenced (while still releasing metadataMutex).

});
}
metadataMutex.unlock();
}

@Override
Expand Down Expand Up @@ -1937,6 +1951,9 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) {
// The managed ledger was closed during the write operation
clearPendingAddEntries(new ManagedLedgerAlreadyClosedException("Managed ledger was already closed"));
return;
} else if (state.isFenced()) {
clearPendingAddEntries(new ManagedLedgerFencedException("Managed ledger is fenced"));
return;
} else {
// In case we get multiple write errors for different outstanding write request, we should close the ledger
// just once
Expand Down Expand Up @@ -2070,6 +2087,17 @@ void clearPendingAddEntries(ManagedLedgerException e) {
}
}

void clearNotInitiatedPendingAddEntries(ManagedLedgerException e) {
Iterator<OpAddEntry> iterator = pendingAddEntries.iterator();
while (iterator.hasNext()) {
OpAddEntry op = iterator.next();
if (op.closeIfNotInitiated()) {
op.failed(e);
iterator.remove();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, perhaps iterator.remove() should be called first to remove the op from the queue, and then op.failed(e).

}
}
}

void asyncReadEntries(OpReadEntry opReadEntry) {
final State state = STATE_UPDATER.get(this);
if (state.isFenced() || state == State.Closed) {
Expand Down Expand Up @@ -4328,7 +4356,8 @@ public boolean isCursorActive(ManagedCursor cursor) {
return activeCursors.get(cursor.getName()) != null;
}

private boolean currentLedgerIsFull() {
@VisibleForTesting
protected boolean currentLedgerIsFull() {
if (!factory.isMetadataServiceAvailable()) {
// We don't want to trigger metadata operations if we already know that we're currently disconnected
return false;
Expand Down Expand Up @@ -4418,11 +4447,17 @@ private void checkManagedLedgerIsOpen() throws ManagedLedgerException {
/**
 * Moves this managed ledger into the {@code Fenced} state and asynchronously fails
 * any pending add operations whose writes were never initiated.
 */
public synchronized void setFenced() {
    log.info("{} Moving to Fenced state", name);
    STATE_UPDATER.set(this, State.Fenced);
    final String reason = "ManagedLedger " + name + " is fenced";
    executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerFencedException(reason)));
}

/**
 * Moves this managed ledger into the {@code FencedForDeletion} state and asynchronously
 * fails any pending add operations whose writes were never initiated.
 */
synchronized void setFencedForDeletion() {
    log.info("{} Moving to FencedForDeletion state", name);
    STATE_UPDATER.set(this, State.FencedForDeletion);
    final String reason = "ManagedLedger " + name + " is fenced for deletion";
    executor.execute(() -> clearNotInitiatedPendingAddEntries(new ManagedLedgerFencedException(reason)));
}

MetaStore getStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,10 @@ protected OpAddEntry newObject(Recycler.Handle<OpAddEntry> recyclerHandle) {
}
};

/**
 * Atomically marks this add operation as closed if its write has not yet been initiated.
 *
 * @return {@code true} if this call transitioned the state from {@code OPEN} to
 *         {@code CLOSED}; {@code false} if the op was already initiated or closed
 */
public boolean closeIfNotInitiated() {
return STATE_UPDATER.compareAndSet(this, State.OPEN, State.CLOSED);
}

public void recycle() {
ml = null;
ledger = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,93 @@ public void invalidReadEntriesArg2() throws Exception {
fail("Should have thrown an exception in the above line");
}

// Verifies that when a managed ledger is closed while a ledger rollover is still in
// progress, the add operations queued during the rollover do not remain stuck in
// pendingAddEntries after the close completes.
@Test(timeOut = 30000)
public void testCloseManagedLedgerAfterRollover() throws Exception {
ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
config.setMaxCacheSize(0);
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, config);
ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory.open("my_test_ledger");
ManagedLedgerImpl ledger = Mockito.spy(realLedger);
// Report the current ledger as full exactly once, forcing a single rollover.
AtomicBoolean onlyOnce = new AtomicBoolean(false);
when(ledger.currentLedgerIsFull()).thenAnswer(invocation -> onlyOnce.compareAndSet(false, true));
OpAddEntry realOp = OpAddEntry.createNoRetainBuffer(ledger,
ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean());
OpAddEntry op = spy(realOp);
CountDownLatch createLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);
// Intercept the new-ledger creation triggered by the rollover: enqueue 10 more add
// operations and then close the managed ledger, all before the rollover finishes.
doAnswer(invocationOnMock -> {
// Simulate that before the rollover is completed, new write requests arrive,
// and after these write requests are added to pendingAddEntries, the ledger is closed.
log.info("before add, ledger state:{}", ledger.state);
for (int i = 0; i < 10; ++i) {
ledger.internalAsyncAddEntry(OpAddEntry.createNoRetainBuffer(ledger,
ByteBufAllocator.DEFAULT.buffer(128), null, null, new AtomicBoolean()));
}
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
log.info("closeComplete finished, ledger state:{}", ledger.state);
closeLatch.countDown();
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.info("closeFailed, ex:{}, state:{}", exception.getMessage(), ledger.state);
closeLatch.countDown();
}
}, null);
log.info("after add, ledger state:{}", ledger.state);
return invocationOnMock.callRealMethod();
}).when(ledger).asyncCreateLedger(any(), any(), any(), any(), any());
// After createComplete runs, release createLatch from the ledger's own executor so the
// test waits until the rollover completion work has been scheduled.
doAnswer(invocationOnMock -> {
Object o = invocationOnMock.callRealMethod();
log.info("createComplete finished, state:{}", ledger.state);
ledger.executor.execute(createLatch::countDown);
return o;
}).when(ledger).createComplete(anyInt(), any(), any());
// First add triggers the (mocked) "ledger full" check and starts the rollover.
ledger.internalAsyncAddEntry(op);
createLatch.await();
closeLatch.await();
// All queued add operations must have been drained (completed or failed) by the close.
Assert.assertEquals(ledger.pendingAddEntries.size(), 0);
}

// Verifies that add operations enqueued while the managed ledger is being fenced
// (with BookKeeper and the metadata store stopped) all receive a callback, leaving
// pendingAddEntries empty afterwards.
@Test(timeOut = 20000)
public void testFencedManagedLedgerAfterAdd() throws Exception {
@Cleanup("shutdown")
ManagedLedgerFactory factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerImpl realLedger = (ManagedLedgerImpl) factory1.open("my_test_ledger");
ManagedLedgerImpl ledger = spy(realLedger);

int sendNum = 10;
// sendLatch counts one callback (success or failure) per submitted add.
CountDownLatch sendLatch = new CountDownLatch(sendNum);
CountDownLatch fencedLatch = new CountDownLatch(1);
// Intercept setFenced(): with the backends stopped, submit sendNum adds BEFORE the
// real fencing logic runs, then let the real method execute.
doAnswer(invocationOnMock -> {
for (int i = 0; i < sendNum; ++i) {
stopBookKeeper();
stopMetadataStore();
ledger.internalAsyncAddEntry(OpAddEntry.createNoRetainBuffer(ledger,
ByteBufAllocator.DEFAULT.buffer(128), new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
sendLatch.countDown();
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
sendLatch.countDown();
}
}, null, new AtomicBoolean()));
}
Object o = invocationOnMock.callRealMethod();
fencedLatch.countDown();
return o;
}).when(ledger).setFenced();
ledger.setFenced();
fencedLatch.await();
// Every add must have been answered, and the queue fully drained by the fencing.
sendLatch.await();
assertEquals(ledger.pendingAddEntries.size(), 0);
}

@Test(timeOut = 20000)
public void deleteAndReopen() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
Expand Down