diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index fbcf18e28..1370e3dfa 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -289,18 +289,19 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) { return true; } WriteAheadLog.AppendResult appendResult; - Lock lock = confirmOffsetCalculator.addLock(); - lock.lock(); try { try { StreamRecordBatch streamRecord = request.record; streamRecord.retain(); - appendResult = deltaWAL.append(streamRecord.encoded()); - lock.unlock(); + Lock lock = confirmOffsetCalculator.addLock(); + lock.lock(); + try { + appendResult = deltaWAL.append(streamRecord.encoded()); + } finally { + lock.unlock(); + } } catch (WriteAheadLog.OverCapacityException e) { // the WAL write data align with block, 'WAL is full but LogCacheBlock is not full' may happen. - // the read lock must release before write lock https://github.com/AutoMQ/automq-for-kafka/issues/581 - lock.unlock(); confirmOffsetCalculator.update(); forceUpload(LogCache.MATCH_ALL_STREAMS); if (!fromBackoff) { @@ -315,7 +316,6 @@ public boolean append0(WalWriteRequest request, boolean fromBackoff) { request.offset = appendResult.recordOffset(); confirmOffsetCalculator.add(request); } catch (Throwable e) { - lock.unlock(); LOGGER.error("[UNEXPECTED] append WAL fail", e); request.cf.completeExceptionally(e); return false;