From e1d72cf4ece2ac15e88cd025aa347125207ed8cc Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Mon, 5 Feb 2024 09:42:39 +0800 Subject: [PATCH 1/4] [BP-62] Refactor read op, and introduce batchReadOp. (#4190) ### Motivation This is the fourth PR for the batch read(https://github.com/apache/bookkeeper/pull/4051) feature. Refactor read op, extract ReadOpBase. Introduce batchedReadOp. --- .../bookkeeper/client/BatchedReadOp.java | 321 +++++++++++ .../client/ListenerBasedPendingReadOp.java | 2 +- .../bookkeeper/client/PendingReadOp.java | 526 +++++------------- .../apache/bookkeeper/client/ReadOpBase.java | 293 ++++++++++ .../bookkeeper/client/TestParallelRead.java | 17 +- 5 files changed, 760 insertions(+), 399 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java new file mode 100644 index 00000000000..4892882e1d1 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -0,0 +1,321 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback; +import org.apache.bookkeeper.proto.checksum.DigestManager; +import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.MathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallback { + + private static final Logger LOG = LoggerFactory.getLogger(BatchedReadOp.class); + + final int maxCount; + final long maxSize; + + BatchedLedgerEntryRequest request; + + BatchedReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + int maxCount, + long maxSize, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, -1L, isRecoveryRead); + this.maxCount = maxCount; + this.maxSize = maxSize; + } + + @Override + void initiate() { + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = getLedgerMetadata().getEnsembleAt(startEntryId); + request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize); + request.read(); + if (clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), request); + } + } + + @Override + protected void submitCallback(int code) { + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } + + cancelSpeculativeTask(true); + + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), startEntryId, code); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + + request.close(); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(request.entries)); + } + } + + @Override + public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + bufList.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + bufList.release(); + } + } + + void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, + maxCount, maxSize, this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, maxCount, maxSize, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } + + abstract class BatchedLedgerEntryRequest extends LedgerEntryRequest { + + //Indicate which ledger the BatchedLedgerEntryRequest is reading. + final long lId; + final int maxCount; + final long maxSize; + + final List entries; + + BatchedLedgerEntryRequest(List ensemble, long lId, long eId, int maxCount, long maxSize) { + super(ensemble, eId); + this.lId = lId; + this.maxCount = maxCount; + this.maxSize = maxSize; + this.entries = new ArrayList<>(maxCount); + } + + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + if (isComplete()) { + return false; + } + if (!complete.getAndSet(true)) { + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + ByteBuf content; + try { + content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer); + } catch (BKException.BKDigestMatchException e) { + clientCtx.getClientStats().getReadOpDmCounter().inc(); + logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", + BKException.Code.DigestMatchException); + return false; + } + rc = BKException.Code.OK; + /* + * The length is a long and it is the last field of the metadata of an entry. + * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. + */ + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); + entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8)); + entryImpl.setEntryBuf(content); + entries.add(entryImpl); + } + writeSet.recycle(); + return true; + } else { + writeSet.recycle(); + return false; + } + } + + @Override + public String toString() { + return String.format("L%d-E%d~%d s-%d", lh.getId(), eId, eId + maxCount, maxSize); + } + } + + class SequenceReadRequest extends BatchedLedgerEntryRequest { + + static final int NOT_FOUND = -1; + int nextReplicaIndexToReadFrom = 0; + final BitSet sentReplicas; + final BitSet erroredReplicas; + SequenceReadRequest(List ensemble, + long lId, + long eId, + int maxCount, + long maxSize) { + super(ensemble, lId, eId, maxCount, maxSize); + this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + } + + @Override + void read() { + sendNextRead(); + } + + private synchronized int getNextReplicaIndexToReadFrom() { + return nextReplicaIndexToReadFrom; + } + + private BitSet getSentToBitSet() { + BitSet b = new BitSet(ensemble.size()); + + for (int i = 0; i < sentReplicas.length(); i++) { + if (sentReplicas.get(i)) { + b.set(writeSet.get(i)); + } + } + return b; + } + + private boolean readsOutstanding() { + return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0; + } + + @Override + synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + return null; + } + + BitSet sentTo = getSentToBitSet(); + sentTo.and(heardFrom); + + // only send another read if we have had no successful response at all + // (even for other entries) from any of the other bookies we have sent the + // request to + if (sentTo.cardinality() == 0) { + clientCtx.getClientStats().getSpeculativeReadCounter().inc(); + return sendNextRead(); + } else { + return null; + } + } + + synchronized BookieId sendNextRead() { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + // we are done, the read has failed from all replicas, just fail the + // read + fail(firstError); + return null; + } + + // ToDo: pick replica with writable PCBC. ISSUE #1239 + // https://github.com/apache/bookkeeper/issues/1239 + int replica = nextReplicaIndexToReadFrom; + int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom); + nextReplicaIndexToReadFrom++; + + try { + BookieId to = ensemble.get(bookieIndex); + sendReadTo(bookieIndex, to, this); + sentToHosts.add(to); + sentReplicas.set(replica); + return to; + } catch (InterruptedException ie) { + LOG.error("Interrupted reading entry " + this, ie); + Thread.currentThread().interrupt(); + fail(BKException.Code.InterruptedException); + return null; + } + } + + @Override + synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); + int replica = writeSet.indexOf(bookieIndex); + if (replica == NOT_FOUND) { + LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble); + return; + } + erroredReplicas.set(replica); + if (isRecoveryRead && (numBookiesMissingEntry >= requiredBookiesMissingEntryForRecovery)) { + /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies report that they do not + * have the entry */ + fail(BKException.Code.NoSuchEntryException); + return; + } + + if (!readsOutstanding()) { + sendNextRead(); + } + } + + @Override + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + boolean completed = super.complete(bookieIndex, host, bufList); + if (completed) { + int numReplicasTried = getNextReplicaIndexToReadFrom(); + // Check if any speculative reads were issued and mark any slow bookies before + // the first successful speculative read as "slow" + for (int i = 0; i < numReplicasTried - 1; i++) { + int slowBookieIndex = writeSet.get(i); + BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId); + } + } + return completed; + } + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java index 6733b2e9ea9..fedb79696a9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java @@ -45,7 +45,7 @@ class ListenerBasedPendingReadOp extends PendingReadOp { @Override protected void submitCallback(int code) { - LedgerEntryRequest request; + SingleLedgerEntryRequest request; while (!seq.isEmpty() && (request = seq.getFirst()) != null) { if (!request.isComplete()) { return; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 73715859c0d..15d48c64351 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -21,27 +21,16 @@ package org.apache.bookkeeper.client; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; import java.util.BitSet; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.bookkeeper.client.BKException.BKDigestMatchException; -import org.apache.bookkeeper.client.api.LedgerEntries; -import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; @@ -54,85 +43,165 @@ * application as soon as it arrives rather than waiting for the whole thing. * */ -class PendingReadOp implements ReadEntryCallback, Runnable { +class PendingReadOp extends ReadOpBase implements ReadEntryCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); - private ScheduledFuture speculativeTask = null; - protected final LinkedList seq; - private final CompletableFuture future; - private final Set heardFromHosts; - private final BitSet heardFromHostsBitSet; - private final Set sentToHosts = new HashSet(); - LedgerHandle lh; - final ClientContext clientCtx; + protected boolean parallelRead = false; + protected final LinkedList seq; - long numPendingEntries; - final long startEntryId; - final long endEntryId; - long requestTimeNanos; + PendingReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + long endEntryId, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead); + this.seq = new LinkedList<>(); + numPendingEntries = endEntryId - startEntryId + 1; + } + + PendingReadOp parallelRead(boolean enabled) { + this.parallelRead = enabled; + return this; + } + + void initiate() { + long nextEnsembleChange = startEntryId, i = startEntryId; + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = null; + do { + if (i == nextEnsembleChange) { + ensemble = getLedgerMetadata().getEnsembleAt(i); + nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); + } + SingleLedgerEntryRequest entry; + if (parallelRead) { + entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); + } else { + entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); + } + seq.add(entry); + i++; + } while (i <= endEntryId); + // read the entries. + for (LedgerEntryRequest entry : seq) { + entry.read(); + if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); + } + } + } - final int requiredBookiesMissingEntryForRecovery; - final boolean isRecoveryRead; + @Override + public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final SingleLedgerEntryRequest entry = (SingleLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + buffer.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + buffer.release(); + } + + if (numPendingEntries < 0) { + LOG.error("Read too many values for ledger {} : [{}, {}].", + ledgerId, startEntryId, endEntryId); + } + + } + + protected void submitCallback(int code) { + if (BKException.Code.OK == code) { + numPendingEntries--; + if (numPendingEntries != 0) { + return; + } + } + + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } - boolean parallelRead = false; - final AtomicBoolean complete = new AtomicBoolean(false); - boolean allowFailFast = false; + cancelSpeculativeTask(true); - abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable { + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + long firstUnread = LedgerHandle.INVALID_ENTRY_ID; + Integer firstRc = null; + for (LedgerEntryRequest req : seq) { + if (!req.isComplete()) { + firstUnread = req.eId; + firstRc = req.rc; + break; + } + } + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), firstUnread, firstRc); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + seq.forEach(LedgerEntryRequest::close); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl))); + } + } - final AtomicBoolean complete = new AtomicBoolean(false); + void sendReadTo(int bookieIndex, BookieId to, SingleLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } - int rc = BKException.Code.OK; - int firstError = BKException.Code.OK; - int numBookiesMissingEntry = 0; + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } - final List ensemble; - final DistributionSchedule.WriteSet writeSet; + abstract class SingleLedgerEntryRequest extends LedgerEntryRequest { final LedgerEntryImpl entryImpl; - final long eId; - LedgerEntryRequest(List ensemble, long lId, long eId) { + SingleLedgerEntryRequest(List ensemble, long lId, long eId) { + super(ensemble, eId); this.entryImpl = LedgerEntryImpl.create(lId, eId); - this.ensemble = ensemble; - this.eId = eId; - - if (clientCtx.getConf().enableReorderReadSequence) { - writeSet = clientCtx.getPlacementPolicy() - .reorderReadSequence( - ensemble, - lh.getBookiesHealthInfo(), - lh.getWriteSetForReadOperation(eId)); - } else { - writeSet = lh.getWriteSetForReadOperation(eId); - } } @Override public void close() { - // this request has succeeded before, can't recycle writeSet again - if (complete.compareAndSet(false, true)) { - rc = BKException.Code.UnexpectedConditionException; - writeSet.recycle(); - } + super.close(); entryImpl.close(); } - /** - * Execute the read request. - */ - abstract void read(); - /** * Complete the read request from host. * - * @param bookieIndex - * bookie index - * @param host - * host that respond the read - * @param buffer - * the data buffer + * @param bookieIndex bookie index + * @param host host that respond the read + * @param buffer the data buffer * @return return true if we managed to complete the entry; - * otherwise return false if the read entry is not complete or it is already completed before + * otherwise return false if the read entry is not complete or it is already completed before */ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { ByteBuf content; @@ -141,7 +210,7 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { } try { content = lh.macManager.verifyDigestAndReturnData(eId, buffer); - } catch (BKDigestMatchException e) { + } catch (BKException.BKDigestMatchException e) { clientCtx.getClientStats().getReadOpDmCounter().inc(); logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException); return false; @@ -161,125 +230,9 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { return false; } } - - /** - * Fail the request with given result code rc. - * - * @param rc - * result code to fail the request. - * @return true if we managed to fail the entry; otherwise return false if it already failed or completed. - */ - boolean fail(int rc) { - if (complete.compareAndSet(false, true)) { - this.rc = rc; - submitCallback(rc); - return true; - } else { - return false; - } - } - - /** - * Log error errMsg and reattempt read from host. - * - * @param bookieIndex - * bookie index - * @param host - * host that just respond - * @param errMsg - * error msg to log - * @param rc - * read result code - */ - synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { - if (BKException.Code.OK == firstError - || BKException.Code.NoSuchEntryException == firstError - || BKException.Code.NoSuchLedgerExistsException == firstError) { - firstError = rc; - } else if (BKException.Code.BookieHandleNotAvailableException == firstError - && BKException.Code.NoSuchEntryException != rc - && BKException.Code.NoSuchLedgerExistsException != rc) { - // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is - // returned we need to update firstError to indicate that it might be a valid read but just - // failed. - firstError = rc; - } - if (BKException.Code.NoSuchEntryException == rc - || BKException.Code.NoSuchLedgerExistsException == rc) { - ++numBookiesMissingEntry; - if (LOG.isDebugEnabled()) { - LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", - lh.ledgerId, eId, host); - } - } else { - if (LOG.isInfoEnabled()) { - LOG.info("{} while reading L{} E{} from bookie: {}", - errMsg, lh.ledgerId, eId, host); - } - } - - lh.recordReadErrorOnBookie(bookieIndex); - } - - /** - * Send to next replica speculatively, if required and possible. - * This returns the host we may have sent to for unit testing. - * - * @param heardFromHostsBitSet - * the set of hosts that we already received responses. - * @return host we sent to if we sent. null otherwise. - */ - abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet); - - /** - * Whether the read request completed. - * - * @return true if the read request is completed. - */ - boolean isComplete() { - return complete.get(); - } - - /** - * Get result code of this entry. - * - * @return result code. - */ - int getRc() { - return rc; - } - - @Override - public String toString() { - return String.format("L%d-E%d", lh.getId(), eId); - } - - /** - * Issues a speculative request and indicates if more speculative - * requests should be issued. - * - * @return whether more speculative requests should be issued - */ - @Override - public ListenableFuture issueSpeculativeRequest() { - return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable() { - @Override - public Boolean call() throws Exception { - if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Send speculative read for {}. Hosts sent are {}, " - + " Hosts heard are {}, ensemble is {}.", - this, sentToHosts, heardFromHostsBitSet, ensemble); - } - return true; - } - return false; - } - }); - } } - class ParallelReadRequest extends LedgerEntryRequest { + class ParallelReadRequest extends SingleLedgerEntryRequest { int numPendings; @@ -326,7 +279,7 @@ BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) { } } - class SequenceReadRequest extends LedgerEntryRequest { + class SequenceReadRequest extends SingleLedgerEntryRequest { static final int NOT_FOUND = -1; int nextReplicaIndexToReadFrom = 0; @@ -456,205 +409,4 @@ boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) { return completed; } } - - PendingReadOp(LedgerHandle lh, - ClientContext clientCtx, - long startEntryId, - long endEntryId, - boolean isRecoveryRead) { - this.seq = new LinkedList<>(); - this.future = new CompletableFuture<>(); - this.lh = lh; - this.clientCtx = clientCtx; - this.startEntryId = startEntryId; - this.endEntryId = endEntryId; - this.isRecoveryRead = isRecoveryRead; - - this.allowFailFast = false; - numPendingEntries = endEntryId - startEntryId + 1; - requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize() - - getLedgerMetadata().getAckQuorumSize() + 1; - heardFromHosts = new HashSet<>(); - heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); - } - - CompletableFuture future() { - return future; - } - - protected LedgerMetadata getLedgerMetadata() { - return lh.getLedgerMetadata(); - } - - protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { - if (speculativeTask != null) { - speculativeTask.cancel(mayInterruptIfRunning); - speculativeTask = null; - } - } - - public ScheduledFuture getSpeculativeTask() { - return speculativeTask; - } - - PendingReadOp parallelRead(boolean enabled) { - this.parallelRead = enabled; - return this; - } - - void allowFailFastOnUnwritableChannel() { - allowFailFast = true; - } - - public void submit() { - clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this); - } - - void initiate() { - long nextEnsembleChange = startEntryId, i = startEntryId; - this.requestTimeNanos = MathUtils.nowInNano(); - List ensemble = null; - do { - if (i == nextEnsembleChange) { - ensemble = getLedgerMetadata().getEnsembleAt(i); - nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); - } - LedgerEntryRequest entry; - if (parallelRead) { - entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); - } else { - entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); - } - seq.add(entry); - i++; - } while (i <= endEntryId); - // read the entries. - for (LedgerEntryRequest entry : seq) { - entry.read(); - if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { - speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() - .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); - } - } - } - - @Override - public void run() { - initiate(); - } - - private static class ReadContext implements ReadEntryCallbackCtx { - final int bookieIndex; - final BookieId to; - final LedgerEntryRequest entry; - long lac = LedgerHandle.INVALID_ENTRY_ID; - - ReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) { - this.bookieIndex = bookieIndex; - this.to = to; - this.entry = entry; - } - - @Override - public void setLastAddConfirmed(long lac) { - this.lac = lac; - } - - @Override - public long getLastAddConfirmed() { - return lac; - } - } - - private static ReadContext createReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) { - return new ReadContext(bookieIndex, to, entry); - } - - void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) throws InterruptedException { - if (lh.throttler != null) { - lh.throttler.acquire(); - } - - if (isRecoveryRead) { - int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; - clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, - this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); - } else { - clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, - this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); - } - } - - @Override - public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { - final ReadContext rctx = (ReadContext) ctx; - final LedgerEntryRequest entry = rctx.entry; - - if (rc != BKException.Code.OK) { - entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); - return; - } - - heardFromHosts.add(rctx.to); - heardFromHostsBitSet.set(rctx.bookieIndex, true); - - buffer.retain(); - // if entry has completed don't handle twice - if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { - if (!isRecoveryRead) { - // do not advance LastAddConfirmed for recovery reads - lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); - } - submitCallback(BKException.Code.OK); - } else { - buffer.release(); - } - - if (numPendingEntries < 0) { - LOG.error("Read too many values for ledger {} : [{}, {}].", - ledgerId, startEntryId, endEntryId); - } - } - - protected void submitCallback(int code) { - if (BKException.Code.OK == code) { - numPendingEntries--; - if (numPendingEntries != 0) { - return; - } - } - - // ensure callback once - if (!complete.compareAndSet(false, true)) { - return; - } - - cancelSpeculativeTask(true); - - long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); - if (code != BKException.Code.OK) { - long firstUnread = LedgerHandle.INVALID_ENTRY_ID; - Integer firstRc = null; - for (LedgerEntryRequest req : seq) { - if (!req.isComplete()) { - firstUnread = req.eId; - firstRc = req.rc; - break; - } - } - LOG.error( - "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " - + "Heard from {} : bitset = {}, Error = '{}'. First unread entry is ({}, rc = {})", - lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, - BKException.getMessage(code), firstUnread, firstRc); - clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); - // release the entries - seq.forEach(LedgerEntryRequest::close); - future.completeExceptionally(BKException.create(code)); - } else { - clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); - future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl))); - } - } - } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java new file mode 100644 index 00000000000..cbd68ec657a --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java @@ -0,0 +1,293 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.BitSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ReadOpBase implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ReadOpBase.class); + + protected ScheduledFuture speculativeTask = null; + protected final CompletableFuture future; + protected final Set heardFromHosts; + protected final BitSet heardFromHostsBitSet; + protected final Set sentToHosts = new HashSet(); + LedgerHandle lh; + protected ClientContext clientCtx; + + protected final long startEntryId; + protected long requestTimeNanos; + + protected final int requiredBookiesMissingEntryForRecovery; + protected final boolean isRecoveryRead; + + protected final AtomicBoolean complete = new AtomicBoolean(false); + protected boolean allowFailFast = false; + long numPendingEntries; + final long endEntryId; + protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, + boolean isRecoveryRead) { + this.lh = lh; + this.future = new CompletableFuture<>(); + this.startEntryId = startEntryId; + this.endEntryId = endEntryId; + this.isRecoveryRead = isRecoveryRead; + this.requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize() + - getLedgerMetadata().getAckQuorumSize() + 1; + this.heardFromHosts = new HashSet<>(); + this.heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); + this.allowFailFast = false; + this.clientCtx = clientCtx; + } + + protected LedgerMetadata getLedgerMetadata() { + return lh.getLedgerMetadata(); + } + + protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { + if (speculativeTask != null) { + speculativeTask.cancel(mayInterruptIfRunning); + speculativeTask = null; + } + } + + public ScheduledFuture getSpeculativeTask() { + return speculativeTask; + } + + CompletableFuture future() { + return future; + } + + void allowFailFastOnUnwritableChannel() { + allowFailFast = true; + } + + public void submit() { + clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this); + } + + @Override + public void run() { + initiate(); + } + + abstract void initiate(); + + protected abstract void submitCallback(int code); + + abstract class LedgerEntryRequest implements SpeculativeRequestExecutor { + + final AtomicBoolean complete = new AtomicBoolean(false); + + int rc = BKException.Code.OK; + int firstError = BKException.Code.OK; + int numBookiesMissingEntry = 0; + + final long eId; + + final List ensemble; + final DistributionSchedule.WriteSet writeSet; + + + LedgerEntryRequest(List ensemble, final long eId) { + this.ensemble = ensemble; + this.eId = eId; + if (clientCtx.getConf().enableReorderReadSequence) { + writeSet = clientCtx.getPlacementPolicy() + .reorderReadSequence( + ensemble, + lh.getBookiesHealthInfo(), + lh.getWriteSetForReadOperation(eId)); + } else { + writeSet = lh.getWriteSetForReadOperation(eId); + } + } + + public void close() { + // this request has succeeded before, can't recycle writeSet again + if (complete.compareAndSet(false, true)) { + rc = BKException.Code.UnexpectedConditionException; + writeSet.recycle(); + } + } + + /** + * Execute the read request. + */ + abstract void read(); + + /** + * Fail the request with given result code rc. + * + * @param rc + * result code to fail the request. + * @return true if we managed to fail the entry; otherwise return false if it already failed or completed. + */ + boolean fail(int rc) { + if (complete.compareAndSet(false, true)) { + this.rc = rc; + writeSet.recycle(); + submitCallback(rc); + return true; + } else { + return false; + } + } + + /** + * Log error errMsg and reattempt read from host. + * + * @param bookieIndex + * bookie index + * @param host + * host that just respond + * @param errMsg + * error msg to log + * @param rc + * read result code + */ + synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + if (BKException.Code.OK == firstError + || BKException.Code.NoSuchEntryException == firstError + || BKException.Code.NoSuchLedgerExistsException == firstError) { + firstError = rc; + } else if (BKException.Code.BookieHandleNotAvailableException == firstError + && BKException.Code.NoSuchEntryException != rc + && BKException.Code.NoSuchLedgerExistsException != rc) { + // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is + // returned we need to update firstError to indicate that it might be a valid read but just + // failed. + firstError = rc; + } + if (BKException.Code.NoSuchEntryException == rc + || BKException.Code.NoSuchLedgerExistsException == rc) { + ++numBookiesMissingEntry; + if (LOG.isDebugEnabled()) { + LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", + lh.ledgerId, eId, host); + } + } else { + if (LOG.isInfoEnabled()) { + LOG.info("{} while reading L{} E{} from bookie: {}", + errMsg, lh.ledgerId, eId, host); + } + } + + lh.recordReadErrorOnBookie(bookieIndex); + } + + /** + * Send to next replica speculatively, if required and possible. + * This returns the host we may have sent to for unit testing. + * + * @param heardFromHostsBitSet + * the set of hosts that we already received responses. + * @return host we sent to if we sent. null otherwise. + */ + abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet); + + /** + * Whether the read request completed. + * + * @return true if the read request is completed. + */ + boolean isComplete() { + return complete.get(); + } + + /** + * Get result code of this entry. + * + * @return result code. + */ + int getRc() { + return rc; + } + + @Override + public String toString() { + return String.format("L%d-E%d", lh.getId(), eId); + } + + /** + * Issues a speculative request and indicates if more speculative + * requests should be issued. + * + * @return whether more speculative requests should be issued + */ + @Override + public ListenableFuture issueSpeculativeRequest() { + return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable() { + @Override + public Boolean call() throws Exception { + if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send speculative read for {}. Hosts sent are {}, " + + " Hosts heard are {}, ensemble is {}.", + this, sentToHosts, heardFromHostsBitSet, ensemble); + } + return true; + } + return false; + } + }); + } + } + + protected static class ReadContext implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx { + final int bookieIndex; + final BookieId to; + final PendingReadOp.LedgerEntryRequest entry; + long lac = LedgerHandle.INVALID_ENTRY_ID; + + ReadContext(int bookieIndex, BookieId to, PendingReadOp.LedgerEntryRequest entry) { + this.bookieIndex = bookieIndex; + this.to = to; + this.entry = entry; + } + + @Override + public void setLastAddConfirmed(long lac) { + this.lac = lac; + } + + @Override + public long getLastAddConfirmed() { + return lac; + } + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java index f2ada1e5dc8..423e02b4aad 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java @@ -33,7 +33,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import java.lang.reflect.Method; import java.util.Iterator; import java.util.List; import java.util.TreeMap; @@ -268,8 +267,8 @@ public void testLedgerEntryRequestComplete() throws Exception { PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false); pendingReadOp.parallelRead(true); pendingReadOp.initiate(); - PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0); - PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1); + PendingReadOp.SingleLedgerEntryRequest first = pendingReadOp.seq.get(0); + PendingReadOp.SingleLedgerEntryRequest second = pendingReadOp.seq.get(1); pendingReadOp.submitCallback(-105); @@ -287,13 +286,9 @@ public void testLedgerEntryRequestComplete() throws Exception { assertTrue(second.complete.get()); // Mock ledgerEntryImpl reuse - Method method = PendingReadOp.class.getDeclaredMethod("createReadContext", - int.class, BookieId.class, PendingReadOp.LedgerEntryRequest.class); - method.setAccessible(true); - ByteBuf byteBuf = Unpooled.buffer(10); pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); // byteBuf has been release assertEquals(byteBuf.refCnt(), 1); @@ -308,15 +303,15 @@ public void testLedgerEntryRequestComplete() throws Exception { // read entry failed twice, will not close twice pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); // will not complete twice when completed byteBuf = Unpooled.buffer(10); pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); assertEquals(1, byteBuf.refCnt()); } From 22097343ceed1312964ad077cdcd9fbd7e3e026b Mon Sep 17 00:00:00 2001 From: Yan Zhao Date: Mon, 5 Feb 2024 16:58:38 +0800 Subject: [PATCH 2/4] [BP-62] LedgerHandle introduces batch read API. (#4195) ### Motivation This is the fifth PR for the batch read(https://github.com/apache/bookkeeper/pull/4051) feature. LedgerHandle introduces batch read API. This PR is based on #4190, please merge it firstly. --- .../bookkeeper/client/ClientInternalConf.java | 6 +- .../bookkeeper/client/LedgerHandle.java | 279 ++++++++++++ .../bookkeeper/client/api/ReadHandle.java | 32 ++ .../bookkeeper/conf/ClientConfiguration.java | 7 + .../proto/BatchedReadEntryProcessor.java | 7 + .../bookkeeper/client/TestBatchedRead.java | 292 +++++++++++++ .../client/TestSpeculativeBatchRead.java | 401 ++++++++++++++++++ .../bookkeeper/test/BookieClientTest.java | 2 +- 8 files changed, 1023 insertions(+), 3 deletions(-) create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java index cff66a3cb3e..fc83617cef6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java @@ -48,6 +48,8 @@ class ClientInternalConf { final boolean enableBookieFailureTracking; final boolean useV2WireProtocol; final boolean enforceMinNumFaultDomainsForWrite; + final boolean batchReadEnabled; + final int nettyMaxFrameSizeBytes; static ClientInternalConf defaultValues() { return fromConfig(new ClientConfiguration()); @@ -72,9 +74,9 @@ private ClientInternalConf(ClientConfiguration conf, this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout()); this.throttleValue = conf.getThrottleValue(); this.bookieFailureHistoryExpirationMSec = conf.getBookieFailureHistoryExpirationMSec(); - + this.batchReadEnabled = conf.isBatchReadEnabled(); + this.nettyMaxFrameSizeBytes = conf.getNettyMaxFrameSizeBytes(); this.disableEnsembleChangeFeature = featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName()); - this.delayEnsembleChange = conf.getDelayEnsembleChange(); this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges(); this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 9486b2e632c..6a98af55032 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -103,6 +103,7 @@ public class LedgerHandle implements WriteHandle { final long ledgerId; final ExecutorService executor; long lastAddPushed; + boolean notSupportBatch; private enum HandleState { OPEN, @@ -641,6 +642,26 @@ public Enumeration readEntries(long firstEntry, long lastEntry) return SyncCallbackUtils.waitForResult(result); } + /** + * Read a sequence of entries synchronously. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @see #asyncBatchReadEntries(long, int, long, boolean, ReadCallback, Object) + */ + public Enumeration batchReadEntries(long startEntry, int maxCount, long maxSize) + throws InterruptedException, BKException { + CompletableFuture> result = new CompletableFuture<>(); + + asyncBatchReadEntries(startEntry, maxCount, maxSize, new SyncReadCallback(result), null); + + return SyncCallbackUtils.waitForResult(result); + } + /** * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.
* This is the same of @@ -664,6 +685,27 @@ public Enumeration readUnconfirmedEntries(long firstEntry, long las return SyncCallbackUtils.waitForResult(result); } + /** + * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.
+ * This is the same of + * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, boolean, ReadCallback, Object) } + * + * @param firstEntry + * id of first entry of sequence (included) + * @param maxCount + * id of last entry of sequence (included) + * @param maxSize + * the total entries size + */ + public Enumeration batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize) + throws InterruptedException, BKException { + CompletableFuture> result = new CompletableFuture<>(); + + asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new SyncReadCallback(result), null); + + return SyncCallbackUtils.waitForResult(result); + } + /** * Read a sequence of entries asynchronously. * @@ -695,6 +737,50 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } + /** + * Read a sequence of entries in asynchronously. + * It send an RPC to get all entries instead of send multi RPC to get all entries. + * + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + * @param cb + * object implementing read callback interface + * @param ctx + * control object + */ + public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, Object ctx) { + // Little sanity check + if (startEntry > lastAddConfirmed) { + LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}", + ledgerId, startEntry, lastAddConfirmed); + cb.readComplete(BKException.Code.ReadException, this, null, ctx); + return; + } + if (notSupportBatchRead()) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + //If the bookie server not support the batch read request, the bookie server will close the + // connection, then get the BookieHandleNotAvailableException. + if (rc == Code.BookieHandleNotAvailableException) { + notSupportBatch = true; + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } + }, ctx, false); + } + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of @@ -734,6 +820,48 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } + /** + * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. + * It sends an RPC to get all entries instead of send multi RPC to get all entries. + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + * @param cb + * object implementing read callback interface + * @param ctx + * control object + */ + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx) { + // Little sanity check + if (startEntry < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); + cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx); + } + if (notSupportBatchRead()) { + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object ctx) { + //If the bookie server not support the batch read request, the bookie server will close the + // connection, then get the BookieHandleNotAvailableException. + if (rc == Code.BookieHandleNotAvailableException) { + notSupportBatch = true; + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } + }, ctx, false); + } + } + /** * Read a sequence of entries asynchronously. * @@ -760,6 +888,123 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return readEntriesInternalAsync(firstEntry, lastEntry, false); } + /** + * Read a sequence of entries in asynchronously. + * It sends an RPC to get all entries instead of send multi RPC to get all entries. + * + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + */ + @Override + public CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + // Little sanity check + if (startEntry < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); + return FutureUtils.exception(new BKIncorrectParameterException()); + } + if (startEntry > lastAddConfirmed) { + LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}", + ledgerId, startEntry, lastAddConfirmed); + return FutureUtils.exception(new BKReadException()); + } + if (notSupportBatchRead()) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + return readEntriesInternalAsync(startEntry, lastEntry, false); + } + CompletableFuture future = new CompletableFuture<>(); + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) + .whenComplete((entries, ex) -> { + if (ex != null) { + //If the bookie server not support the batch read request, the bookie server will close the + // connection, then get the BookieHandleNotAvailableException. + if (ex instanceof BKException.BKBookieHandleNotAvailableException) { + notSupportBatch = true; + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + readEntriesInternalAsync(startEntry, lastEntry, false).whenComplete((entries1, ex1) -> { + if (ex1 != null) { + future.completeExceptionally(ex1); + } else { + future.complete(entries1); + } + }); + } else { + future.completeExceptionally(ex); + } + } else { + future.complete(entries); + } + }); + return future; + } + + private boolean notSupportBatchRead() { + if (!clientCtx.getConf().batchReadEnabled) { + return true; + } + if (notSupportBatch) { + return true; + } + LedgerMetadata ledgerMetadata = getLedgerMetadata(); + return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize(); + } + + private CompletableFuture batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize, + boolean isRecoveryRead) { + int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes; + if (maxSize > nettyMaxFrameSizeBytes) { + LOG.info( + "The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.", + nettyMaxFrameSizeBytes); + maxSize = nettyMaxFrameSizeBytes; + } + if (maxSize <= 0) { + LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} to replace it.", nettyMaxFrameSizeBytes); + maxSize = nettyMaxFrameSizeBytes; + } + BatchedReadOp op = new BatchedReadOp(this, clientCtx, + startEntry, maxCount, maxSize, isRecoveryRead); + if (!clientCtx.isClientClosed()) { + // Waiting on the first one. + // This is not very helpful if there are multiple ensembles or if bookie goes into unresponsive + // state later after N requests sent. + // Unfortunately it seems that alternatives are: + // - send reads one-by-one (up to the app) + // - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact) + // - block worker pool (not good) + // Even with this implementation one should be more concerned about OOME when all read responses arrive + // or about overloading bookies with these requests then about submission of many small requests. + // Naturally one of the solutions would be to submit smaller batches and in this case + // current implementation will prevent next batch from starting when bookie is + // unresponsive thus helpful enough. + if (clientCtx.getConf().waitForWriteSetMs >= 0) { + DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(startEntry); + try { + if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) { + op.allowFailFastOnUnwritableChannel(); + } + } finally { + ws.recycle(); + } + } + + if (isHandleWritable()) { + // Ledger handle in read/write mode: submit to OSE for ordered execution. + executeOrdered(op); + } else { + // Read-only ledger handle: bypass OSE and execute read directly in client thread. + // This avoids a context-switch to OSE thread and thus reduces latency. + op.run(); + } + } else { + op.future().completeExceptionally(BKException.create(ClientClosedException)); + } + return op.future(); + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of @@ -829,6 +1074,40 @@ public void onFailure(Throwable cause) { } } + void asyncBatchReadEntriesInternal(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx, boolean isRecoveryRead) { + if (!clientCtx.isClientClosed()) { + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, isRecoveryRead) + .whenCompleteAsync(new FutureEventListener() { + @Override + public void onSuccess(LedgerEntries entries) { + cb.readComplete( + Code.OK, + LedgerHandle.this, + IteratorUtils.asEnumeration( + Iterators.transform(entries.iterator(), le -> { + LedgerEntry entry = new LedgerEntry((LedgerEntryImpl) le); + le.close(); + return entry; + })), + ctx); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof BKException) { + BKException bke = (BKException) cause; + cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx); + } else { + cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, ctx); + } + } + }, clientCtx.getMainWorkerPool().chooseThread(ledgerId)); + } else { + cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx); + } + } + /* * Read the last entry in the ledger * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index d5e906d17d8..e9bcddd0b39 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -45,6 +45,23 @@ public interface ReadHandle extends Handle { */ CompletableFuture readAsync(long firstEntry, long lastEntry); + /** + * Read a sequence of entries asynchronously. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @return an handle to the result of the operation + */ + default CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException()); + return future; + } + /** * Read a sequence of entries synchronously. * @@ -59,6 +76,21 @@ default LedgerEntries read(long firstEntry, long lastEntry) throws BKException, BKException.HANDLER); } + /** + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @return the result of the operation + */ + default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize) + throws BKException, InterruptedException { + return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize), BKException.HANDLER); + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 66dc160fd5f..924dee4ada4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -202,6 +202,9 @@ public class ClientConfiguration extends AbstractConfiguration entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + + // read multiple entries + BatchedReadOp readOp = createReadOp(lh, 0, numEntries); + readOp.submit(); + Iterator iterator = readOp.future().get().iterator(); + + int numReads = 0; + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); + assertNotNull(entry); + assertEquals(numReads, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + ++numReads; + } + assertEquals(numEntries, numReads); + lh.close(); + } + + @Test + public void testReadWhenEnsembleNotEqualWQ() throws Exception { + int numEntries = 10; + long id = getLedgerToRead(5, 2, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + //read single entry + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, 1); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + + // read multiple entries, because the ensemble is not equals with write quorum, the return entries + // will less than max count. + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, numEntries); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + lh.close(); + } + + private static void expectFail(CompletableFuture future, int expectedRc) { + try { + result(future); + fail("Expect to fail"); + } catch (Exception e) { + assertTrue(e instanceof BKException); + BKException bke = (BKException) e; + assertEquals(expectedRc, bke.getCode()); + } + } + + @Test + public void testReadMissingEntries() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 5, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + // read single entry + BatchedReadOp readOp = createReadOp(lh, 10, 1); + readOp.submit(); + expectFail(readOp.future(), Code.NoSuchEntryException); + + // read multiple entries + readOp = createReadOp(lh, 8, 3); + readOp.submit(); + + int index = 8; + int numReads = 0; + Iterator iterator = readOp.future().get().iterator(); + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); + assertNotNull(entry); + assertEquals(index, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + ++index; + ++numReads; + } + assertEquals(2, numReads); + lh.close(); + } + + @Test + public void testFailRecoveryReadMissingEntryImmediately() throws Exception { + int numEntries = 1; + + long id = getLedgerToRead(5, 5, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(10); + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + // sleep two bookie + sleepBookie(ensemble.get(0), latch1); + sleepBookie(ensemble.get(1), latch2); + + BatchedReadOp readOp = createRecoveryReadOp(lh, 10, 1); + readOp.submit(); + // would fail immediately if found missing entries don't cover ack quorum + expectFail(readOp.future(), Code.NoSuchEntryException); + latch1.countDown(); + latch2.countDown(); + + lh.close(); + newBk.close(); + } + + @Test + public void testReadWithFailedBookies() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 3, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(5); + // kill two bookies + killBookie(ensemble.get(0)); + killBookie(ensemble.get(1)); + + // read multiple entries, because the ensemble is not equals with write quorum, the return entries + // will less than max count. + int numReads = 0; + for (int i = 0; i < numEntries;) { + BatchedReadOp readOp = createReadOp(lh, i, numEntries); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + if (!entries.hasNext()) { + i++; + continue; + } + while (entries.hasNext()) { + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + i++; + numReads++; + } + } + assertEquals(10, numReads); + lh.close(); + newBk.close(); + } + + @Test + public void testReadFailureWithFailedBookies() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 3, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(5); + // kill two bookies + killBookie(ensemble.get(0)); + killBookie(ensemble.get(1)); + killBookie(ensemble.get(2)); + + // read multiple entries + BatchedReadOp readOp = createReadOp(lh, 0, numEntries); + readOp.submit(); + expectFail(readOp.future(), Code.BookieHandleNotAvailableException); + + lh.close(); + newBk.close(); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java new file mode 100644 index 00000000000..3bf5e2d5e44 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java @@ -0,0 +1,401 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE; +import static org.apache.bookkeeper.client.BookKeeperClientStats.SPECULATIVE_READ_COUNT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.BitSet; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.test.TestStatsProvider; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This unit test tests ledger fencing. + * + */ +public class TestSpeculativeBatchRead extends BookKeeperClusterTestCase { + private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeBatchRead.class); + + private final DigestType digestType; + byte[] passwd = "specPW".getBytes(); + + public TestSpeculativeBatchRead() { + super(10); + this.digestType = DigestType.CRC32; + } + + long getLedgerToRead(int ensemble, int quorum) throws Exception { + byte[] data = "Data for test".getBytes(); + LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType, passwd); + for (int i = 0; i < 10; i++) { + l.addEntry(data); + } + l.close(); + + return l.getId(); + } + + @SuppressWarnings("deprecation") + BookKeeperTestClient createClient(int specTimeout) throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setSpeculativeReadTimeout(specTimeout) + .setReadTimeout(30000) + .setUseV2WireProtocol(true) + .setReorderReadSequenceEnabled(true) + .setEnsemblePlacementPolicySlowBookies(true) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + return new BookKeeperTestClient(conf, new TestStatsProvider()); + } + + class LatchCallback implements ReadCallback { + CountDownLatch l = new CountDownLatch(1); + boolean success = false; + long startMillis = System.currentTimeMillis(); + long endMillis = Long.MAX_VALUE; + + public void readComplete(int rc, + LedgerHandle lh, + Enumeration seq, + Object ctx) { + endMillis = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Got response {} {}", rc, getDuration()); + } + success = rc == BKException.Code.OK; + l.countDown(); + } + + long getDuration() { + return endMillis - startMillis; + } + + void expectSuccess(int milliseconds) throws Exception { + boolean await = l.await(milliseconds, TimeUnit.MILLISECONDS); + System.out.println(await); + } + + void expectFail(int milliseconds) throws Exception { + assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS)); + assertFalse(success); + } + + void expectTimeout(int milliseconds) throws Exception { + assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS)); + } + } + + /** + * Test basic speculative functionality. + * - Create 2 clients with read timeout disabled, one with spec + * read enabled, the other not. + * - create ledger + * - sleep second bookie in ensemble + * - read first entry, both should find on first bookie. + * - read second bookie, spec client should find on bookie three, + * non spec client should hang. + */ + @Test + public void testSpeculativeRead() throws Exception { + long id = getLedgerToRead(3, 2); + BookKeeperTestClient bknospec = createClient(0); // disabled + BookKeeperTestClient bkspec = createClient(2000); + + LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd); + LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd); + + // sleep second bookie + CountDownLatch sleepLatch = new CountDownLatch(1); + BookieId second = lnospec.getLedgerMetadata().getAllEnsembles().get(0L).get(1); + sleepBookie(second, sleepLatch); + + try { + // read first entry, both go to first bookie, should be fine + LatchCallback nospeccb = new LatchCallback(); + LatchCallback speccb = new LatchCallback(); + lnospec.asyncBatchReadEntries(0, 1, 1024, nospeccb, null); + lspec.asyncBatchReadEntries(0, 1, 1024, speccb, null); + nospeccb.expectSuccess(2000); + speccb.expectSuccess(2000); + + // read second entry, both look for second book, spec read client + // tries third bookie, nonspec client hangs as read timeout is very long. + nospeccb = new LatchCallback(); + speccb = new LatchCallback(); + lnospec.asyncReadEntries(1, 1, nospeccb, null); + lspec.asyncReadEntries(1, 1, speccb, null); + speccb.expectSuccess(4000); + nospeccb.expectTimeout(4000); + // Check that the second bookie is registered as slow at entryId 1 + RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy(); + assertTrue(rep.slowBookies.asMap().size() == 1); + + assertTrue( + "Stats should not reflect speculative reads if disabled", + bknospec.getTestStatsProvider() + .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() == 0); + assertTrue( + "Stats should reflect speculative reads", + bkspec.getTestStatsProvider() + .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() > 0); + } finally { + sleepLatch.countDown(); + lspec.close(); + lnospec.close(); + bkspec.close(); + bknospec.close(); + } + } + + /** + * Test that if more than one replica is down, we can still read, as long as the quorum + * size is larger than the number of down replicas. + */ + @Test + public void testSpeculativeReadMultipleReplicasDown() throws Exception { + long id = getLedgerToRead(5, 5); + int timeout = 5000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookie 1, 2 & 4 + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2), sleepLatch); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(4), sleepLatch); + + try { + // read first entry, should complete faster than timeout + // as bookie 0 has the entry + LatchCallback latch0 = new LatchCallback(); + l.asyncBatchReadEntries(0, 1, 1024, latch0, null); + latch0.expectSuccess(timeout / 2); + + // second should have to hit two timeouts (bookie 1 & 2) + // bookie 3 has the entry + LatchCallback latch1 = new LatchCallback(); + l.asyncBatchReadEntries(1, 1, 1024, latch1, null); + latch1.expectTimeout(timeout); + latch1.expectSuccess(timeout * 2); + LOG.info("Timeout {} latch1 duration {}", timeout, latch1.getDuration()); + assertTrue("should have taken longer than two timeouts, but less than 3", + latch1.getDuration() >= timeout * 2 + && latch1.getDuration() < timeout * 3); + + // bookies 1 & 2 should be registered as slow bookies because of speculative reads + Set expectedSlowBookies = new HashSet<>(); + expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1)); + expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2)); + assertEquals(((RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy()).slowBookies.asMap().keySet(), + expectedSlowBookies); + + // third should not hit timeouts since bookies 1 & 2 are registered as slow + // bookie 3 has the entry + LatchCallback latch2 = new LatchCallback(); + l.asyncBatchReadEntries(2, 1, 1024, latch2, null); + latch2.expectSuccess(timeout); + + // fourth should have no timeout + // bookie 3 has the entry + LatchCallback latch3 = new LatchCallback(); + l.asyncBatchReadEntries(3, 1, 1024, latch3, null); + latch3.expectSuccess(timeout / 2); + + // fifth should hit one timeout, (bookie 4) + // bookie 0 has the entry + LatchCallback latch4 = new LatchCallback(); + l.asyncBatchReadEntries(4, 1, 1024, latch4, null); + latch4.expectTimeout(timeout / 2); + latch4.expectSuccess(timeout); + LOG.info("Timeout {} latch4 duration {}", timeout, latch4.getDuration()); + assertTrue("should have taken longer than one timeout, but less than 2", + latch4.getDuration() >= timeout + && latch4.getDuration() < timeout * 2); + } finally { + sleepLatch.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Test that if after a speculative read is kicked off, the original read completes + * nothing bad happens. + */ + @Test + public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception { + long id = getLedgerToRead(2, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookies + CountDownLatch sleepLatch0 = new CountDownLatch(1); + CountDownLatch sleepLatch1 = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(0), sleepLatch0); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch1); + + try { + // read goes to first bookie, spec read timeout occurs, + // goes to second + LatchCallback latch0 = new LatchCallback(); + l.asyncBatchReadEntries(0, 1, 1024, latch0, null); + latch0.expectTimeout(timeout); + + // wake up first bookie + sleepLatch0.countDown(); + latch0.expectSuccess(timeout / 2); + + sleepLatch1.countDown(); + + // check we can read next entry without issue + LatchCallback latch1 = new LatchCallback(); + l.asyncBatchReadEntries(1, 1, 1024, latch1, null); + latch1.expectSuccess(timeout / 2); + } finally { + sleepLatch0.countDown(); + sleepLatch1.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Unit test to check if the scheduled speculative task gets cancelled + * on successful read. + */ + @Test + public void testSpeculativeReadScheduledTaskCancel() throws Exception { + long id = getLedgerToRead(3, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + BatchedReadOp op = null; + try { + op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + op.initiate(); + op.future().get(); + } finally { + assertNull("Speculative Read tasks must be null", op.getSpeculativeTask()); + } + } + + /** + * Unit test for the speculative read scheduling method. + */ + @Test + public void testSpeculativeReadScheduling() throws Exception { + long id = getLedgerToRead(3, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + List ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L); + BitSet allHosts = new BitSet(ensemble.size()); + for (int i = 0; i < ensemble.size(); i++) { + allHosts.set(i, true); + } + BitSet noHost = new BitSet(ensemble.size()); + BitSet secondHostOnly = new BitSet(ensemble.size()); + secondHostOnly.set(1, true); + BatchedReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null; + try { + BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + // if we've already heard from all hosts, + // we only send the initial read + req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024); + assertTrue("Should have sent to first", + req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0))); + assertNull("Should not have sent another", + req0.maybeSendSpeculativeRead(allHosts)); + + // if we have heard from some hosts, but not one we have sent to + // send again + req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2, 1, 1024); + assertTrue("Should have sent to third", + req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2))); + assertTrue("Should have sent to first", + req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0))); + + // if we have heard from some hosts, which includes one we sent to + // do not read again + req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4, 1, 1024); + assertTrue("Should have sent to second", + req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1))); + assertNull("Should not have sent another", + req4.maybeSendSpeculativeRead(secondHostOnly)); + } finally { + for (BatchedReadOp.LedgerEntryRequest req + : new BatchedReadOp.LedgerEntryRequest[] { req0, req2, req4 }) { + if (req != null) { + int i = 0; + while (!req.isComplete()) { + if (i++ > 10) { + break; // wait for up to 10 seconds + } + Thread.sleep(1000); + } + assertTrue("Request should be done", req.isComplete()); + } + } + + l.close(); + bkspec.close(); + } + } + + @Test + public void testSequenceReadLocalEnsemble() throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setSpeculativeReadTimeout(1000) + .setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class) + .setReorderReadSequenceEnabled(true) + .setEnsemblePlacementPolicySlowBookies(true) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper bkspec = new BookKeeperTestClient(conf, new TestStatsProvider()); + LedgerHandle l = bkspec.createLedger(1, 1, digestType, passwd); + List ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L); + BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + BatchedReadOp.LedgerEntryRequest req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024); + assertNotNull(req0.writeSet); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 5b96c52a0be..60f89159a04 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -359,7 +359,7 @@ public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) { } @Test - public void testBatchedRead() throws Exception { + public void testBatchRead() throws Exception { ClientConfiguration conf = new ClientConfiguration(); conf.setUseV2WireProtocol(true); BookieClient bc = new BookieClientImpl(conf, eventLoopGroup, From 9c373f7e7b5b62d57dc5d295039229c2461ae518 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 7 Feb 2024 10:33:00 -0800 Subject: [PATCH 3/4] Fix checksum calculation bug when the payload is a CompositeByteBuf with readerIndex > 0 (#4196) * Add a test that reproduces a bug in checksum calculation * Revert "Fixed unnecessary copy to heap (#2701)" changes to ByteBufList This partially reverts commit 3c9c7102538909fd3764ea7314e7618d6d9458fd. * Remove CompositeBuffer unwrapping in DigestManager * Rename update -> internalUpdate so that unwrapping logic could be added to update * Remove unnecessary unwrapping logic in Java9IntHash * Add safe way to handle CompositeByteBuf * Add license header * Fix checkstyle * Refactor ByteBuf visitor solution * Fix checkstyle * Reformat * Refactor recursive visiting * Revisit equals, hashCode and toString * Refactor test case * Add support for UnpooledHeapByteBuf.getBytes which passes an array * Add support for visiting buffers backed by byte[] arrays - getBytes calls setBytes with a byte[] argument for heap ByteBufs * Move ByteBufVisitor to org.apache.bookkeeper.util package * Update javadoc * Refactor to use stateless visitor so that instance can be shared * Improve test so that a single scenario can be used for debugging * Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x) - Java9IntHash uses private methods from java.util.zip.CRC32C class, updateBytes and updateDirectByteBuffer. When inspecting the use and interface contract, it doesn't match how it is used in Java9IntHash. This PR addresses that by introducing a separate initial value for initializing the accumulated value so that the initial value could match the logic in java.util.zip.CRC32C.reset method. There's also a separate method for finalizing the accumulated value into a final checksum value. This is to match the java.util.zip.CRC32C.getValue method's logic (uses bitwise complement operator ~). - With a quick glance, it might appear that the previous logic is similar. However it isn't since I have a failing test which gets fixed with this change. I haven't yet added the Java9IntHash level unit test case to prove how it differs. It must be related to integer value overflow. For the CRC32C function, I believe it means that it cannot be assumed in all cases that func(x) == ~func(~x). That's the assumption that the previous code was making. It probably applies for many inputs, but not all. It would break in overflow cases. * Fix checkstyle * Fix checkstyle * Fix missing depth increment that prevents StackOverflowException * Properly handle the depth increase and decrease * Remove unnecessary condition * Use more efficient way to read bytes to the target array * Don't use ByteBufVisitor if it's not necessary * Revert "Fix bug in Java9IntHash calculation that assumed crc32c_update(x) == ~crc32c_update(~x)" This reverts commit 272e962930a31cbc237c5e7c0bd0c93213520ba4. * Fix issue in resume byte[] version that was added - input and output should be complemented. explanation has been added to the resume ByteBuf method * Polish ByteBufVisitor - reuse GetBytesCallbackByteBuf instance for handling the root ByteBuf instance * Use extracted method * Fix bug with array handling * Polish ByteBufVisitor * Optimize the buffer copying in the case where array or memory address cannot be accessed - read-only buffers will need to be copied before reading - use ByteBuf.copy for direct buffers with pooled allocator when the algorithm can accept a memory address buffer - use the 64kB threadlocal byte[] buffer for copying all other inputs * Check if memory address is accepted * Improve comments about complement (current = ~current) in resume * Print thread dump when build is cancelled * Filter empty buffers and arrays in ByteBufVisitor --- .github/workflows/bk-ci.yml | 4 + .../proto/checksum/CRC32CDigestManager.java | 12 +- .../proto/checksum/CRC32DigestManager.java | 14 +- .../proto/checksum/DigestManager.java | 94 +- .../checksum/DirectMemoryCRC32Digest.java | 18 +- .../proto/checksum/DummyDigestManager.java | 12 +- .../proto/checksum/MacDigestManager.java | 13 +- .../proto/checksum/StandardCRC32Digest.java | 5 + .../apache/bookkeeper/util/ByteBufList.java | 36 +- .../bookkeeper/util/ByteBufVisitor.java | 1132 +++++++++++++++++ ...ompositeByteBufUnwrapBugReproduceTest.java | 280 ++++ .../bookkeeper/util/ByteBufListTest.java | 34 - .../circe/checksum/Crc32cIntChecksum.java | 24 +- .../scurrilous/circe/checksum/IntHash.java | 4 + .../circe/checksum/Java8IntHash.java | 10 + .../circe/checksum/Java9IntHash.java | 48 +- .../scurrilous/circe/checksum/JniIntHash.java | 10 + 17 files changed, 1618 insertions(+), 132 deletions(-) create mode 100644 bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java create mode 100644 bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java diff --git a/.github/workflows/bk-ci.yml b/.github/workflows/bk-ci.yml index e8d77821147..c9f9226dd80 100644 --- a/.github/workflows/bk-ci.yml +++ b/.github/workflows/bk-ci.yml @@ -200,6 +200,10 @@ jobs: path: surefire-reports retention-days: 7 + - name: print JVM thread dumps when cancelled + if: cancelled() + run: ./dev/ci-tool print_thread_dumps + integration-tests: name: Integration Tests runs-on: ubuntu-latest diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java index 1fb3d47b3e0..5343357198a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32CDigestManager.java @@ -46,7 +46,17 @@ void populateValueAndReset(int digest, ByteBuf buf) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len); } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len); + } + + @Override + boolean acceptsMemoryAddressBuffer() { + return Crc32cIntChecksum.acceptsMemoryAddressBuffer(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java index 21be2651a7a..0d18312cfc8 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/CRC32DigestManager.java @@ -34,6 +34,7 @@ interface CRC32Digest { long getValueAndReset(); void update(ByteBuf buf, int offset, int len); + void update(byte[] buffer, int offset, int len); } private static final FastThreadLocal crc = new FastThreadLocal() { @@ -62,14 +63,25 @@ void populateValueAndReset(int digest, ByteBuf buf) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { crc.get().update(data, offset, len); return 0; } + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + crc.get().update(buffer, offset, len); + return 0; + } + @Override boolean isInt32Digest() { // This is stored as 8 bytes return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return DirectMemoryCRC32Digest.isSupported(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index eab33945b1e..1e78e4075eb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -20,10 +20,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.FastThreadLocal; import java.security.GeneralSecurityException; @@ -34,6 +32,7 @@ import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.ByteBufVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +52,25 @@ public abstract class DigestManager { final long ledgerId; final boolean useV2Protocol; private final ByteBufAllocator allocator; + private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback; abstract int getMacCodeLength(); - abstract int update(int digest, ByteBuf buffer, int offset, int len); + abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len); + + abstract int internalUpdate(int digest, byte[] buffer, int offset, int len); + + final int update(int digest, ByteBuf buffer, int offset, int len) { + if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) { + return internalUpdate(digest, buffer, offset, len); + } else if (buffer.hasArray()) { + return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len); + } else { + UpdateContext updateContext = new UpdateContext(digest); + ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext); + return updateContext.digest; + } + } abstract void populateValueAndReset(int digest, ByteBuf buffer); @@ -69,6 +83,7 @@ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allo this.useV2Protocol = useV2Protocol; this.macCodeLength = getMacCodeLength(); this.allocator = allocator; + this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback(); } public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType, @@ -136,22 +151,7 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long // Compute checksum over the headers int digest = update(0, buf, buf.readerIndex(), buf.readableBytes()); - - // don't unwrap slices - final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf - ? data.unwrap() : data; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.safeRelease(data); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf cbb = (CompositeByteBuf) unwrapped; - for (int i = 0; i < cbb.numComponents(); i++) { - ByteBuf b = cbb.component(i); - digest = update(digest, b, b.readerIndex(), b.readableBytes()); - } - } else { - digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - } + digest = update(digest, data, data.readerIndex(), data.readableBytes()); populateValueAndReset(digest, buf); @@ -159,11 +159,11 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long buf.readerIndex(0); if (isSmallEntry) { - buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - unwrapped.release(); + buf.writeBytes(data, data.readerIndex(), data.readableBytes()); + data.release(); return buf; } else { - return ByteBufList.get(buf, unwrapped); + return ByteBufList.get(buf, data); } } @@ -176,25 +176,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA headersBuffer.writeLong(length); int digest = update(0, headersBuffer, 0, METADATA_LENGTH); - - // don't unwrap slices - final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf - ? data.unwrap() : data; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(data); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped); - for (int i = 0; i < cbb.numComponents(); i++) { - ByteBuf b = cbb.component(i); - digest = update(digest, b, b.readerIndex(), b.readableBytes()); - } - } else { - digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - } + digest = update(digest, data, data.readerIndex(), data.readableBytes()); populateValueAndReset(digest, headersBuffer); - - return ByteBufList.get(headersBuffer, unwrapped); + return ByteBufList.get(headersBuffer, data); } /** @@ -373,4 +357,34 @@ public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) thr long length = dataReceived.readLong(); return new RecoveryData(lastAddConfirmed, length); } + + private static class UpdateContext { + int digest; + + UpdateContext(int digest) { + this.digest = digest; + } + } + + private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback { + + @Override + public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) { + // recursively visit the sub buffer and update the digest + context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength); + } + + @Override + public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) { + // update the digest with the array + context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength); + } + + @Override + public boolean acceptsMemoryAddress(UpdateContext context) { + return DigestManager.this.acceptsMemoryAddressBuffer(); + } + } + + abstract boolean acceptsMemoryAddressBuffer(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java index eda223ef7f3..07a2bdf464f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java @@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) { crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length); } else if (buf.hasArray()) { // Use the internal method to update from array based - crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length); + crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length); } else { // Fallback to data copy if buffer is not contiguous byte[] b = new byte[length]; buf.getBytes(index, b, 0, length); - crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length); + crcValue = updateArray(crcValue, b, 0, length); } } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } + private static int updateArray(int crcValue, byte[] buf, int offset, int length) + throws IllegalAccessException, InvocationTargetException { + return (int) updateBytes.invoke(null, crcValue, buf, offset, length); + } + + @Override + public void update(byte[] buffer, int offset, int len) { + try { + crcValue = updateArray(crcValue, buffer, offset, len); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + private static final Method updateByteBuffer; private static final Method updateBytes; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java index b15499f0cc5..e2fff9bd7ca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java @@ -38,7 +38,12 @@ int getMacCodeLength() { } @Override - int update(int digest, ByteBuf buffer, int offset, int len) { + int internalUpdate(int digest, ByteBuf buffer, int offset, int len) { + return 0; + } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { return 0; } @@ -49,4 +54,9 @@ void populateValueAndReset(int digest, ByteBuf buffer) {} boolean isInt32Digest() { return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java index c04c411c6c7..f9fda5a531d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java @@ -96,13 +96,24 @@ void populateValueAndReset(int digest, ByteBuf buffer) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { mac.get().update(data.slice(offset, len).nioBuffer()); return 0; } + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + mac.get().update(buffer, offset, len); + return 0; + } + @Override boolean isInt32Digest() { return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java index 3d48f0ef7da..7635e3e9f20 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java @@ -39,4 +39,9 @@ public long getValueAndReset() { public void update(ByteBuf buf, int offset, int len) { crc.update(buf.slice(offset, len).nioBuffer()); } + + @Override + public void update(byte[] buffer, int offset, int len) { + crc.update(buffer, offset, len); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java index 324588d852b..b363e4da636 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -133,43 +132,14 @@ public static ByteBufList get() { * Append a {@link ByteBuf} at the end of this {@link ByteBufList}. */ public void add(ByteBuf buf) { - final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf - ? buf.unwrap() : buf; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(buf); - - if (unwrapped instanceof CompositeByteBuf) { - ((CompositeByteBuf) unwrapped).forEach(b -> { - ReferenceCountUtil.retain(b); - buffers.add(b); - }); - ReferenceCountUtil.release(unwrapped); - } else { - buffers.add(unwrapped); - } + buffers.add(buf); } /** * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}. */ public void prepend(ByteBuf buf) { - // don't unwrap slices - final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf - ? buf.unwrap() : buf; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(buf); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf composite = (CompositeByteBuf) unwrapped; - for (int i = composite.numComponents() - 1; i >= 0; i--) { - ByteBuf b = composite.component(i); - ReferenceCountUtil.retain(b); - buffers.add(0, b); - } - ReferenceCountUtil.release(unwrapped); - } else { - buffers.add(0, unwrapped); - } + buffers.add(0, buf); } /** @@ -285,7 +255,7 @@ public ByteBufList retain() { @Override protected void deallocate() { for (int i = 0; i < buffers.size(); i++) { - ReferenceCountUtil.release(buffers.get(i)); + buffers.get(i).release(); } buffers.clear(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java new file mode 100644 index 00000000000..32e9c8c55a4 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java @@ -0,0 +1,1132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ByteProcessor; +import io.netty.util.concurrent.FastThreadLocal; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +/** + * This class visits the possible wrapped child buffers of a Netty {@link ByteBuf} for a given offset and length. + *

+ * The Netty ByteBuf API does not provide a method to visit the wrapped child buffers. The + * {@link ByteBuf#unwrap()} method is not suitable for this purpose as it loses the + * {@link ByteBuf#readerIndex()} state, resulting in incorrect offset and length information. + *

+ * Despite Netty not having a public API for visiting the sub buffers, it is possible to achieve this using + * the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} method. This class uses this method to visit the + * wrapped child buffers by providing a suitable {@link ByteBuf} implementation. This implementation supports + * the role of the destination buffer for the getBytes call. It requires implementing the + * {@link ByteBuf#setBytes(int, ByteBuf, int, int)} and {@link ByteBuf#setBytes(int, byte[], int, int)} methods + * and other methods required by getBytes such as {@link ByteBuf#hasArray()}, {@link ByteBuf#hasMemoryAddress()}, + * {@link ByteBuf#nioBufferCount()} and {@link ByteBuf#capacity()}. + * All other methods in the internal ByteBuf implementation are not supported and will throw an exception. + * This is to ensure correctness and to fail fast if some ByteBuf implementation is not following the expected + * and supported interface contract. + */ +public class ByteBufVisitor { + private static final int DEFAULT_VISIT_MAX_DEPTH = 10; + + private ByteBufVisitor() { + // prevent instantiation + } + + /** + * This method traverses the potential nested composite buffers of the provided buffer, given a specific offset and + * length. The traversal continues until it encounters a buffer that is backed by an array or a memory address, + * which allows for the inspection of individual buffer segments without the need for data duplication. + * If no such wrapped buffer is found, the callback function is invoked with the original buffer, offset, + * and length as parameters. + * + * @param buffer the buffer to visit + * @param offset the offset for the buffer + * @param length the length for the buffer + * @param callback the callback to call for each visited buffer + * @param context the context to pass to the callback + */ + public static void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback callback, + T context) { + visitBuffers(buffer, offset, length, callback, context, DEFAULT_VISIT_MAX_DEPTH); + } + + /** + * The callback interface for visiting buffers. + * In case of a heap buffer that is backed by an byte[] array, the visitArray method is called. This + * is due to the internal implementation detail of the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} + * method for heap buffers. + */ + public interface ByteBufVisitorCallback { + void visitBuffer(T context, ByteBuf visitBuffer, int visitIndex, int visitLength); + void visitArray(T context, byte[] visitArray, int visitIndex, int visitLength); + default boolean preferArrayOrMemoryAddress(T context) { + return true; + } + default boolean acceptsMemoryAddress(T context) { + return false; + } + } + + /** + * See @{@link #visitBuffers(ByteBuf, int, int, ByteBufVisitorCallback, Object)}. This method + * allows to specify the maximum depth of recursion for visiting wrapped buffers. + */ + public static void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback callback, + T context, int maxDepth) { + if (length == 0) { + // skip visiting empty buffers + return; + } + InternalContext internalContext = new InternalContext<>(); + internalContext.maxDepth = maxDepth; + internalContext.callbackContext = context; + internalContext.callback = callback; + internalContext.recursivelyVisitBuffers(buffer, offset, length); + } + + private static final int TL_COPY_BUFFER_SIZE = 64 * 1024; + private static final FastThreadLocal TL_COPY_BUFFER = new FastThreadLocal() { + @Override + protected byte[] initialValue() { + return new byte[TL_COPY_BUFFER_SIZE]; + } + }; + + private static class InternalContext { + int depth; + int maxDepth; + ByteBuf parentBuffer; + int parentOffset; + int parentLength; + T callbackContext; + ByteBufVisitorCallback callback; + GetBytesCallbackByteBuf callbackByteBuf = new GetBytesCallbackByteBuf(this); + + void recursivelyVisitBuffers(ByteBuf visitBuffer, int visitIndex, int visitLength) { + // visit the wrapped buffers recursively if the buffer is not backed by an array or memory address + // and the max depth has not been reached + if (depth < maxDepth && !visitBuffer.hasMemoryAddress() && !visitBuffer.hasArray()) { + parentBuffer = visitBuffer; + parentOffset = visitIndex; + parentLength = visitLength; + depth++; + // call getBytes to trigger the wrapped buffer visit + visitBuffer.getBytes(visitIndex, callbackByteBuf, 0, visitLength); + depth--; + } else { + passBufferToCallback(visitBuffer, visitIndex, visitLength); + } + } + + void handleBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (visitLength == 0) { + // skip visiting empty buffers + return; + } + if (visitBuffer == parentBuffer && visitIndex == parentOffset && visitLength == parentLength) { + // further recursion would cause unnecessary recursion up to the max depth of recursion + passBufferToCallback(visitBuffer, visitIndex, visitLength); + } else { + // use the doRecursivelyVisitBuffers method to visit the wrapped buffer, possibly recursively + recursivelyVisitBuffers(visitBuffer, visitIndex, visitLength); + } + } + + private void passBufferToCallback(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (callback.preferArrayOrMemoryAddress(callbackContext)) { + if (visitBuffer.hasArray()) { + handleArray(visitBuffer.array(), visitBuffer.arrayOffset() + visitIndex, visitLength); + } else if (visitBuffer.hasMemoryAddress() && callback.acceptsMemoryAddress(callbackContext)) { + callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength); + } else if (callback.acceptsMemoryAddress(callbackContext) && visitBuffer.isDirect() + && visitBuffer.alloc().isDirectBufferPooled()) { + // read-only buffers need to be copied before they can be directly accessed + ByteBuf copyBuffer = visitBuffer.copy(visitIndex, visitLength); + callback.visitBuffer(callbackContext, copyBuffer, 0, visitLength); + copyBuffer.release(); + } else { + // fallback to reading the visited buffer into the copy buffer in a loop + byte[] copyBuffer = TL_COPY_BUFFER.get(); + int remaining = visitLength; + int currentOffset = visitIndex; + while (remaining > 0) { + int readLen = Math.min(remaining, copyBuffer.length); + visitBuffer.getBytes(currentOffset, copyBuffer, 0, readLen); + handleArray(copyBuffer, 0, readLen); + remaining -= readLen; + currentOffset += readLen; + } + } + } else { + callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength); + } + } + + void handleArray(byte[] visitArray, int visitIndex, int visitLength) { + if (visitLength == 0) { + // skip visiting empty arrays + return; + } + // pass array to callback + callback.visitArray(callbackContext, visitArray, visitIndex, visitLength); + } + } + + /** + * A ByteBuf implementation that can be used as the destination buffer for + * a {@link ByteBuf#getBytes(int, ByteBuf)} for visiting the wrapped child buffers. + */ + static class GetBytesCallbackByteBuf extends ByteBuf { + private final InternalContext internalContext; + + GetBytesCallbackByteBuf(InternalContext internalContext) { + this.internalContext = internalContext; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + internalContext.handleBuffer(src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + internalContext.handleArray(src, srcIndex, length); + return this; + } + + @Override + public boolean hasArray() { + // return false so that the wrapped buffer is visited + return false; + } + + @Override + public boolean hasMemoryAddress() { + // return false so that the wrapped buffer is visited + return false; + } + + @Override + public int nioBufferCount() { + // return 0 so that the wrapped buffer is visited + return 0; + } + + @Override + public int capacity() { + // should return sufficient capacity for the total length + return Integer.MAX_VALUE; + } + + @Override + public ByteBuf capacity(int newCapacity) { + throw new UnsupportedOperationException(); + } + + @Override + public int maxCapacity() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBufAllocator alloc() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteOrder order() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf unwrap() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDirect() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadOnly() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf asReadOnly() { + throw new UnsupportedOperationException(); + } + + @Override + public int readerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readerIndex(int readerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int writerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writerIndex(int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setIndex(int readerIndex, int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int readableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public int writableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public int maxWritableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadable() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadable(int size) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWritable() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWritable(int size) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf clear() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf markReaderIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf resetReaderIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf markWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf resetWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf discardReadBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf discardSomeReadBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf ensureWritable(int minWritableBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getUnsignedByte(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShortLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedShort(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedShortLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getIntLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getUnsignedInt(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getUnsignedIntLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLongLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public char getChar(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CharSequence getCharSequence(int index, int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBoolean(int index, boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setByte(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setShort(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setShortLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setMedium(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setInt(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setIntLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setLong(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setLongLE(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setChar(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setFloat(int index, float value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setDouble(int index, double value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setZero(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int setCharSequence(int index, CharSequence sequence, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean readBoolean() { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() { + throw new UnsupportedOperationException(); + } + + @Override + public short readUnsignedByte() { + throw new UnsupportedOperationException(); + } + + @Override + public short readShort() { + throw new UnsupportedOperationException(); + } + + @Override + public short readShortLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShort() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShortLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readMedium() { + throw new UnsupportedOperationException(); + } + + @Override + public int readMediumLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedMedium() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedMediumLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readInt() { + throw new UnsupportedOperationException(); + } + + @Override + public int readIntLE() { + throw new UnsupportedOperationException(); + } + + @Override + public long readUnsignedInt() { + throw new UnsupportedOperationException(); + } + + @Override + public long readUnsignedIntLE() { + throw new UnsupportedOperationException(); + } + + @Override + public long readLong() { + throw new UnsupportedOperationException(); + } + + @Override + public long readLongLE() { + throw new UnsupportedOperationException(); + } + + @Override + public char readChar() { + throw new UnsupportedOperationException(); + } + + @Override + public float readFloat() { + throw new UnsupportedOperationException(); + } + + @Override + public double readDouble() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readSlice(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readRetainedSlice(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(byte[] dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CharSequence readCharSequence(int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int readBytes(FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf skipBytes(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBoolean(boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeByte(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeShort(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeShortLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeMedium(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeMediumLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeInt(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeIntLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeLong(long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeLongLE(long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeChar(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeFloat(float value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeDouble(double value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(InputStream in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeZero(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int writeCharSequence(CharSequence sequence, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int indexOf(int fromIndex, int toIndex, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(int length, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(int index, int length, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByte(ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByteDesc(ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf slice() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedSlice() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf slice(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf duplicate() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedDuplicate() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + throw new UnsupportedOperationException(); + } + + + @Override + public byte[] array() { + throw new UnsupportedOperationException(); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException(); + } + @Override + public long memoryAddress() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString(Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString(int index, int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(ByteBuf buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain(int increment) { + throw new UnsupportedOperationException(); + } + + @Override + public int refCnt() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf touch() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf touch(Object hint) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(int decrement) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + '@' + Integer.toHexString(System.identityHashCode(this)); + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return obj == this; + } + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java new file mode 100644 index 00000000000..6252bb71be9 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.proto.checksum; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import com.scurrilous.circe.checksum.IntHash; +import com.scurrilous.circe.checksum.Java8IntHash; +import com.scurrilous.circe.checksum.Java9IntHash; +import com.scurrilous.circe.checksum.JniIntHash; +import com.scurrilous.circe.crc.Sse42Crc32C; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import org.apache.bookkeeper.proto.BookieProtoEncoding; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.ByteBufVisitor; +import org.apache.commons.lang3.RandomUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * This test class was added to reproduce a bug in the checksum calculation when + * the payload is a CompositeByteBuf and this buffer has a reader index state other than 0. + * The reader index state gets lost in the unwrapping process. + * + * There were at least 2 different bugs. One that occured when the + * payload was >= BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD and the other when + * it was < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD. + * This test covers both useV2Protocol=true and useV2Protocol=false since the bug was triggered differently. + * + * The bug has been fixed and this test is here to make sure it doesn't happen again. + */ +@RunWith(Parameterized.class) +public class CompositeByteBufUnwrapBugReproduceTest { + final byte[] testPayLoad; + final int defaultBufferPrefixLength; + private final boolean useV2Protocol; + + // set to 0 to 3 to run a single scenario for debugging purposes + private static final int RUN_SINGLE_SCENARIO_FOR_DEBUGGING = -1; + + @Parameterized.Parameters + public static Collection testScenarios() { + List scenarios = Arrays.asList(new Object[][] { + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, true}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, false}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, true}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, false} + }); + if (RUN_SINGLE_SCENARIO_FOR_DEBUGGING >= 0) { + // pick a single scenario for debugging + scenarios = scenarios.subList(RUN_SINGLE_SCENARIO_FOR_DEBUGGING, 1); + } + return scenarios; + } + + public CompositeByteBufUnwrapBugReproduceTest(int payloadSize, boolean useV2Protocol) { + this.testPayLoad = createTestPayLoad(payloadSize); + this.defaultBufferPrefixLength = payloadSize / 7; + this.useV2Protocol = useV2Protocol; + } + + private static byte[] createTestPayLoad(int payloadSize) { + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payloadSize; i++) { + payload[i] = (byte) i; + } + return payload; + } + + + /** + * A DigestManager that uses the given IntHash implementation for testing. + */ + static class TestIntHashDigestManager extends DigestManager { + private final IntHash intHash; + + public TestIntHashDigestManager(IntHash intHash, long ledgerId, boolean useV2Protocol, + ByteBufAllocator allocator) { + super(ledgerId, useV2Protocol, allocator); + this.intHash = intHash; + } + + @Override + int getMacCodeLength() { + return 4; + } + + @Override + boolean isInt32Digest() { + return true; + } + + @Override + void populateValueAndReset(int digest, ByteBuf buf) { + buf.writeInt(digest); + } + + @Override + int internalUpdate(int digest, ByteBuf data, int offset, int len) { + return intHash.resume(digest, data, offset, len); + } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + return intHash.resume(digest, buffer, offset, len); + } + + @Override + boolean acceptsMemoryAddressBuffer() { + return intHash.acceptsMemoryAddressBuffer(); + } + } + + @Test + public void shouldCalculateChecksumForCompositeBuffer() { + ByteBuf testPayload = Unpooled.wrappedBuffer(testPayLoad); + byte[] referenceOutput = computeDigestAndPackageForSending(new Java8IntHash(), testPayload.retainedDuplicate()); + assertDigestAndPackageMatchesReference(new Java8IntHash(), testPayload, referenceOutput); + assertDigestAndPackageMatchesReference(new Java9IntHash(), testPayload, referenceOutput); + if (Sse42Crc32C.isSupported()) { + assertDigestAndPackageMatchesReference(new JniIntHash(), testPayload, referenceOutput); + } + testPayload.release(); + } + + private void assertDigestAndPackageMatchesReference(IntHash intHash, ByteBuf payload, byte[] referenceOutput) { + assertDigestAndPackageScenario(intHash, payload.retainedDuplicate(), referenceOutput, testPayLoad, + "plain payload, no wrapping"); + + ByteBuf payload2 = wrapWithPrefixAndCompositeByteBufWithReaderIndexState(payload.retainedDuplicate(), + defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload2, referenceOutput, testPayLoad, + "payload with prefix wrapped in CompositeByteBuf with readerIndex state"); + + ByteBuf payload3 = wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate( + payload.retainedDuplicate(), defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload3, referenceOutput, testPayLoad, + "payload with prefix wrapped in 2 layers of CompositeByteBuf with readerIndex state in the outer " + + "composite. In addition, the outer composite is duplicated twice."); + + ByteBuf payload4 = wrapInCompositeByteBufAndSlice(payload.retainedDuplicate(), defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload4, referenceOutput, testPayLoad, + "payload with prefix wrapped in CompositeByteBuf and sliced"); + } + + private void assertDigestAndPackageScenario(IntHash intHash, ByteBuf payload, byte[] referenceOutput, + byte[] testPayLoadArray, + String scenario) { + // this validates that the readable bytes in the payload match the TEST_PAYLOAD content + assertArrayEquals(testPayLoadArray, ByteBufUtil.getBytes(payload.duplicate()), + "input is invalid for scenario '" + scenario + "'"); + + ByteBuf visitedCopy = Unpooled.buffer(payload.readableBytes()); + ByteBufVisitor.visitBuffers(payload, payload.readerIndex(), payload.readableBytes(), + new ByteBufVisitor.ByteBufVisitorCallback() { + @Override + public void visitBuffer(Void context, ByteBuf visitBuffer, int visitIndex, int visitLength) { + visitedCopy.writeBytes(visitBuffer, visitIndex, visitLength); + } + + @Override + public void visitArray(Void context, byte[] visitArray, int visitIndex, int visitLength) { + visitedCopy.writeBytes(visitArray, visitIndex, visitLength); + } + }, null); + + assertArrayEquals(ByteBufUtil.getBytes(visitedCopy), testPayLoadArray, + "visited copy is invalid for scenario '" + scenario + "'. Bug in ByteBufVisitor?"); + + // compute the digest and package + byte[] output = computeDigestAndPackageForSending(intHash, payload.duplicate()); + if (referenceOutput == null) { + referenceOutput = + computeDigestAndPackageForSending(new Java8IntHash(), Unpooled.wrappedBuffer(testPayLoadArray)); + } + // this validates that the output matches the reference output + assertArrayEquals(referenceOutput, output, "output is invalid for scenario '" + scenario + "'"); + } + + private byte[] computeDigestAndPackageForSending(IntHash intHash, ByteBuf data) { + DigestManager digestManager = new TestIntHashDigestManager(intHash, 1, useV2Protocol, ByteBufAllocator.DEFAULT); + ReferenceCounted packagedBuffer = + digestManager.computeDigestAndPackageForSending(1, 0, data.readableBytes(), data, + MacDigestManager.EMPTY_LEDGER_KEY, BookieProtocol.FLAG_NONE); + return packagedBufferToBytes(packagedBuffer); + } + + ByteBuf wrapWithPrefixAndCompositeByteBufWithReaderIndexState(ByteBuf payload, int bufferPrefixLength) { + // create a new buffer with a prefix and the actual payload + ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes()); + prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength)); + prefixedPayload.writeBytes(payload); + + // wrap the buffer in a composite buffer + CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + outerComposite.addComponent(true, prefixedPayload); + + // set reader index state. this is the state that gets lost in the unwrapping process + outerComposite.readerIndex(bufferPrefixLength); + + return outerComposite; + } + + ByteBuf wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate(ByteBuf payload, + int bufferPrefixLength) { + // create a new buffer with a prefix and the actual payload + ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes()); + prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength)); + prefixedPayload.writeBytes(payload); + + CompositeByteBuf innerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + innerComposite.addComponent(true, prefixedPayload); + innerComposite.addComponent(true, Unpooled.EMPTY_BUFFER); + + // wrap the buffer in a composite buffer + CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + outerComposite.addComponent(true, innerComposite); + outerComposite.addComponent(true, Unpooled.EMPTY_BUFFER); + + // set reader index state. this is the state that gets lost in the unwrapping process + outerComposite.readerIndex(bufferPrefixLength); + + return outerComposite.duplicate().duplicate(); + } + + ByteBuf wrapInCompositeByteBufAndSlice(ByteBuf payload, int bufferPrefixLength) { + // create a composite buffer + CompositeByteBuf compositeWithPrefix = ByteBufAllocator.DEFAULT.compositeBuffer(); + compositeWithPrefix.addComponent(true, Unpooled.wrappedBuffer(RandomUtils.nextBytes(bufferPrefixLength))); + compositeWithPrefix.addComponent(true, payload); + + // return a slice of the composite buffer so that it returns the payload + return compositeWithPrefix.slice(bufferPrefixLength, payload.readableBytes()); + } + + private static byte[] packagedBufferToBytes(ReferenceCounted packagedBuffer) { + byte[] output; + if (packagedBuffer instanceof ByteBufList) { + ByteBufList bufList = (ByteBufList) packagedBuffer; + output = new byte[bufList.readableBytes()]; + bufList.getBytes(output); + for (int i = 0; i < bufList.size(); i++) { + bufList.getBuffer(i).release(); + } + } else if (packagedBuffer instanceof ByteBuf) { + output = ByteBufUtil.getBytes((ByteBuf) packagedBuffer); + packagedBuffer.release(); + } else { + throw new RuntimeException("Unexpected type: " + packagedBuffer.getClass()); + } + return output; + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java index ac7aca77226..88de17d0a9d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -87,39 +86,6 @@ public void testDouble() throws Exception { assertEquals(b2.refCnt(), 0); } - @Test - public void testComposite() throws Exception { - ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); - b1.writerIndex(b1.capacity()); - ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); - b2.writerIndex(b2.capacity()); - - CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer(); - composite.addComponent(b1); - composite.addComponent(b2); - - ByteBufList buf = ByteBufList.get(composite); - - // composite is unwrapped into two parts - assertEquals(2, buf.size()); - // and released - assertEquals(composite.refCnt(), 0); - - assertEquals(256, buf.readableBytes()); - assertEquals(b1, buf.getBuffer(0)); - assertEquals(b2, buf.getBuffer(1)); - - assertEquals(buf.refCnt(), 1); - assertEquals(b1.refCnt(), 1); - assertEquals(b2.refCnt(), 1); - - buf.release(); - - assertEquals(buf.refCnt(), 0); - assertEquals(b1.refCnt(), 0); - assertEquals(b2.refCnt(), 0); - } - @Test public void testClone() throws Exception { ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java index 65a77b1492b..d90f8b7ea5d 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java @@ -71,12 +71,30 @@ public static int resumeChecksum(int previousChecksum, ByteBuf payload) { /** * Computes incremental checksum with input previousChecksum and input payload * - * @param previousChecksum : previously computed checksum - * @param payload - * @return + * @param previousChecksum the previously computed checksum + * @param payload the data for which the checksum is to be computed + * @param offset the starting position in the payload + * @param len the number of bytes to include in the checksum computation + * @return the updated checksum */ public static int resumeChecksum(int previousChecksum, ByteBuf payload, int offset, int len) { return CRC32C_HASH.resume(previousChecksum, payload, offset, len); } + /** + * Computes incremental checksum with input previousChecksum and input payload + * + * @param previousChecksum the previously computed checksum + * @param payload the data for which the checksum is to be computed + * @param offset the starting position in the payload + * @param len the number of bytes to include in the checksum computation + * @return the updated checksum + */ + public static int resumeChecksum(int previousChecksum, byte[] payload, int offset, int len) { + return CRC32C_HASH.resume(previousChecksum, payload, offset, len); + } + + public static boolean acceptsMemoryAddressBuffer() { + return CRC32C_HASH.acceptsMemoryAddressBuffer(); + } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java index e8922e3a16b..be98ae19be1 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java @@ -28,4 +28,8 @@ public interface IntHash { int resume(int current, ByteBuf buffer); int resume(int current, ByteBuf buffer, int offset, int len); + + int resume(int current, byte[] buffer, int offset, int len); + + boolean acceptsMemoryAddressBuffer(); } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java index fd548bc4de4..2825c610b11 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java @@ -53,4 +53,14 @@ public int resume(int current, ByteBuf buffer, int offset, int len) { return hash.resume(current, buffer.slice(offset, len).nioBuffer()); } } + + @Override + public int resume(int current, byte[] buffer, int offset, int len) { + return hash.resume(current, buffer, offset, len); + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java index 31af153666e..2e779a92766 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java @@ -19,7 +19,6 @@ package com.scurrilous.circe.checksum; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.util.concurrent.FastThreadLocal; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -77,7 +76,7 @@ public int calculate(ByteBuf buffer, int offset, int len) { return resume(0, buffer, offset, len); } - private int resume(int current, long address, int offset, int length) { + private int updateDirectByteBuffer(int current, long address, int offset, int length) { try { return (int) UPDATE_DIRECT_BYTEBUFFER.invoke(null, current, address, offset, offset + length); } catch (IllegalAccessException | InvocationTargetException e) { @@ -85,7 +84,20 @@ private int resume(int current, long address, int offset, int length) { } } - private int resume(int current, byte[] array, int offset, int length) { + @Override + public int resume(int current, byte[] array, int offset, int length) { + // the bit-wise complementing of the input and output is explained in the resume method below + current = ~current; + current = updateBytes(current, array, offset, length); + return ~current; + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return true; + } + + private static int updateBytes(int current, byte[] array, int offset, int length) { try { return (int) UPDATE_BYTES.invoke(null, current, array, offset, offset + length); } catch (IllegalAccessException | InvocationTargetException e) { @@ -100,33 +112,37 @@ public int resume(int current, ByteBuf buffer) { @Override public int resume(int current, ByteBuf buffer, int offset, int len) { - int negCrc = ~current; + // The input value is bit-wise complemented for two reasons: + // 1. The CRC32C algorithm is designed to start with a seed value where all bits are set to 1 (0xffffffff). + // When 0 is initially passed in, ~0 results in the correct initial value (0xffffffff). + // 2. The CRC32C algorithm complements the final value as the last step. This method will always complement + // the return value. Therefore, when the algorithm is used iteratively, it is necessary to complement + // the input value to continue calculations. + // This allows the algorithm to be used incrementally without needing separate initialization and + // finalization steps. + current = ~current; if (buffer.hasMemoryAddress()) { - negCrc = resume(negCrc, buffer.memoryAddress(), offset, len); + current = updateDirectByteBuffer(current, buffer.memoryAddress(), offset, len); } else if (buffer.hasArray()) { int arrayOffset = buffer.arrayOffset() + offset; - negCrc = resume(negCrc, buffer.array(), arrayOffset, len); - } else if (buffer instanceof CompositeByteBuf) { - CompositeByteBuf compositeByteBuf = (CompositeByteBuf) buffer; - int loopedCurrent = current; - for (int i = 0; i < compositeByteBuf.numComponents(); i ++) { - loopedCurrent = resume(loopedCurrent, compositeByteBuf.component(i)); - } - return loopedCurrent; + current = updateBytes(current, buffer.array(), arrayOffset, len); } else { byte[] b = TL_BUFFER.get(); int toRead = len; int loopOffset = offset; while (toRead > 0) { int length = Math.min(toRead, b.length); - buffer.slice(loopOffset, length).readBytes(b, 0, length); - negCrc = resume(negCrc, b, 0, length); + buffer.getBytes(loopOffset, b, 0, length); + current = updateBytes(current, b, 0, length); toRead -= length; loopOffset += length; } } - return ~negCrc; + // The current value is complemented to align with the finalization step of the CRC32C algorithm. + // If there is a subsequent resume step, the value will be complemented again to initiate the next step + // as described in the comments in the beginning of this method. + return ~current; } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java index e8e87bf6b1a..dc5bed0fc1c 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java @@ -51,4 +51,14 @@ public int resume(int current, ByteBuf buffer, int offset, int len) { return hash.resume(current, buffer.slice(offset, len).nioBuffer()); } } + + @Override + public int resume(int current, byte[] buffer, int offset, int len) { + return hash.resume(current, buffer, offset, len); + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return true; + } } From 27377dcb53cb9f224a729ecee8b552afd3bca73b Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 7 Feb 2024 14:02:45 -0800 Subject: [PATCH 4/4] Bump jsoup from 1.14.3 to 1.15.3 (#3465) Bumps [jsoup](https://github.com/jhy/jsoup) from 1.14.3 to 1.15.3. - [Release notes](https://github.com/jhy/jsoup/releases) - [Changelog](https://github.com/jhy/jsoup/blob/master/CHANGES) - [Commits](https://github.com/jhy/jsoup/compare/jsoup-1.14.3...jsoup-1.15.3) --- updated-dependencies: - dependency-name: org.jsoup:jsoup dependency-type: direct:production ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 038c13c4105..a7394b49078 100644 --- a/pom.xml +++ b/pom.xml @@ -143,7 +143,7 @@ 9.4.53.v20231009 1.19 2.8.2 - 1.14.3 + 1.15.3 4.12 5.8.2