diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java new file mode 100644 index 00000000000..4892882e1d1 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -0,0 +1,321 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; +import org.apache.bookkeeper.client.impl.LedgerEntryImpl; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.BatchedReadEntryCallback; +import org.apache.bookkeeper.proto.checksum.DigestManager; +import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.MathUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BatchedReadOp extends ReadOpBase implements BatchedReadEntryCallback { + + private static final Logger LOG = LoggerFactory.getLogger(BatchedReadOp.class); + + final int maxCount; + final long maxSize; + + BatchedLedgerEntryRequest request; + + BatchedReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + int maxCount, + long maxSize, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, -1L, isRecoveryRead); + this.maxCount = maxCount; + this.maxSize = maxSize; + } + + @Override + void initiate() { + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = getLedgerMetadata().getEnsembleAt(startEntryId); + request = new SequenceReadRequest(ensemble, lh.ledgerId, startEntryId, maxCount, maxSize); + request.read(); + if (clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), request); + } + } + + @Override + protected void submitCallback(int code) { + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } + + cancelSpeculativeTask(true); + + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), startEntryId, code); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + + request.close(); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(request.entries)); + } + } + + @Override + public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBufList bufList, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final BatchedLedgerEntryRequest entry = (BatchedLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + bufList.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + bufList.release(); + } + } + + void sendReadTo(int bookieIndex, BookieId to, BatchedLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, + maxCount, maxSize, this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().batchReadEntries(to, lh.ledgerId, entry.eId, maxCount, maxSize, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } + + abstract class BatchedLedgerEntryRequest extends LedgerEntryRequest { + + //Indicate which ledger the BatchedLedgerEntryRequest is reading. + final long lId; + final int maxCount; + final long maxSize; + + final List entries; + + BatchedLedgerEntryRequest(List ensemble, long lId, long eId, int maxCount, long maxSize) { + super(ensemble, eId); + this.lId = lId; + this.maxCount = maxCount; + this.maxSize = maxSize; + this.entries = new ArrayList<>(maxCount); + } + + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + if (isComplete()) { + return false; + } + if (!complete.getAndSet(true)) { + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + ByteBuf content; + try { + content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer); + } catch (BKException.BKDigestMatchException e) { + clientCtx.getClientStats().getReadOpDmCounter().inc(); + logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", + BKException.Code.DigestMatchException); + return false; + } + rc = BKException.Code.OK; + /* + * The length is a long and it is the last field of the metadata of an entry. + * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. 
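+ * (The entry metadata header packs ledgerId, entryId, lastAddConfirmed and length as four + * longs, so the trailing length field starts at offset METADATA_LENGTH - 8.)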
+ */ + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); + entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8)); + entryImpl.setEntryBuf(content); + entries.add(entryImpl); + } + writeSet.recycle(); + return true; + } else { + writeSet.recycle(); + return false; + } + } + + @Override + public String toString() { + return String.format("L%d-E%d~%d s-%d", lh.getId(), eId, eId + maxCount, maxSize); + } + } + + class SequenceReadRequest extends BatchedLedgerEntryRequest { + + static final int NOT_FOUND = -1; + int nextReplicaIndexToReadFrom = 0; + final BitSet sentReplicas; + final BitSet erroredReplicas; + SequenceReadRequest(List ensemble, + long lId, + long eId, + int maxCount, + long maxSize) { + super(ensemble, lId, eId, maxCount, maxSize); + this.sentReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + this.erroredReplicas = new BitSet(lh.getLedgerMetadata().getWriteQuorumSize()); + } + + @Override + void read() { + sendNextRead(); + } + + private synchronized int getNextReplicaIndexToReadFrom() { + return nextReplicaIndexToReadFrom; + } + + private BitSet getSentToBitSet() { + BitSet b = new BitSet(ensemble.size()); + + for (int i = 0; i < sentReplicas.length(); i++) { + if (sentReplicas.get(i)) { + b.set(writeSet.get(i)); + } + } + return b; + } + + private boolean readsOutstanding() { + return (sentReplicas.cardinality() - erroredReplicas.cardinality()) > 0; + } + + @Override + synchronized BookieId maybeSendSpeculativeRead(BitSet heardFrom) { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + return null; + } + + BitSet sentTo = getSentToBitSet(); + sentTo.and(heardFrom); + + // only send another read if we have had no successful response at all + // (even for other entries) from any of the other bookies we have sent the + // request to + if (sentTo.cardinality() == 0) { + clientCtx.getClientStats().getSpeculativeReadCounter().inc(); + return sendNextRead(); + } else { + return null; + } + } + + synchronized BookieId sendNextRead() { + if (nextReplicaIndexToReadFrom >= getLedgerMetadata().getWriteQuorumSize()) { + // we are done, the read has failed from all replicas, just fail the + // read + fail(firstError); + return null; + } + + // ToDo: pick replica with writable PCBC. 
ISSUE #1239 + // https://github.com/apache/bookkeeper/issues/1239 + int replica = nextReplicaIndexToReadFrom; + int bookieIndex = writeSet.get(nextReplicaIndexToReadFrom); + nextReplicaIndexToReadFrom++; + + try { + BookieId to = ensemble.get(bookieIndex); + sendReadTo(bookieIndex, to, this); + sentToHosts.add(to); + sentReplicas.set(replica); + return to; + } catch (InterruptedException ie) { + LOG.error("Interrupted reading entry " + this, ie); + Thread.currentThread().interrupt(); + fail(BKException.Code.InterruptedException); + return null; + } + } + + @Override + synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + super.logErrorAndReattemptRead(bookieIndex, host, errMsg, rc); + int replica = writeSet.indexOf(bookieIndex); + if (replica == NOT_FOUND) { + LOG.error("Received error from a host which is not in the ensemble {} {}.", host, ensemble); + return; + } + erroredReplicas.set(replica); + if (isRecoveryRead && (numBookiesMissingEntry >= requiredBookiesMissingEntryForRecovery)) { + /* For recovery, report NoSuchEntry as soon as wQ-aQ+1 bookies report that they do not + * have the entry */ + fail(BKException.Code.NoSuchEntryException); + return; + } + + if (!readsOutstanding()) { + sendNextRead(); + } + } + + @Override + boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { + boolean completed = super.complete(bookieIndex, host, bufList); + if (completed) { + int numReplicasTried = getNextReplicaIndexToReadFrom(); + // Check if any speculative reads were issued and mark any slow bookies before + // the first successful speculative read as "slow" + for (int i = 0; i < numReplicasTried - 1; i++) { + int slowBookieIndex = writeSet.get(i); + BookieId slowBookieSocketAddress = ensemble.get(slowBookieIndex); + clientCtx.getPlacementPolicy().registerSlowBookie(slowBookieSocketAddress, eId); + } + } + return completed; + } + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java index cff66a3cb3e..fc83617cef6 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ClientInternalConf.java @@ -48,6 +48,8 @@ class ClientInternalConf { final boolean enableBookieFailureTracking; final boolean useV2WireProtocol; final boolean enforceMinNumFaultDomainsForWrite; + final boolean batchReadEnabled; + final int nettyMaxFrameSizeBytes; static ClientInternalConf defaultValues() { return fromConfig(new ClientConfiguration()); @@ -72,9 +74,9 @@ private ClientInternalConf(ClientConfiguration conf, this.addEntryQuorumTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryQuorumTimeout()); this.throttleValue = conf.getThrottleValue(); this.bookieFailureHistoryExpirationMSec = conf.getBookieFailureHistoryExpirationMSec(); - + this.batchReadEnabled = conf.isBatchReadEnabled(); + this.nettyMaxFrameSizeBytes = conf.getNettyMaxFrameSizeBytes(); this.disableEnsembleChangeFeature = featureProvider.getFeature(conf.getDisableEnsembleChangeFeatureName()); - this.delayEnsembleChange = conf.getDelayEnsembleChange(); this.maxAllowedEnsembleChanges = conf.getMaxAllowedEnsembleChanges(); this.timeoutMonitorIntervalSec = conf.getTimeoutMonitorIntervalSec(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index 9486b2e632c..6a98af55032 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -103,6 +103,7 @@ public class LedgerHandle implements WriteHandle { final long ledgerId; final ExecutorService executor; long lastAddPushed; + boolean notSupportBatch; private enum HandleState { OPEN, @@ -641,6 +642,26 @@ public Enumeration readEntries(long firstEntry, long lastEntry) return SyncCallbackUtils.waitForResult(result); } + /** + * Read a sequence of entries synchronously. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @see #asyncBatchReadEntries(long, int, long, ReadCallback, Object) + */ + public Enumeration<LedgerEntry> batchReadEntries(long startEntry, int maxCount, long maxSize) + throws InterruptedException, BKException { + CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>(); + + asyncBatchReadEntries(startEntry, maxCount, maxSize, new SyncReadCallback(result), null); + + return SyncCallbackUtils.waitForResult(result); + } + /** * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.
* This is the same of @@ -664,6 +685,27 @@ public Enumeration readUnconfirmedEntries(long firstEntry, long las return SyncCallbackUtils.waitForResult(result); } + /** + * Read a sequence of entries synchronously, allowing to read after the LastAddConfirmed range.
+ * This is the same as + * {@link #asyncBatchReadUnconfirmedEntries(long, int, long, ReadCallback, Object)} + * + * @param firstEntry + * id of first entry of sequence (included) + * @param maxCount + * the total entries count + * @param maxSize + * the total entries size + */ + public Enumeration<LedgerEntry> batchReadUnconfirmedEntries(long firstEntry, int maxCount, long maxSize) + throws InterruptedException, BKException { + CompletableFuture<Enumeration<LedgerEntry>> result = new CompletableFuture<>(); + + asyncBatchReadUnconfirmedEntries(firstEntry, maxCount, maxSize, new SyncReadCallback(result), null); + + return SyncCallbackUtils.waitForResult(result); + } + /** * Read a sequence of entries asynchronously. * @@ -695,6 +737,50 @@ public void asyncReadEntries(long firstEntry, long lastEntry, ReadCallback cb, O asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } + /** + * Read a sequence of entries asynchronously. + * It sends a single RPC to fetch all entries instead of one RPC per entry. + * + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + * @param cb + * object implementing read callback interface + * @param ctx + * control object + */ + public void asyncBatchReadEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, Object ctx) { + // Little sanity check + if (startEntry > lastAddConfirmed) { + LOG.error("ReadEntries exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}", + ledgerId, startEntry, lastAddConfirmed); + cb.readComplete(BKException.Code.ReadException, this, null, ctx); + return; + } + if (notSupportBatchRead()) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { + // If the bookie server does not support the batch read request, it will close the + // connection, and the client gets a BookieHandleNotAvailableException. + if (rc == Code.BookieHandleNotAvailableException) { + notSupportBatch = true; + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } + }, ctx, false); + } + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of @@ -734,6 +820,48 @@ public void asyncReadUnconfirmedEntries(long firstEntry, long lastEntry, ReadCal asyncReadEntriesInternal(firstEntry, lastEntry, cb, ctx, false); } + /** + * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. + * It sends a single RPC to fetch all entries instead of one RPC per entry. + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + * @param cb + * object implementing read callback interface + * @param ctx + * control object + */ + public void asyncBatchReadUnconfirmedEntries(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx) { + // Little sanity check + if (startEntry < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); + cb.readComplete(BKException.Code.IncorrectParameterException, this, null, ctx); + return; + } + if (notSupportBatchRead()) { + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + asyncBatchReadEntriesInternal(startEntry, maxCount, maxSize, new ReadCallback() { + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) { + // If the bookie server does not support the batch read request, it will close the + // connection, and the client gets a BookieHandleNotAvailableException. + if (rc == Code.BookieHandleNotAvailableException) { + notSupportBatch = true; + long lastEntry = startEntry + maxCount - 1; + asyncReadEntriesInternal(startEntry, lastEntry, cb, ctx, false); + } else { + cb.readComplete(rc, lh, seq, ctx); + } + } + }, ctx, false); + } + } + /** * Read a sequence of entries asynchronously. * @@ -760,6 +888,123 @@ public CompletableFuture readAsync(long firstEntry, long lastEntr return readEntriesInternalAsync(firstEntry, lastEntry, false); } + /** + * Read a sequence of entries asynchronously. + * It sends a single RPC to fetch all entries instead of one RPC per entry. + * + * @param startEntry + * id of first entry of sequence + * @param maxCount + * the entries count + * @param maxSize + * the total entries size + */ + @Override + public CompletableFuture<LedgerEntries> batchReadAsync(long startEntry, int maxCount, long maxSize) { + // Little sanity check + if (startEntry < 0) { + LOG.error("IncorrectParameterException on ledgerId:{} firstEntry:{}", ledgerId, startEntry); + return FutureUtils.exception(new BKIncorrectParameterException()); + } + if (startEntry > lastAddConfirmed) { + LOG.error("ReadAsync exception on ledgerId:{} firstEntry:{} lastAddConfirmed:{}", + ledgerId, startEntry, lastAddConfirmed); + return FutureUtils.exception(new BKReadException()); + } + if (notSupportBatchRead()) { + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + return readEntriesInternalAsync(startEntry, lastEntry, false); + } + CompletableFuture<LedgerEntries> future = new CompletableFuture<>(); + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, false) + .whenComplete((entries, ex) -> { + if (ex != null) { + // If the bookie server does not support the batch read request, it will close the + // connection, and the client gets a BookieHandleNotAvailableException.
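+ // The notSupportBatch flag is sticky: once a bookie rejects a batch read, this handle + // routes every subsequent read through the per-entry path (see notSupportBatchRead()).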
+ if (ex instanceof BKException.BKBookieHandleNotAvailableException) { + notSupportBatch = true; + long lastEntry = Math.min(startEntry + maxCount - 1, lastAddConfirmed); + readEntriesInternalAsync(startEntry, lastEntry, false).whenComplete((entries1, ex1) -> { + if (ex1 != null) { + future.completeExceptionally(ex1); + } else { + future.complete(entries1); + } + }); + } else { + future.completeExceptionally(ex); + } + } else { + future.complete(entries); + } + }); + return future; + } + + private boolean notSupportBatchRead() { + if (!clientCtx.getConf().batchReadEnabled) { + return true; + } + if (notSupportBatch) { + return true; + } + LedgerMetadata ledgerMetadata = getLedgerMetadata(); + return ledgerMetadata.getEnsembleSize() != ledgerMetadata.getWriteQuorumSize(); + } + + private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize, + boolean isRecoveryRead) { + int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes; + if (maxSize > nettyMaxFrameSizeBytes) { + LOG.info( + "The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.", + nettyMaxFrameSizeBytes); + maxSize = nettyMaxFrameSizeBytes; + } + if (maxSize <= 0) { + LOG.info("The max size is non-positive, use nettyMaxFrameSizeBytes:{} to replace it.", + nettyMaxFrameSizeBytes); + maxSize = nettyMaxFrameSizeBytes; + } + BatchedReadOp op = new BatchedReadOp(this, clientCtx, + startEntry, maxCount, maxSize, isRecoveryRead); + if (!clientCtx.isClientClosed()) { + // Waiting on the first one. + // This is not very helpful if there are multiple ensembles or if a bookie goes into an unresponsive + // state after N requests have been sent. + // Unfortunately it seems that the alternatives are: + // - send reads one-by-one (up to the app) + // - rework LedgerHandle to send requests one-by-one (maybe later, potential perf impact) + // - block worker pool (not good) + // Even with this implementation one should be more concerned about OOME when all read responses arrive, + // or about overloading bookies with these requests, than about the submission of many small requests. + // Naturally one of the solutions would be to submit smaller batches, and in this case the + // current implementation will prevent the next batch from starting while a bookie is + // unresponsive, which is helpful enough. + if (clientCtx.getConf().waitForWriteSetMs >= 0) { + DistributionSchedule.WriteSet ws = distributionSchedule.getWriteSet(startEntry); + try { + if (!waitForWritable(ws, ws.size() - 1, clientCtx.getConf().waitForWriteSetMs)) { + op.allowFailFastOnUnwritableChannel(); + } + } finally { + ws.recycle(); + } + } + + if (isHandleWritable()) { + // Ledger handle in read/write mode: submit to OSE for ordered execution. + executeOrdered(op); + } else { + // Read-only ledger handle: bypass OSE and execute read directly in client thread. + // This avoids a context-switch to OSE thread and thus reduces latency. + op.run(); + } + } else { + op.future().completeExceptionally(BKException.create(ClientClosedException)); + } + return op.future(); + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of @@ -829,6 +1074,40 @@ public void onFailure(Throwable cause) { } } + void asyncBatchReadEntriesInternal(long startEntry, int maxCount, long maxSize, ReadCallback cb, + Object ctx, boolean isRecoveryRead) { + if (!clientCtx.isClientClosed()) { + batchReadEntriesInternalAsync(startEntry, maxCount, maxSize, isRecoveryRead) + .whenCompleteAsync(new FutureEventListener() { + @Override + public void onSuccess(LedgerEntries entries) { + cb.readComplete( + Code.OK, + LedgerHandle.this, + IteratorUtils.asEnumeration( + Iterators.transform(entries.iterator(), le -> { + LedgerEntry entry = new LedgerEntry((LedgerEntryImpl) le); + le.close(); + return entry; + })), + ctx); + } + + @Override + public void onFailure(Throwable cause) { + if (cause instanceof BKException) { + BKException bke = (BKException) cause; + cb.readComplete(bke.getCode(), LedgerHandle.this, null, ctx); + } else { + cb.readComplete(Code.UnexpectedConditionException, LedgerHandle.this, null, ctx); + } + } + }, clientCtx.getMainWorkerPool().chooseThread(ledgerId)); + } else { + cb.readComplete(Code.ClientClosedException, LedgerHandle.this, null, ctx); + } + } + /* * Read the last entry in the ledger * diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java index 6733b2e9ea9..fedb79696a9 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ListenerBasedPendingReadOp.java @@ -45,7 +45,7 @@ class ListenerBasedPendingReadOp extends PendingReadOp { @Override protected void submitCallback(int code) { - LedgerEntryRequest request; + SingleLedgerEntryRequest request; while (!seq.isEmpty() && (request = seq.getFirst()) != null) { if (!request.isComplete()) { return; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java index 73715859c0d..15d48c64351 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingReadOp.java @@ -21,27 +21,16 @@ package org.apache.bookkeeper.client; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ListenableFuture; import io.netty.buffer.ByteBuf; import java.util.BitSet; -import java.util.HashSet; import java.util.LinkedList; import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.bookkeeper.client.BKException.BKDigestMatchException; -import org.apache.bookkeeper.client.api.LedgerEntries; -import org.apache.bookkeeper.client.api.LedgerMetadata; import org.apache.bookkeeper.client.impl.LedgerEntriesImpl; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallbackCtx; import org.apache.bookkeeper.proto.checksum.DigestManager; import org.apache.bookkeeper.util.MathUtils; import org.slf4j.Logger; @@ -54,85 
+43,165 @@ * application as soon as it arrives rather than waiting for the whole thing. * */ -class PendingReadOp implements ReadEntryCallback, Runnable { +class PendingReadOp extends ReadOpBase implements ReadEntryCallback { private static final Logger LOG = LoggerFactory.getLogger(PendingReadOp.class); - private ScheduledFuture speculativeTask = null; - protected final LinkedList seq; - private final CompletableFuture future; - private final Set heardFromHosts; - private final BitSet heardFromHostsBitSet; - private final Set sentToHosts = new HashSet(); - LedgerHandle lh; - final ClientContext clientCtx; + protected boolean parallelRead = false; + protected final LinkedList seq; - long numPendingEntries; - final long startEntryId; - final long endEntryId; - long requestTimeNanos; + PendingReadOp(LedgerHandle lh, + ClientContext clientCtx, + long startEntryId, + long endEntryId, + boolean isRecoveryRead) { + super(lh, clientCtx, startEntryId, endEntryId, isRecoveryRead); + this.seq = new LinkedList<>(); + numPendingEntries = endEntryId - startEntryId + 1; + } + + PendingReadOp parallelRead(boolean enabled) { + this.parallelRead = enabled; + return this; + } + + void initiate() { + long nextEnsembleChange = startEntryId, i = startEntryId; + this.requestTimeNanos = MathUtils.nowInNano(); + List ensemble = null; + do { + if (i == nextEnsembleChange) { + ensemble = getLedgerMetadata().getEnsembleAt(i); + nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); + } + SingleLedgerEntryRequest entry; + if (parallelRead) { + entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); + } else { + entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); + } + seq.add(entry); + i++; + } while (i <= endEntryId); + // read the entries. 
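+ // In sequence mode a speculative-read task is also registered per entry; parallel mode + // already fans out to every replica up front, so no speculative task is needed there.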
+ for (LedgerEntryRequest entry : seq) { + entry.read(); + if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { + speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() + .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); + } + } + } - final int requiredBookiesMissingEntryForRecovery; - final boolean isRecoveryRead; + @Override + public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { + final ReadContext rctx = (ReadContext) ctx; + final SingleLedgerEntryRequest entry = (SingleLedgerEntryRequest) rctx.entry; + + if (rc != BKException.Code.OK) { + entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); + return; + } + + heardFromHosts.add(rctx.to); + heardFromHostsBitSet.set(rctx.bookieIndex, true); + + buffer.retain(); + // if entry has completed don't handle twice + if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { + if (!isRecoveryRead) { + // do not advance LastAddConfirmed for recovery reads + lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); + } + submitCallback(BKException.Code.OK); + } else { + buffer.release(); + } + + if (numPendingEntries < 0) { + LOG.error("Read too many values for ledger {} : [{}, {}].", + ledgerId, startEntryId, endEntryId); + } + + } + + protected void submitCallback(int code) { + if (BKException.Code.OK == code) { + numPendingEntries--; + if (numPendingEntries != 0) { + return; + } + } + + // ensure callback once + if (!complete.compareAndSet(false, true)) { + return; + } - boolean parallelRead = false; - final AtomicBoolean complete = new AtomicBoolean(false); - boolean allowFailFast = false; + cancelSpeculativeTask(true); - abstract class LedgerEntryRequest implements SpeculativeRequestExecutor, AutoCloseable { + long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); + if (code != BKException.Code.OK) { + long firstUnread = LedgerHandle.INVALID_ENTRY_ID; + Integer firstRc = null; + for (LedgerEntryRequest req : seq) { + if (!req.isComplete()) { + firstUnread = req.eId; + firstRc = req.rc; + break; + } + } + LOG.error( + "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " + + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})", + lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, + BKException.getMessage(code), firstUnread, firstRc); + clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); + // release the entries + seq.forEach(LedgerEntryRequest::close); + future.completeExceptionally(BKException.create(code)); + } else { + clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); + future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl))); + } + } - final AtomicBoolean complete = new AtomicBoolean(false); + void sendReadTo(int bookieIndex, BookieId to, SingleLedgerEntryRequest entry) throws InterruptedException { + if (lh.throttler != null) { + lh.throttler.acquire(); + } - int rc = BKException.Code.OK; - int firstError = BKException.Code.OK; - int numBookiesMissingEntry = 0; + if (isRecoveryRead) { + int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); + } else { + clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, + this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); + } + } - final List ensemble; - final DistributionSchedule.WriteSet writeSet; + abstract class SingleLedgerEntryRequest extends LedgerEntryRequest { final LedgerEntryImpl entryImpl; - final long eId; - LedgerEntryRequest(List ensemble, long lId, long eId) { + SingleLedgerEntryRequest(List ensemble, long lId, long eId) { + super(ensemble, eId); this.entryImpl = LedgerEntryImpl.create(lId, eId); - this.ensemble = ensemble; - this.eId = eId; - - if (clientCtx.getConf().enableReorderReadSequence) { - writeSet = clientCtx.getPlacementPolicy() - .reorderReadSequence( - ensemble, - lh.getBookiesHealthInfo(), - lh.getWriteSetForReadOperation(eId)); - } else { - writeSet = lh.getWriteSetForReadOperation(eId); - } } @Override public void close() { - // this request has succeeded before, can't recycle writeSet again - if (complete.compareAndSet(false, true)) { - rc = BKException.Code.UnexpectedConditionException; - writeSet.recycle(); - } + super.close(); entryImpl.close(); } - /** - * Execute the read request. - */ - abstract void read(); - /** * Complete the read request from host. 
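+ * Only the first digest-verified response completes the request; later responses are ignored.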
* - * @param bookieIndex - * bookie index - * @param host - * host that respond the read - * @param buffer - * the data buffer + * @param bookieIndex bookie index + * @param host host that respond the read + * @param buffer the data buffer * @return return true if we managed to complete the entry; - * otherwise return false if the read entry is not complete or it is already completed before + * otherwise return false if the read entry is not complete or it is already completed before */ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { ByteBuf content; @@ -141,7 +210,7 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { } try { content = lh.macManager.verifyDigestAndReturnData(eId, buffer); - } catch (BKDigestMatchException e) { + } catch (BKException.BKDigestMatchException e) { clientCtx.getClientStats().getReadOpDmCounter().inc(); logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", BKException.Code.DigestMatchException); return false; @@ -161,125 +230,9 @@ boolean complete(int bookieIndex, BookieId host, final ByteBuf buffer) { return false; } } - - /** - * Fail the request with given result code rc. - * - * @param rc - * result code to fail the request. - * @return true if we managed to fail the entry; otherwise return false if it already failed or completed. - */ - boolean fail(int rc) { - if (complete.compareAndSet(false, true)) { - this.rc = rc; - submitCallback(rc); - return true; - } else { - return false; - } - } - - /** - * Log error errMsg and reattempt read from host. - * - * @param bookieIndex - * bookie index - * @param host - * host that just respond - * @param errMsg - * error msg to log - * @param rc - * read result code - */ - synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { - if (BKException.Code.OK == firstError - || BKException.Code.NoSuchEntryException == firstError - || BKException.Code.NoSuchLedgerExistsException == firstError) { - firstError = rc; - } else if (BKException.Code.BookieHandleNotAvailableException == firstError - && BKException.Code.NoSuchEntryException != rc - && BKException.Code.NoSuchLedgerExistsException != rc) { - // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is - // returned we need to update firstError to indicate that it might be a valid read but just - // failed. - firstError = rc; - } - if (BKException.Code.NoSuchEntryException == rc - || BKException.Code.NoSuchLedgerExistsException == rc) { - ++numBookiesMissingEntry; - if (LOG.isDebugEnabled()) { - LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", - lh.ledgerId, eId, host); - } - } else { - if (LOG.isInfoEnabled()) { - LOG.info("{} while reading L{} E{} from bookie: {}", - errMsg, lh.ledgerId, eId, host); - } - } - - lh.recordReadErrorOnBookie(bookieIndex); - } - - /** - * Send to next replica speculatively, if required and possible. - * This returns the host we may have sent to for unit testing. - * - * @param heardFromHostsBitSet - * the set of hosts that we already received responses. - * @return host we sent to if we sent. null otherwise. - */ - abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet); - - /** - * Whether the read request completed. - * - * @return true if the read request is completed. - */ - boolean isComplete() { - return complete.get(); - } - - /** - * Get result code of this entry. - * - * @return result code. 
- */ - int getRc() { - return rc; - } - - @Override - public String toString() { - return String.format("L%d-E%d", lh.getId(), eId); - } - - /** - * Issues a speculative request and indicates if more speculative - * requests should be issued. - * - * @return whether more speculative requests should be issued - */ - @Override - public ListenableFuture issueSpeculativeRequest() { - return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable() { - @Override - public Boolean call() throws Exception { - if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Send speculative read for {}. Hosts sent are {}, " - + " Hosts heard are {}, ensemble is {}.", - this, sentToHosts, heardFromHostsBitSet, ensemble); - } - return true; - } - return false; - } - }); - } } - class ParallelReadRequest extends LedgerEntryRequest { + class ParallelReadRequest extends SingleLedgerEntryRequest { int numPendings; @@ -326,7 +279,7 @@ BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet) { } } - class SequenceReadRequest extends LedgerEntryRequest { + class SequenceReadRequest extends SingleLedgerEntryRequest { static final int NOT_FOUND = -1; int nextReplicaIndexToReadFrom = 0; @@ -456,205 +409,4 @@ boolean complete(int bookieIndex, BookieId host, ByteBuf buffer) { return completed; } } - - PendingReadOp(LedgerHandle lh, - ClientContext clientCtx, - long startEntryId, - long endEntryId, - boolean isRecoveryRead) { - this.seq = new LinkedList<>(); - this.future = new CompletableFuture<>(); - this.lh = lh; - this.clientCtx = clientCtx; - this.startEntryId = startEntryId; - this.endEntryId = endEntryId; - this.isRecoveryRead = isRecoveryRead; - - this.allowFailFast = false; - numPendingEntries = endEntryId - startEntryId + 1; - requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize() - - getLedgerMetadata().getAckQuorumSize() + 1; - heardFromHosts = new HashSet<>(); - heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); - } - - CompletableFuture future() { - return future; - } - - protected LedgerMetadata getLedgerMetadata() { - return lh.getLedgerMetadata(); - } - - protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { - if (speculativeTask != null) { - speculativeTask.cancel(mayInterruptIfRunning); - speculativeTask = null; - } - } - - public ScheduledFuture getSpeculativeTask() { - return speculativeTask; - } - - PendingReadOp parallelRead(boolean enabled) { - this.parallelRead = enabled; - return this; - } - - void allowFailFastOnUnwritableChannel() { - allowFailFast = true; - } - - public void submit() { - clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this); - } - - void initiate() { - long nextEnsembleChange = startEntryId, i = startEntryId; - this.requestTimeNanos = MathUtils.nowInNano(); - List ensemble = null; - do { - if (i == nextEnsembleChange) { - ensemble = getLedgerMetadata().getEnsembleAt(i); - nextEnsembleChange = LedgerMetadataUtils.getNextEnsembleChange(getLedgerMetadata(), i); - } - LedgerEntryRequest entry; - if (parallelRead) { - entry = new ParallelReadRequest(ensemble, lh.ledgerId, i); - } else { - entry = new SequenceReadRequest(ensemble, lh.ledgerId, i); - } - seq.add(entry); - i++; - } while (i <= endEntryId); - // read the entries. 
- for (LedgerEntryRequest entry : seq) { - entry.read(); - if (!parallelRead && clientCtx.getConf().readSpeculativeRequestPolicy.isPresent()) { - speculativeTask = clientCtx.getConf().readSpeculativeRequestPolicy.get() - .initiateSpeculativeRequest(clientCtx.getScheduler(), entry); - } - } - } - - @Override - public void run() { - initiate(); - } - - private static class ReadContext implements ReadEntryCallbackCtx { - final int bookieIndex; - final BookieId to; - final LedgerEntryRequest entry; - long lac = LedgerHandle.INVALID_ENTRY_ID; - - ReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) { - this.bookieIndex = bookieIndex; - this.to = to; - this.entry = entry; - } - - @Override - public void setLastAddConfirmed(long lac) { - this.lac = lac; - } - - @Override - public long getLastAddConfirmed() { - return lac; - } - } - - private static ReadContext createReadContext(int bookieIndex, BookieId to, LedgerEntryRequest entry) { - return new ReadContext(bookieIndex, to, entry); - } - - void sendReadTo(int bookieIndex, BookieId to, LedgerEntryRequest entry) throws InterruptedException { - if (lh.throttler != null) { - lh.throttler.acquire(); - } - - if (isRecoveryRead) { - int flags = BookieProtocol.FLAG_HIGH_PRIORITY | BookieProtocol.FLAG_DO_FENCING; - clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, - this, new ReadContext(bookieIndex, to, entry), flags, lh.ledgerKey); - } else { - clientCtx.getBookieClient().readEntry(to, lh.ledgerId, entry.eId, - this, new ReadContext(bookieIndex, to, entry), BookieProtocol.FLAG_NONE); - } - } - - @Override - public void readEntryComplete(int rc, long ledgerId, final long entryId, final ByteBuf buffer, Object ctx) { - final ReadContext rctx = (ReadContext) ctx; - final LedgerEntryRequest entry = rctx.entry; - - if (rc != BKException.Code.OK) { - entry.logErrorAndReattemptRead(rctx.bookieIndex, rctx.to, "Error: " + BKException.getMessage(rc), rc); - return; - } - - heardFromHosts.add(rctx.to); - heardFromHostsBitSet.set(rctx.bookieIndex, true); - - buffer.retain(); - // if entry has completed don't handle twice - if (entry.complete(rctx.bookieIndex, rctx.to, buffer)) { - if (!isRecoveryRead) { - // do not advance LastAddConfirmed for recovery reads - lh.updateLastConfirmed(rctx.getLastAddConfirmed(), 0L); - } - submitCallback(BKException.Code.OK); - } else { - buffer.release(); - } - - if (numPendingEntries < 0) { - LOG.error("Read too many values for ledger {} : [{}, {}].", - ledgerId, startEntryId, endEntryId); - } - } - - protected void submitCallback(int code) { - if (BKException.Code.OK == code) { - numPendingEntries--; - if (numPendingEntries != 0) { - return; - } - } - - // ensure callback once - if (!complete.compareAndSet(false, true)) { - return; - } - - cancelSpeculativeTask(true); - - long latencyNanos = MathUtils.elapsedNanos(requestTimeNanos); - if (code != BKException.Code.OK) { - long firstUnread = LedgerHandle.INVALID_ENTRY_ID; - Integer firstRc = null; - for (LedgerEntryRequest req : seq) { - if (!req.isComplete()) { - firstUnread = req.eId; - firstRc = req.rc; - break; - } - } - LOG.error( - "Read of ledger entry failed: L{} E{}-E{}, Sent to {}, " - + "Heard from {} : bitset = {}, Error = '{}'. 
First unread entry is ({}, rc = {})", - lh.getId(), startEntryId, endEntryId, sentToHosts, heardFromHosts, heardFromHostsBitSet, - BKException.getMessage(code), firstUnread, firstRc); - clientCtx.getClientStats().getReadOpLogger().registerFailedEvent(latencyNanos, TimeUnit.NANOSECONDS); - // release the entries - seq.forEach(LedgerEntryRequest::close); - future.completeExceptionally(BKException.create(code)); - } else { - clientCtx.getClientStats().getReadOpLogger().registerSuccessfulEvent(latencyNanos, TimeUnit.NANOSECONDS); - future.complete(LedgerEntriesImpl.create(Lists.transform(seq, input -> input.entryImpl))); - } - } - } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java new file mode 100644 index 00000000000..cbd68ec657a --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/ReadOpBase.java @@ -0,0 +1,293 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +package org.apache.bookkeeper.client; + +import com.google.common.util.concurrent.ListenableFuture; +import java.util.BitSet; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerMetadata; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class ReadOpBase implements Runnable { + + private static final Logger LOG = LoggerFactory.getLogger(ReadOpBase.class); + + protected ScheduledFuture speculativeTask = null; + protected final CompletableFuture future; + protected final Set heardFromHosts; + protected final BitSet heardFromHostsBitSet; + protected final Set sentToHosts = new HashSet(); + LedgerHandle lh; + protected ClientContext clientCtx; + + protected final long startEntryId; + protected long requestTimeNanos; + + protected final int requiredBookiesMissingEntryForRecovery; + protected final boolean isRecoveryRead; + + protected final AtomicBoolean complete = new AtomicBoolean(false); + protected boolean allowFailFast = false; + long numPendingEntries; + final long endEntryId; + protected ReadOpBase(LedgerHandle lh, ClientContext clientCtx, long startEntryId, long endEntryId, + boolean isRecoveryRead) { + this.lh = lh; + this.future = new CompletableFuture<>(); + this.startEntryId = startEntryId; + this.endEntryId = endEntryId; + this.isRecoveryRead = isRecoveryRead; + this.requiredBookiesMissingEntryForRecovery = getLedgerMetadata().getWriteQuorumSize() + - getLedgerMetadata().getAckQuorumSize() + 1; + this.heardFromHosts = new HashSet<>(); + this.heardFromHostsBitSet = new BitSet(getLedgerMetadata().getEnsembleSize()); + this.allowFailFast = false; + this.clientCtx = clientCtx; + } + + protected LedgerMetadata getLedgerMetadata() { + return lh.getLedgerMetadata(); + } + + protected void cancelSpeculativeTask(boolean mayInterruptIfRunning) { + if (speculativeTask != null) { + speculativeTask.cancel(mayInterruptIfRunning); + speculativeTask = null; + } + } + + public ScheduledFuture getSpeculativeTask() { + return speculativeTask; + } + + CompletableFuture future() { + return future; + } + + void allowFailFastOnUnwritableChannel() { + allowFailFast = true; + } + + public void submit() { + clientCtx.getMainWorkerPool().executeOrdered(lh.ledgerId, this); + } + + @Override + public void run() { + initiate(); + } + + abstract void initiate(); + + protected abstract void submitCallback(int code); + + abstract class LedgerEntryRequest implements SpeculativeRequestExecutor { + + final AtomicBoolean complete = new AtomicBoolean(false); + + int rc = BKException.Code.OK; + int firstError = BKException.Code.OK; + int numBookiesMissingEntry = 0; + + final long eId; + + final List ensemble; + final DistributionSchedule.WriteSet writeSet; + + + LedgerEntryRequest(List ensemble, final long eId) { + this.ensemble = ensemble; + this.eId = eId; + if (clientCtx.getConf().enableReorderReadSequence) { + writeSet = clientCtx.getPlacementPolicy() + .reorderReadSequence( + ensemble, + lh.getBookiesHealthInfo(), + lh.getWriteSetForReadOperation(eId)); + } else { + writeSet = lh.getWriteSetForReadOperation(eId); + } + } + + public void close() { + // this request has 
succeeded before, can't recycle writeSet again + if (complete.compareAndSet(false, true)) { + rc = BKException.Code.UnexpectedConditionException; + writeSet.recycle(); + } + } + + /** + * Execute the read request. + */ + abstract void read(); + + /** + * Fail the request with given result code rc. + * + * @param rc + * result code to fail the request. + * @return true if we managed to fail the entry; otherwise return false if it already failed or completed. + */ + boolean fail(int rc) { + if (complete.compareAndSet(false, true)) { + this.rc = rc; + writeSet.recycle(); + submitCallback(rc); + return true; + } else { + return false; + } + } + + /** + * Log error errMsg and reattempt read from host. + * + * @param bookieIndex + * bookie index + * @param host + * host that just respond + * @param errMsg + * error msg to log + * @param rc + * read result code + */ + synchronized void logErrorAndReattemptRead(int bookieIndex, BookieId host, String errMsg, int rc) { + if (BKException.Code.OK == firstError + || BKException.Code.NoSuchEntryException == firstError + || BKException.Code.NoSuchLedgerExistsException == firstError) { + firstError = rc; + } else if (BKException.Code.BookieHandleNotAvailableException == firstError + && BKException.Code.NoSuchEntryException != rc + && BKException.Code.NoSuchLedgerExistsException != rc) { + // if other exception rather than NoSuchEntryException or NoSuchLedgerExistsException is + // returned we need to update firstError to indicate that it might be a valid read but just + // failed. + firstError = rc; + } + if (BKException.Code.NoSuchEntryException == rc + || BKException.Code.NoSuchLedgerExistsException == rc) { + ++numBookiesMissingEntry; + if (LOG.isDebugEnabled()) { + LOG.debug("No such entry found on bookie. L{} E{} bookie: {}", + lh.ledgerId, eId, host); + } + } else { + if (LOG.isInfoEnabled()) { + LOG.info("{} while reading L{} E{} from bookie: {}", + errMsg, lh.ledgerId, eId, host); + } + } + + lh.recordReadErrorOnBookie(bookieIndex); + } + + /** + * Send to next replica speculatively, if required and possible. + * This returns the host we may have sent to for unit testing. + * + * @param heardFromHostsBitSet + * the set of hosts that we already received responses. + * @return host we sent to if we sent. null otherwise. + */ + abstract BookieId maybeSendSpeculativeRead(BitSet heardFromHostsBitSet); + + /** + * Whether the read request completed. + * + * @return true if the read request is completed. + */ + boolean isComplete() { + return complete.get(); + } + + /** + * Get result code of this entry. + * + * @return result code. + */ + int getRc() { + return rc; + } + + @Override + public String toString() { + return String.format("L%d-E%d", lh.getId(), eId); + } + + /** + * Issues a speculative request and indicates if more speculative + * requests should be issued. + * + * @return whether more speculative requests should be issued + */ + @Override + public ListenableFuture issueSpeculativeRequest() { + return clientCtx.getMainWorkerPool().submitOrdered(lh.getId(), new Callable() { + @Override + public Boolean call() throws Exception { + if (!isComplete() && null != maybeSendSpeculativeRead(heardFromHostsBitSet)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Send speculative read for {}. 
Hosts sent are {}, " + + " Hosts heard are {}, ensemble is {}.", + this, sentToHosts, heardFromHostsBitSet, ensemble); + } + return true; + } + return false; + } + }); + } + } + + protected static class ReadContext implements BookkeeperInternalCallbacks.ReadEntryCallbackCtx { + final int bookieIndex; + final BookieId to; + final PendingReadOp.LedgerEntryRequest entry; + long lac = LedgerHandle.INVALID_ENTRY_ID; + + ReadContext(int bookieIndex, BookieId to, PendingReadOp.LedgerEntryRequest entry) { + this.bookieIndex = bookieIndex; + this.to = to; + this.entry = entry; + } + + @Override + public void setLastAddConfirmed(long lac) { + this.lac = lac; + } + + @Override + public long getLastAddConfirmed() { + return lac; + } + } +} diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java index d5e906d17d8..e9bcddd0b39 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/api/ReadHandle.java @@ -45,6 +45,23 @@ public interface ReadHandle extends Handle { */ CompletableFuture readAsync(long firstEntry, long lastEntry); + /** + * Read a sequence of entries asynchronously. + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @return an handle to the result of the operation + */ + default CompletableFuture batchReadAsync(long startEntry, int maxCount, long maxSize) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(new UnsupportedOperationException()); + return future; + } + /** * Read a sequence of entries synchronously. * @@ -59,6 +76,21 @@ default LedgerEntries read(long firstEntry, long lastEntry) throws BKException, BKException.HANDLER); } + /** + * + * @param startEntry + * start entry id + * @param maxCount + * the total entries count. + * @param maxSize + * the total entries size. + * @return the result of the operation + */ + default LedgerEntries batchRead(long startEntry, int maxCount, long maxSize) + throws BKException, InterruptedException { + return FutureUtils.result(batchReadAsync(startEntry, maxCount, maxSize), BKException.HANDLER); + } + /** * Read a sequence of entries asynchronously, allowing to read after the LastAddConfirmed range. *
This is the same of diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java index 66dc160fd5f..924dee4ada4 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ClientConfiguration.java @@ -202,6 +202,9 @@ public class ClientConfiguration extends AbstractConfiguration crc = new FastThreadLocal() { @@ -62,14 +63,25 @@ void populateValueAndReset(int digest, ByteBuf buf) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { crc.get().update(data, offset, len); return 0; } + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + crc.get().update(buffer, offset, len); + return 0; + } + @Override boolean isInt32Digest() { // This is stored as 8 bytes return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return DirectMemoryCRC32Digest.isSupported(); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java index eab33945b1e..1e78e4075eb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java @@ -20,10 +20,8 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufUtil; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; -import io.netty.util.ReferenceCountUtil; import io.netty.util.ReferenceCounted; import io.netty.util.concurrent.FastThreadLocal; import java.security.GeneralSecurityException; @@ -34,6 +32,7 @@ import org.apache.bookkeeper.proto.BookieProtocol; import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.ByteBufVisitor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,10 +52,25 @@ public abstract class DigestManager { final long ledgerId; final boolean useV2Protocol; private final ByteBufAllocator allocator; + private final DigestUpdaterByteBufVisitorCallback byteBufVisitorCallback; abstract int getMacCodeLength(); - abstract int update(int digest, ByteBuf buffer, int offset, int len); + abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len); + + abstract int internalUpdate(int digest, byte[] buffer, int offset, int len); + + final int update(int digest, ByteBuf buffer, int offset, int len) { + if (buffer.hasMemoryAddress() && acceptsMemoryAddressBuffer()) { + return internalUpdate(digest, buffer, offset, len); + } else if (buffer.hasArray()) { + return internalUpdate(digest, buffer.array(), buffer.arrayOffset() + offset, len); + } else { + UpdateContext updateContext = new UpdateContext(digest); + ByteBufVisitor.visitBuffers(buffer, offset, len, byteBufVisitorCallback, updateContext); + return updateContext.digest; + } + } abstract void populateValueAndReset(int digest, ByteBuf buffer); @@ -69,6 +83,7 @@ public DigestManager(long ledgerId, boolean useV2Protocol, ByteBufAllocator allo this.useV2Protocol = useV2Protocol; this.macCodeLength = getMacCodeLength(); this.allocator = allocator; 
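+ // Reused by update() for buffers that expose neither a memory address nor a backing + // array (e.g. composite buffers); those are digested chunk by chunk via ByteBufVisitor.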
+ this.byteBufVisitorCallback = new DigestUpdaterByteBufVisitorCallback(); } public static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digestType, @@ -136,22 +151,7 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long // Compute checksum over the headers int digest = update(0, buf, buf.readerIndex(), buf.readableBytes()); - - // don't unwrap slices - final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf - ? data.unwrap() : data; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.safeRelease(data); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf cbb = (CompositeByteBuf) unwrapped; - for (int i = 0; i < cbb.numComponents(); i++) { - ByteBuf b = cbb.component(i); - digest = update(digest, b, b.readerIndex(), b.readableBytes()); - } - } else { - digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - } + digest = update(digest, data, data.readerIndex(), data.readableBytes()); populateValueAndReset(digest, buf); @@ -159,11 +159,11 @@ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long buf.readerIndex(0); if (isSmallEntry) { - buf.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - unwrapped.release(); + buf.writeBytes(data, data.readerIndex(), data.readableBytes()); + data.release(); return buf; } else { - return ByteBufList.get(buf, unwrapped); + return ByteBufList.get(buf, data); } } @@ -176,25 +176,9 @@ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long lastA headersBuffer.writeLong(length); int digest = update(0, headersBuffer, 0, METADATA_LENGTH); - - // don't unwrap slices - final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() instanceof CompositeByteBuf - ? 
data.unwrap() : data; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(data); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf cbb = ((CompositeByteBuf) unwrapped); - for (int i = 0; i < cbb.numComponents(); i++) { - ByteBuf b = cbb.component(i); - digest = update(digest, b, b.readerIndex(), b.readableBytes()); - } - } else { - digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes()); - } + digest = update(digest, data, data.readerIndex(), data.readableBytes()); populateValueAndReset(digest, headersBuffer); - - return ByteBufList.get(headersBuffer, unwrapped); + return ByteBufList.get(headersBuffer, data); } /** @@ -373,4 +357,34 @@ public RecoveryData verifyDigestAndReturnLastConfirmed(ByteBuf dataReceived) thr long length = dataReceived.readLong(); return new RecoveryData(lastAddConfirmed, length); } + + private static class UpdateContext { + int digest; + + UpdateContext(int digest) { + this.digest = digest; + } + } + + private class DigestUpdaterByteBufVisitorCallback implements ByteBufVisitor.ByteBufVisitorCallback { + + @Override + public void visitBuffer(UpdateContext context, ByteBuf visitBuffer, int visitIndex, int visitLength) { + // recursively visit the sub buffer and update the digest + context.digest = internalUpdate(context.digest, visitBuffer, visitIndex, visitLength); + } + + @Override + public void visitArray(UpdateContext context, byte[] visitArray, int visitIndex, int visitLength) { + // update the digest with the array + context.digest = internalUpdate(context.digest, visitArray, visitIndex, visitLength); + } + + @Override + public boolean acceptsMemoryAddress(UpdateContext context) { + return DigestManager.this.acceptsMemoryAddressBuffer(); + } + } + + abstract boolean acceptsMemoryAddressBuffer(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java index eda223ef7f3..07a2bdf464f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DirectMemoryCRC32Digest.java @@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) { crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length); } else if (buf.hasArray()) { // Use the internal method to update from array based - crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length); + crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length); } else { // Fallback to data copy if buffer is not contiguous byte[] b = new byte[length]; buf.getBytes(index, b, 0, length); - crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length); + crcValue = updateArray(crcValue, b, 0, length); } } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); } } + private static int updateArray(int crcValue, byte[] buf, int offset, int length) + throws IllegalAccessException, InvocationTargetException { + return (int) updateBytes.invoke(null, crcValue, buf, offset, length); + } + + @Override + public void update(byte[] buffer, int offset, int len) { + try { + crcValue = updateArray(crcValue, buffer, offset, len); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + private static 
final Method updateByteBuffer; private static final Method updateBytes; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java index b15499f0cc5..e2fff9bd7ca 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DummyDigestManager.java @@ -38,7 +38,12 @@ int getMacCodeLength() { } @Override - int update(int digest, ByteBuf buffer, int offset, int len) { + int internalUpdate(int digest, ByteBuf buffer, int offset, int len) { + return 0; + } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { return 0; } @@ -49,4 +54,9 @@ void populateValueAndReset(int digest, ByteBuf buffer) {} boolean isInt32Digest() { return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java index c04c411c6c7..f9fda5a531d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/MacDigestManager.java @@ -96,13 +96,24 @@ void populateValueAndReset(int digest, ByteBuf buffer) { } @Override - int update(int digest, ByteBuf data, int offset, int len) { + int internalUpdate(int digest, ByteBuf data, int offset, int len) { mac.get().update(data.slice(offset, len).nioBuffer()); return 0; } + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + mac.get().update(buffer, offset, len); + return 0; + } + @Override boolean isInt32Digest() { return false; } + + @Override + boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java index 3d48f0ef7da..7635e3e9f20 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/StandardCRC32Digest.java @@ -39,4 +39,9 @@ public long getValueAndReset() { public void update(ByteBuf buf, int offset, int len) { crc.update(buf.slice(offset, len).nioBuffer()); } + + @Override + public void update(byte[] buffer, int offset, int len) { + crc.update(buffer, offset, len); + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java index 324588d852b..b363e4da636 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java @@ -22,7 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; @@ -133,43 +132,14 @@ public static ByteBufList get() { * Append a {@link ByteBuf} at the end of this {@link ByteBufList}. 
*/ public void add(ByteBuf buf) { - final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf - ? buf.unwrap() : buf; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(buf); - - if (unwrapped instanceof CompositeByteBuf) { - ((CompositeByteBuf) unwrapped).forEach(b -> { - ReferenceCountUtil.retain(b); - buffers.add(b); - }); - ReferenceCountUtil.release(unwrapped); - } else { - buffers.add(unwrapped); - } + buffers.add(buf); } /** * Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}. */ public void prepend(ByteBuf buf) { - // don't unwrap slices - final ByteBuf unwrapped = buf.unwrap() != null && buf.unwrap() instanceof CompositeByteBuf - ? buf.unwrap() : buf; - ReferenceCountUtil.retain(unwrapped); - ReferenceCountUtil.release(buf); - - if (unwrapped instanceof CompositeByteBuf) { - CompositeByteBuf composite = (CompositeByteBuf) unwrapped; - for (int i = composite.numComponents() - 1; i >= 0; i--) { - ByteBuf b = composite.component(i); - ReferenceCountUtil.retain(b); - buffers.add(0, b); - } - ReferenceCountUtil.release(unwrapped); - } else { - buffers.add(0, unwrapped); - } + buffers.add(0, buf); } /** @@ -285,7 +255,7 @@ public ByteBufList retain() { @Override protected void deallocate() { for (int i = 0; i < buffers.size(); i++) { - ReferenceCountUtil.release(buffers.get(i)); + buffers.get(i).release(); } buffers.clear(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java new file mode 100644 index 00000000000..32e9c8c55a4 --- /dev/null +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufVisitor.java @@ -0,0 +1,1132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.bookkeeper.util; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.util.ByteProcessor; +import io.netty.util.concurrent.FastThreadLocal; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.channels.GatheringByteChannel; +import java.nio.channels.ScatteringByteChannel; +import java.nio.charset.Charset; + +/** + * This class visits the possible wrapped child buffers of a Netty {@link ByteBuf} for a given offset and length. + *

+ * The Netty ByteBuf API does not provide a method to visit the wrapped child buffers. The + * {@link ByteBuf#unwrap()} method is not suitable for this purpose as it loses the + * {@link ByteBuf#readerIndex()} state, resulting in incorrect offset and length information. + *

+ * Despite Netty not having a public API for visiting the sub buffers, it is possible to achieve this using + * the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} method. This class uses this method to visit the + * wrapped child buffers by providing a suitable {@link ByteBuf} implementation. This implementation supports + * the role of the destination buffer for the getBytes call. It requires implementing the + * {@link ByteBuf#setBytes(int, ByteBuf, int, int)} and {@link ByteBuf#setBytes(int, byte[], int, int)} methods + * and other methods required by getBytes such as {@link ByteBuf#hasArray()}, {@link ByteBuf#hasMemoryAddress()}, + * {@link ByteBuf#nioBufferCount()} and {@link ByteBuf#capacity()}. + * All other methods in the internal ByteBuf implementation are not supported and will throw an exception. + * This is to ensure correctness and to fail fast if some ByteBuf implementation is not following the expected + * and supported interface contract. + */ +public class ByteBufVisitor { + private static final int DEFAULT_VISIT_MAX_DEPTH = 10; + + private ByteBufVisitor() { + // prevent instantiation + } + + /** + * This method traverses the potential nested composite buffers of the provided buffer, given a specific offset and + * length. The traversal continues until it encounters a buffer that is backed by an array or a memory address, + * which allows for the inspection of individual buffer segments without the need for data duplication. + * If no such wrapped buffer is found, the callback function is invoked with the original buffer, offset, + * and length as parameters. + * + * @param buffer the buffer to visit + * @param offset the offset for the buffer + * @param length the length for the buffer + * @param callback the callback to call for each visited buffer + * @param context the context to pass to the callback + */ + public static void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback callback, + T context) { + visitBuffers(buffer, offset, length, callback, context, DEFAULT_VISIT_MAX_DEPTH); + } + + /** + * The callback interface for visiting buffers. + * In case of a heap buffer that is backed by a byte[] array, the visitArray method is called. This + * is due to the internal implementation detail of the {@link ByteBuf#getBytes(int, ByteBuf, int, int)} + * method for heap buffers. + */ + public interface ByteBufVisitorCallback { + void visitBuffer(T context, ByteBuf visitBuffer, int visitIndex, int visitLength); + void visitArray(T context, byte[] visitArray, int visitIndex, int visitLength); + default boolean preferArrayOrMemoryAddress(T context) { + return true; + } + default boolean acceptsMemoryAddress(T context) { + return false; + } + } + + /** + * See {@link #visitBuffers(ByteBuf, int, int, ByteBufVisitorCallback, Object)}. This method + * allows specifying the maximum depth of recursion for visiting wrapped buffers.
+ */ + public static void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufVisitorCallback callback, + T context, int maxDepth) { + if (length == 0) { + // skip visiting empty buffers + return; + } + InternalContext internalContext = new InternalContext<>(); + internalContext.maxDepth = maxDepth; + internalContext.callbackContext = context; + internalContext.callback = callback; + internalContext.recursivelyVisitBuffers(buffer, offset, length); + } + + private static final int TL_COPY_BUFFER_SIZE = 64 * 1024; + private static final FastThreadLocal TL_COPY_BUFFER = new FastThreadLocal() { + @Override + protected byte[] initialValue() { + return new byte[TL_COPY_BUFFER_SIZE]; + } + }; + + private static class InternalContext { + int depth; + int maxDepth; + ByteBuf parentBuffer; + int parentOffset; + int parentLength; + T callbackContext; + ByteBufVisitorCallback callback; + GetBytesCallbackByteBuf callbackByteBuf = new GetBytesCallbackByteBuf(this); + + void recursivelyVisitBuffers(ByteBuf visitBuffer, int visitIndex, int visitLength) { + // visit the wrapped buffers recursively if the buffer is not backed by an array or memory address + // and the max depth has not been reached + if (depth < maxDepth && !visitBuffer.hasMemoryAddress() && !visitBuffer.hasArray()) { + parentBuffer = visitBuffer; + parentOffset = visitIndex; + parentLength = visitLength; + depth++; + // call getBytes to trigger the wrapped buffer visit + visitBuffer.getBytes(visitIndex, callbackByteBuf, 0, visitLength); + depth--; + } else { + passBufferToCallback(visitBuffer, visitIndex, visitLength); + } + } + + void handleBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (visitLength == 0) { + // skip visiting empty buffers + return; + } + if (visitBuffer == parentBuffer && visitIndex == parentOffset && visitLength == parentLength) { + // further recursion would cause unnecessary recursion up to the max depth of recursion + passBufferToCallback(visitBuffer, visitIndex, visitLength); + } else { + // use the recursivelyVisitBuffers method to visit the wrapped buffer, possibly recursively + recursivelyVisitBuffers(visitBuffer, visitIndex, visitLength); + } + } + + private void passBufferToCallback(ByteBuf visitBuffer, int visitIndex, int visitLength) { + if (callback.preferArrayOrMemoryAddress(callbackContext)) { + if (visitBuffer.hasArray()) { + handleArray(visitBuffer.array(), visitBuffer.arrayOffset() + visitIndex, visitLength); + } else if (visitBuffer.hasMemoryAddress() && callback.acceptsMemoryAddress(callbackContext)) { + callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength); + } else if (callback.acceptsMemoryAddress(callbackContext) && visitBuffer.isDirect() + && visitBuffer.alloc().isDirectBufferPooled()) { + // read-only buffers need to be copied before they can be directly accessed + ByteBuf copyBuffer = visitBuffer.copy(visitIndex, visitLength); + callback.visitBuffer(callbackContext, copyBuffer, 0, visitLength); + copyBuffer.release(); + } else { + // fallback to reading the visited buffer into the copy buffer in a loop + byte[] copyBuffer = TL_COPY_BUFFER.get(); + int remaining = visitLength; + int currentOffset = visitIndex; + while (remaining > 0) { + int readLen = Math.min(remaining, copyBuffer.length); + visitBuffer.getBytes(currentOffset, copyBuffer, 0, readLen); + handleArray(copyBuffer, 0, readLen); + remaining -= readLen; + currentOffset += readLen; + } + } + } else { + callback.visitBuffer(callbackContext, visitBuffer, visitIndex, visitLength); + } +
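The final else branch of passBufferToCallback above streams the visited region through a reusable 64 KiB thread-local array in fixed-size chunks, so arbitrarily large non-contiguous buffers are digested without a full-size copy. A standalone sketch of that chunking pattern, not part of the patch (illustrative names; CRC32 stands in for the real consumer):

import io.netty.buffer.ByteBuf;
import java.util.zip.CRC32;

final class ChunkedReadSketch {
    // the real code holds a 64 KiB array in a FastThreadLocal; a plain field keeps the sketch short
    private static final byte[] COPY_BUFFER = new byte[64 * 1024];

    static long crcOf(ByteBuf buf, int index, int length) {
        CRC32 crc = new CRC32();
        int remaining = length;
        int currentOffset = index;
        while (remaining > 0) {
            int readLen = Math.min(remaining, COPY_BUFFER.length);
            // absolute getBytes: reads at an explicit index without moving the reader index
            buf.getBytes(currentOffset, COPY_BUFFER, 0, readLen);
            crc.update(COPY_BUFFER, 0, readLen);
            remaining -= readLen;
            currentOffset += readLen;
        }
        return crc.getValue();
    }
}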
} + + void handleArray(byte[] visitArray, int visitIndex, int visitLength) { + if (visitLength == 0) { + // skip visiting empty arrays + return; + } + // pass array to callback + callback.visitArray(callbackContext, visitArray, visitIndex, visitLength); + } + } + + /** + * A ByteBuf implementation that can be used as the destination buffer for + * a {@link ByteBuf#getBytes(int, ByteBuf)} for visiting the wrapped child buffers. + */ + static class GetBytesCallbackByteBuf extends ByteBuf { + private final InternalContext internalContext; + + GetBytesCallbackByteBuf(InternalContext internalContext) { + this.internalContext = internalContext; + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) { + internalContext.handleBuffer(src, srcIndex, length); + return this; + } + + @Override + public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) { + internalContext.handleArray(src, srcIndex, length); + return this; + } + + @Override + public boolean hasArray() { + // return false so that the wrapped buffer is visited + return false; + } + + @Override + public boolean hasMemoryAddress() { + // return false so that the wrapped buffer is visited + return false; + } + + @Override + public int nioBufferCount() { + // return 0 so that the wrapped buffer is visited + return 0; + } + + @Override + public int capacity() { + // should return sufficient capacity for the total length + return Integer.MAX_VALUE; + } + + @Override + public ByteBuf capacity(int newCapacity) { + throw new UnsupportedOperationException(); + } + + @Override + public int maxCapacity() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBufAllocator alloc() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteOrder order() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf order(ByteOrder endianness) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf unwrap() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isDirect() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadOnly() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf asReadOnly() { + throw new UnsupportedOperationException(); + } + + @Override + public int readerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readerIndex(int readerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int writerIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writerIndex(int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setIndex(int readerIndex, int writerIndex) { + throw new UnsupportedOperationException(); + } + + @Override + public int readableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public int writableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public int maxWritableBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadable() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isReadable(int size) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWritable() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isWritable(int size) { + throw new 
UnsupportedOperationException(); + } + + @Override + public ByteBuf clear() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf markReaderIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf resetReaderIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf markWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf resetWriterIndex() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf discardReadBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf discardSomeReadBytes() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf ensureWritable(int minWritableBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public int ensureWritable(int minWritableBytes, boolean force) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean getBoolean(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public byte getByte(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getUnsignedByte(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShort(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public short getShortLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedShort(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedShortLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedMedium(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getUnsignedMediumLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getInt(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public int getIntLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getUnsignedInt(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getUnsignedIntLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLong(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public long getLongLE(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public char getChar(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public float getFloat(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public double getDouble(int index) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, byte[] dst) { + throw new UnsupportedOperationException(); + } + + @Override + public 
ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int getBytes(int index, FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CharSequence getCharSequence(int index, int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBoolean(int index, boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setByte(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setShort(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setShortLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setMedium(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setMediumLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setInt(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setIntLE(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setLong(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setLongLE(int index, long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setChar(int index, int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setFloat(int index, float value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setDouble(int index, double value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuf src, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setBytes(int index, ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, InputStream in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int setBytes(int index, FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf setZero(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int setCharSequence(int index, CharSequence sequence, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean 
readBoolean() { + throw new UnsupportedOperationException(); + } + + @Override + public byte readByte() { + throw new UnsupportedOperationException(); + } + + @Override + public short readUnsignedByte() { + throw new UnsupportedOperationException(); + } + + @Override + public short readShort() { + throw new UnsupportedOperationException(); + } + + @Override + public short readShortLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShort() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedShortLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readMedium() { + throw new UnsupportedOperationException(); + } + + @Override + public int readMediumLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedMedium() { + throw new UnsupportedOperationException(); + } + + @Override + public int readUnsignedMediumLE() { + throw new UnsupportedOperationException(); + } + + @Override + public int readInt() { + throw new UnsupportedOperationException(); + } + + @Override + public int readIntLE() { + throw new UnsupportedOperationException(); + } + + @Override + public long readUnsignedInt() { + throw new UnsupportedOperationException(); + } + + @Override + public long readUnsignedIntLE() { + throw new UnsupportedOperationException(); + } + + @Override + public long readLong() { + throw new UnsupportedOperationException(); + } + + @Override + public long readLongLE() { + throw new UnsupportedOperationException(); + } + + @Override + public char readChar() { + throw new UnsupportedOperationException(); + } + + @Override + public float readFloat() { + throw new UnsupportedOperationException(); + } + + @Override + public double readDouble() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readSlice(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readRetainedSlice(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(byte[] dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(byte[] dst, int dstIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(ByteBuffer dst) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf readBytes(OutputStream out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int readBytes(GatheringByteChannel out, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public CharSequence readCharSequence(int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int readBytes(FileChannel out, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf skipBytes(int length) { + throw new UnsupportedOperationException(); + } + + 
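Of the overrides in GetBytesCallbackByteBuf, only setBytes(int, ByteBuf, int, int) and setBytes(int, byte[], int, int) above do real work; the remaining stubs fail fast by design. A hypothetical caller of the visitor API could look like the following sketch (the Counter context and countBytes name are illustrative, not part of the patch):

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.util.ByteBufVisitor;

final class ByteCountingVisitorSketch {
    // mutable context object threaded through the visit
    static final class Counter {
        long total;
    }

    static long countBytes(ByteBuf buf) {
        Counter counter = new Counter();
        ByteBufVisitor.visitBuffers(buf, buf.readerIndex(), buf.readableBytes(),
                new ByteBufVisitor.ByteBufVisitorCallback<Counter>() {
                    @Override
                    public void visitBuffer(Counter ctx, ByteBuf segment, int index, int length) {
                        // direct segments arrive here only when acceptsMemoryAddress returns true
                        ctx.total += length;
                    }

                    @Override
                    public void visitArray(Counter ctx, byte[] array, int index, int length) {
                        // heap segments, and copied chunks of direct ones, arrive here
                        ctx.total += length;
                    }
                }, counter);
        return counter.total; // equals buf.readableBytes()
    }
}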
@Override + public ByteBuf writeBoolean(boolean value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeByte(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeShort(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeShortLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeMedium(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeMediumLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeInt(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeIntLE(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeLong(long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeLongLE(long value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeChar(int value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeFloat(float value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeDouble(double value) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(byte[] src) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(byte[] src, int srcIndex, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeBytes(ByteBuffer src) { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(InputStream in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(ScatteringByteChannel in, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int writeBytes(FileChannel in, long position, int length) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf writeZero(int length) { + throw new UnsupportedOperationException(); + } + + @Override + public int writeCharSequence(CharSequence sequence, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int indexOf(int fromIndex, int toIndex, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(int length, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int bytesBefore(int index, int length, byte value) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByte(ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByte(int index, int length, ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public int forEachByteDesc(ByteProcessor processor) { + throw new 
UnsupportedOperationException(); + } + + @Override + public int forEachByteDesc(int index, int length, ByteProcessor processor) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf copy(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf slice() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedSlice() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf slice(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedSlice(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf duplicate() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retainedDuplicate() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer nioBuffer(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer internalNioBuffer(int index, int length) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuffer[] nioBuffers(int index, int length) { + throw new UnsupportedOperationException(); + } + + + @Override + public byte[] array() { + throw new UnsupportedOperationException(); + } + + @Override + public int arrayOffset() { + throw new UnsupportedOperationException(); + } + @Override + public long memoryAddress() { + throw new UnsupportedOperationException(); + } + + @Override + public String toString(Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString(int index, int length, Charset charset) { + throw new UnsupportedOperationException(); + } + + @Override + public int compareTo(ByteBuf buffer) { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain(int increment) { + throw new UnsupportedOperationException(); + } + + @Override + public int refCnt() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf retain() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf touch() { + throw new UnsupportedOperationException(); + } + + @Override + public ByteBuf touch(Object hint) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean release(int decrement) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return getClass().getSimpleName() + '@' + Integer.toHexString(System.identityHashCode(this)); + } + + @Override + public int hashCode() { + return System.identityHashCode(this); + } + + @Override + public boolean equals(Object obj) { + return obj == this; + } + } + +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java new file mode 100644 index 00000000000..1bb95ed0478 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java @@ -0,0 +1,292 @@ +/* + * + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import org.apache.bookkeeper.client.BKException.Code; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit tests for batch reading. + */ +public class TestBatchedRead extends BookKeeperClusterTestCase { + + private static final Logger LOG = LoggerFactory.getLogger(TestBatchedRead.class); + + final DigestType digestType; + final byte[] passwd = "sequence-read".getBytes(); + + public TestBatchedRead() { + super(6); + baseClientConf.setUseV2WireProtocol(true); + this.digestType = DigestType.CRC32; + } + + long getLedgerToRead(int ensemble, int writeQuorum, int ackQuorum, int numEntries) + throws Exception { + LedgerHandle lh = bkc.createLedger(ensemble, writeQuorum, ackQuorum, digestType, passwd); + for (int i = 0; i < numEntries; i++) { + lh.addEntry(("" + i).getBytes()); + } + lh.close(); + return lh.getId(); + } + + BatchedReadOp createReadOp(LedgerHandle lh, long startEntry, int count) { + return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count, 1024 * count, false); + } + + BatchedReadOp createRecoveryReadOp(LedgerHandle lh, long startEntry, int count) { + return new BatchedReadOp(lh, bkc.getClientCtx(), startEntry, count, 1024 * count, true); + } + + @Test + public void testNormalRead() throws Exception { + int numEntries = 10; + long id = getLedgerToRead(5, 5, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + //read single entry + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, 1); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + + // read multiple entries + BatchedReadOp readOp = createReadOp(lh, 0, numEntries); + readOp.submit(); + Iterator iterator = readOp.future().get().iterator(); + + int numReads = 
0; + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); + assertNotNull(entry); + assertEquals(numReads, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + ++numReads; + } + assertEquals(numEntries, numReads); + lh.close(); + } + + @Test + public void testReadWhenEnsembleNotEqualWQ() throws Exception { + int numEntries = 10; + long id = getLedgerToRead(5, 2, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + //read single entry + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, 1); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + + // read multiple entries; because the ensemble size does not equal the write quorum, the returned + // entries will be fewer than the max count. + for (int i = 0; i < numEntries; i++) { + BatchedReadOp readOp = createReadOp(lh, i, numEntries); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + assertFalse(entries.hasNext()); + } + lh.close(); + } + + private static void expectFail(CompletableFuture future, int expectedRc) { + try { + result(future); + fail("Expect to fail"); + } catch (Exception e) { + assertTrue(e instanceof BKException); + BKException bke = (BKException) e; + assertEquals(expectedRc, bke.getCode()); + } + } + + @Test + public void testReadMissingEntries() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 5, 2, numEntries); + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + // read single entry + BatchedReadOp readOp = createReadOp(lh, 10, 1); + readOp.submit(); + expectFail(readOp.future(), Code.NoSuchEntryException); + + // read multiple entries + readOp = createReadOp(lh, 8, 3); + readOp.submit(); + + int index = 8; + int numReads = 0; + Iterator iterator = readOp.future().get().iterator(); + while (iterator.hasNext()) { + LedgerEntry entry = iterator.next(); + assertNotNull(entry); + assertEquals(index, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + ++index; + ++numReads; + } + assertEquals(2, numReads); + lh.close(); + } + + @Test + public void testFailRecoveryReadMissingEntryImmediately() throws Exception { + int numEntries = 1; + + long id = getLedgerToRead(5, 5, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(10); + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(1); + // sleep two bookies + sleepBookie(ensemble.get(0), latch1); + sleepBookie(ensemble.get(1), latch2); + + BatchedReadOp readOp = createRecoveryReadOp(lh, 10, 1); + readOp.submit(); + // fails immediately if the missing entries don't cover the ack quorum + expectFail(readOp.future(), Code.NoSuchEntryException); + latch1.countDown(); + latch2.countDown(); + + lh.close(); + newBk.close(); + } + + @Test + public void testReadWithFailedBookies()
throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 3, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(5); + // kill two bookies + killBookie(ensemble.get(0)); + killBookie(ensemble.get(1)); + + // read multiple entries; because the ensemble size does not equal the write quorum, the returned + // entries will be fewer than the max count. + int numReads = 0; + for (int i = 0; i < numEntries;) { + BatchedReadOp readOp = createReadOp(lh, i, numEntries); + readOp.submit(); + Iterator entries = readOp.future().get().iterator(); + if (!entries.hasNext()) { + i++; + continue; + } + while (entries.hasNext()) { + LedgerEntry entry = entries.next(); + assertNotNull(entry); + assertEquals(i, Integer.parseInt(new String(entry.getEntryBytes()))); + entry.close(); + i++; + numReads++; + } + } + assertEquals(10, numReads); + lh.close(); + newBk.close(); + } + + @Test + public void testReadFailureWithFailedBookies() throws Exception { + int numEntries = 10; + + long id = getLedgerToRead(5, 3, 3, numEntries); + + ClientConfiguration newConf = new ClientConfiguration() + .setReadEntryTimeout(30000); + newConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper newBk = new BookKeeper(newConf); + + LedgerHandle lh = bkc.openLedger(id, digestType, passwd); + + List ensemble = lh.getLedgerMetadata().getEnsembleAt(5); + // kill three bookies + killBookie(ensemble.get(0)); + killBookie(ensemble.get(1)); + killBookie(ensemble.get(2)); + + // read multiple entries + BatchedReadOp readOp = createReadOp(lh, 0, numEntries); + readOp.submit(); + expectFail(readOp.future(), Code.BookieHandleNotAvailableException); + + lh.close(); + newBk.close(); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java index f2ada1e5dc8..423e02b4aad 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestParallelRead.java @@ -33,7 +33,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import java.lang.reflect.Method; import java.util.Iterator; import java.util.List; import java.util.TreeMap; @@ -268,8 +267,8 @@ public void testLedgerEntryRequestComplete() throws Exception { PendingReadOp pendingReadOp = new PendingReadOp(lh, clientContext, 1, 2, false); pendingReadOp.parallelRead(true); pendingReadOp.initiate(); - PendingReadOp.LedgerEntryRequest first = pendingReadOp.seq.get(0); - PendingReadOp.LedgerEntryRequest second = pendingReadOp.seq.get(1); + PendingReadOp.SingleLedgerEntryRequest first = pendingReadOp.seq.get(0); + PendingReadOp.SingleLedgerEntryRequest second = pendingReadOp.seq.get(1); pendingReadOp.submitCallback(-105); @@ -287,13 +286,9 @@ public void testLedgerEntryRequestComplete() throws Exception { assertTrue(second.complete.get()); // Mock ledgerEntryImpl reuse - Method method = PendingReadOp.class.getDeclaredMethod("createReadContext", - int.class, BookieId.class, PendingReadOp.LedgerEntryRequest.class); - method.setAccessible(true); - ByteBuf byteBuf = Unpooled.buffer(10); pendingReadOp.readEntryComplete(BKException.Code.OK, 1, 1, Unpooled.buffer(10), -
method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); // byteBuf has been released assertEquals(byteBuf.refCnt(), 1); @@ -308,15 +303,15 @@ public void testLedgerEntryRequestComplete() throws Exception { // read entry failed twice, will not close twice pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); pendingReadOp.readEntryComplete(BKException.Code.TooManyRequestsException, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); // will not complete twice when completed byteBuf = Unpooled.buffer(10); pendingReadOp.readEntryComplete(Code.OK, 1, 1, Unpooled.buffer(10), - method.invoke(pendingReadOp, 1, BookieId.parse("test"), first)); + new ReadOpBase.ReadContext(1, BookieId.parse("test"), first)); assertEquals(1, byteBuf.refCnt()); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java new file mode 100644 index 00000000000..3bf5e2d5e44 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestSpeculativeBatchRead.java @@ -0,0 +1,401 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.bookkeeper.client; + +import static org.apache.bookkeeper.client.BookKeeperClientStats.CLIENT_SCOPE; +import static org.apache.bookkeeper.client.BookKeeperClientStats.SPECULATIVE_READ_COUNT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.BitSet; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.bookie.LocalBookieEnsemblePlacementPolicy; +import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; +import org.apache.bookkeeper.client.BookKeeper.DigestType; +import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.net.BookieId; +import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.apache.bookkeeper.test.TestStatsProvider; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This unit test tests speculative batch reads.
+ * + */ +public class TestSpeculativeBatchRead extends BookKeeperClusterTestCase { + private static final Logger LOG = LoggerFactory.getLogger(TestSpeculativeBatchRead.class); + + private final DigestType digestType; + byte[] passwd = "specPW".getBytes(); + + public TestSpeculativeBatchRead() { + super(10); + this.digestType = DigestType.CRC32; + } + + long getLedgerToRead(int ensemble, int quorum) throws Exception { + byte[] data = "Data for test".getBytes(); + LedgerHandle l = bkc.createLedger(ensemble, quorum, digestType, passwd); + for (int i = 0; i < 10; i++) { + l.addEntry(data); + } + l.close(); + + return l.getId(); + } + + @SuppressWarnings("deprecation") + BookKeeperTestClient createClient(int specTimeout) throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setSpeculativeReadTimeout(specTimeout) + .setReadTimeout(30000) + .setUseV2WireProtocol(true) + .setReorderReadSequenceEnabled(true) + .setEnsemblePlacementPolicySlowBookies(true) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + return new BookKeeperTestClient(conf, new TestStatsProvider()); + } + + class LatchCallback implements ReadCallback { + CountDownLatch l = new CountDownLatch(1); + boolean success = false; + long startMillis = System.currentTimeMillis(); + long endMillis = Long.MAX_VALUE; + + public void readComplete(int rc, + LedgerHandle lh, + Enumeration seq, + Object ctx) { + endMillis = System.currentTimeMillis(); + if (LOG.isDebugEnabled()) { + LOG.debug("Got response {} {}", rc, getDuration()); + } + success = rc == BKException.Code.OK; + l.countDown(); + } + + long getDuration() { + return endMillis - startMillis; + } + + void expectSuccess(int milliseconds) throws Exception { + assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS)); + assertTrue(success); + } + + void expectFail(int milliseconds) throws Exception { + assertTrue(l.await(milliseconds, TimeUnit.MILLISECONDS)); + assertFalse(success); + } + + void expectTimeout(int milliseconds) throws Exception { + assertFalse(l.await(milliseconds, TimeUnit.MILLISECONDS)); + } + } + + /** + * Test basic speculative functionality. + * - Create 2 clients with read timeout disabled, one with spec + * read enabled, the other not. + * - create ledger + * - sleep second bookie in ensemble + * - read first entry, both should find on first bookie. + * - read second entry, spec client should find on bookie three, + * non spec client should hang. + */ + @Test + public void testSpeculativeRead() throws Exception { + long id = getLedgerToRead(3, 2); + BookKeeperTestClient bknospec = createClient(0); // disabled + BookKeeperTestClient bkspec = createClient(2000); + + LedgerHandle lnospec = bknospec.openLedger(id, digestType, passwd); + LedgerHandle lspec = bkspec.openLedger(id, digestType, passwd); + + // sleep second bookie + CountDownLatch sleepLatch = new CountDownLatch(1); + BookieId second = lnospec.getLedgerMetadata().getAllEnsembles().get(0L).get(1); + sleepBookie(second, sleepLatch); + + try { + // read first entry, both go to first bookie, should be fine + LatchCallback nospeccb = new LatchCallback(); + LatchCallback speccb = new LatchCallback(); + lnospec.asyncBatchReadEntries(0, 1, 1024, nospeccb, null); + lspec.asyncBatchReadEntries(0, 1, 1024, speccb, null); + nospeccb.expectSuccess(2000); + speccb.expectSuccess(2000); + + // read second entry, both look for second bookie, spec read client + // tries third bookie, nonspec client hangs as read timeout is very long.
+ nospeccb = new LatchCallback(); + speccb = new LatchCallback(); + lnospec.asyncReadEntries(1, 1, nospeccb, null); + lspec.asyncReadEntries(1, 1, speccb, null); + speccb.expectSuccess(4000); + nospeccb.expectTimeout(4000); + // Check that the second bookie is registered as slow at entryId 1 + RackawareEnsemblePlacementPolicy rep = (RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy(); + assertTrue(rep.slowBookies.asMap().size() == 1); + + assertTrue( + "Stats should not reflect speculative reads if disabled", + bknospec.getTestStatsProvider() + .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() == 0); + assertTrue( + "Stats should reflect speculative reads", + bkspec.getTestStatsProvider() + .getCounter(CLIENT_SCOPE + "." + SPECULATIVE_READ_COUNT).get() > 0); + } finally { + sleepLatch.countDown(); + lspec.close(); + lnospec.close(); + bkspec.close(); + bknospec.close(); + } + } + + /** + * Test that if more than one replica is down, we can still read, as long as the quorum + * size is larger than the number of down replicas. + */ + @Test + public void testSpeculativeReadMultipleReplicasDown() throws Exception { + long id = getLedgerToRead(5, 5); + int timeout = 5000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookie 1, 2 & 4 + CountDownLatch sleepLatch = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2), sleepLatch); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(4), sleepLatch); + + try { + // read first entry, should complete faster than timeout + // as bookie 0 has the entry + LatchCallback latch0 = new LatchCallback(); + l.asyncBatchReadEntries(0, 1, 1024, latch0, null); + latch0.expectSuccess(timeout / 2); + + // second should have to hit two timeouts (bookie 1 & 2) + // bookie 3 has the entry + LatchCallback latch1 = new LatchCallback(); + l.asyncBatchReadEntries(1, 1, 1024, latch1, null); + latch1.expectTimeout(timeout); + latch1.expectSuccess(timeout * 2); + LOG.info("Timeout {} latch1 duration {}", timeout, latch1.getDuration()); + assertTrue("should have taken longer than two timeouts, but less than 3", + latch1.getDuration() >= timeout * 2 + && latch1.getDuration() < timeout * 3); + + // bookies 1 & 2 should be registered as slow bookies because of speculative reads + Set expectedSlowBookies = new HashSet<>(); + expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1)); + expectedSlowBookies.add(l.getLedgerMetadata().getAllEnsembles().get(0L).get(2)); + assertEquals(((RackawareEnsemblePlacementPolicy) bkspec.getPlacementPolicy()).slowBookies.asMap().keySet(), + expectedSlowBookies); + + // third should not hit timeouts since bookies 1 & 2 are registered as slow + // bookie 3 has the entry + LatchCallback latch2 = new LatchCallback(); + l.asyncBatchReadEntries(2, 1, 1024, latch2, null); + latch2.expectSuccess(timeout); + + // fourth should have no timeout + // bookie 3 has the entry + LatchCallback latch3 = new LatchCallback(); + l.asyncBatchReadEntries(3, 1, 1024, latch3, null); + latch3.expectSuccess(timeout / 2); + + // fifth should hit one timeout, (bookie 4) + // bookie 0 has the entry + LatchCallback latch4 = new LatchCallback(); + l.asyncBatchReadEntries(4, 1, 1024, latch4, null); + latch4.expectTimeout(timeout / 2); + latch4.expectSuccess(timeout); + LOG.info("Timeout {} latch4 duration {}", timeout, 
latch4.getDuration()); + assertTrue("should have taken longer than one timeout, but less than 2", + latch4.getDuration() >= timeout + && latch4.getDuration() < timeout * 2); + } finally { + sleepLatch.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Test that nothing bad happens if the original read completes + * after a speculative read has been kicked off. + */ + @Test + public void testSpeculativeReadFirstReadCompleteIsOk() throws Exception { + long id = getLedgerToRead(2, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + // sleep bookies + CountDownLatch sleepLatch0 = new CountDownLatch(1); + CountDownLatch sleepLatch1 = new CountDownLatch(1); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(0), sleepLatch0); + sleepBookie(l.getLedgerMetadata().getAllEnsembles().get(0L).get(1), sleepLatch1); + + try { + // read goes to first bookie, spec read timeout occurs, + // goes to second + LatchCallback latch0 = new LatchCallback(); + l.asyncBatchReadEntries(0, 1, 1024, latch0, null); + latch0.expectTimeout(timeout); + + // wake up first bookie + sleepLatch0.countDown(); + latch0.expectSuccess(timeout / 2); + + sleepLatch1.countDown(); + + // check we can read next entry without issue + LatchCallback latch1 = new LatchCallback(); + l.asyncBatchReadEntries(1, 1, 1024, latch1, null); + latch1.expectSuccess(timeout / 2); + } finally { + sleepLatch0.countDown(); + sleepLatch1.countDown(); + l.close(); + bkspec.close(); + } + } + + /** + * Unit test to check if the scheduled speculative task gets cancelled + * on successful read. + */ + @Test + public void testSpeculativeReadScheduledTaskCancel() throws Exception { + long id = getLedgerToRead(3, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + BatchedReadOp op = null; + try { + op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + op.initiate(); + op.future().get(); + } finally { + assertNull("Speculative Read tasks must be null", op.getSpeculativeTask()); + } + } + + /** + * Unit test for the speculative read scheduling method. 
+ */ + @Test + public void testSpeculativeReadScheduling() throws Exception { + long id = getLedgerToRead(3, 2); + int timeout = 1000; + BookKeeper bkspec = createClient(timeout); + + LedgerHandle l = bkspec.openLedger(id, digestType, passwd); + + List ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L); + BitSet allHosts = new BitSet(ensemble.size()); + for (int i = 0; i < ensemble.size(); i++) { + allHosts.set(i, true); + } + BitSet noHost = new BitSet(ensemble.size()); + BitSet secondHostOnly = new BitSet(ensemble.size()); + secondHostOnly.set(1, true); + BatchedReadOp.LedgerEntryRequest req0 = null, req2 = null, req4 = null; + try { + BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + // if we've already heard from all hosts, + // we only send the initial read + req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024); + assertTrue("Should have sent to first", + req0.maybeSendSpeculativeRead(allHosts).equals(ensemble.get(0))); + assertNull("Should not have sent another", + req0.maybeSendSpeculativeRead(allHosts)); + + // if we have heard from some hosts, but not one we have sent to + // send again + req2 = op.new SequenceReadRequest(ensemble, l.getId(), 2, 1, 1024); + assertTrue("Should have sent to third", + req2.maybeSendSpeculativeRead(noHost).equals(ensemble.get(2))); + assertTrue("Should have sent to first", + req2.maybeSendSpeculativeRead(secondHostOnly).equals(ensemble.get(0))); + + // if we have heard from some hosts, which includes one we sent to + // do not read again + req4 = op.new SequenceReadRequest(ensemble, l.getId(), 4, 1, 1024); + assertTrue("Should have sent to second", + req4.maybeSendSpeculativeRead(noHost).equals(ensemble.get(1))); + assertNull("Should not have sent another", + req4.maybeSendSpeculativeRead(secondHostOnly)); + } finally { + for (BatchedReadOp.LedgerEntryRequest req + : new BatchedReadOp.LedgerEntryRequest[] { req0, req2, req4 }) { + if (req != null) { + int i = 0; + while (!req.isComplete()) { + if (i++ > 10) { + break; // wait for up to 10 seconds + } + Thread.sleep(1000); + } + assertTrue("Request should be done", req.isComplete()); + } + } + + l.close(); + bkspec.close(); + } + } + + @Test + public void testSequenceReadLocalEnsemble() throws Exception { + ClientConfiguration conf = new ClientConfiguration() + .setSpeculativeReadTimeout(1000) + .setEnsemblePlacementPolicy(LocalBookieEnsemblePlacementPolicy.class) + .setReorderReadSequenceEnabled(true) + .setEnsemblePlacementPolicySlowBookies(true) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + BookKeeper bkspec = new BookKeeperTestClient(conf, new TestStatsProvider()); + LedgerHandle l = bkspec.createLedger(1, 1, digestType, passwd); + List ensemble = l.getLedgerMetadata().getAllEnsembles().get(0L); + BatchedReadOp op = new BatchedReadOp(l, bkspec.getClientCtx(), 0, 5, 5120, false); + BatchedReadOp.LedgerEntryRequest req0 = op.new SequenceReadRequest(ensemble, l.getId(), 0, 1, 1024); + assertNotNull(req0.writeSet); + } +} diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java new file mode 100644 index 00000000000..6252bb71be9 --- /dev/null +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/checksum/CompositeByteBufUnwrapBugReproduceTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.bookkeeper.proto.checksum; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +import com.scurrilous.circe.checksum.IntHash; +import com.scurrilous.circe.checksum.Java8IntHash; +import com.scurrilous.circe.checksum.Java9IntHash; +import com.scurrilous.circe.checksum.JniIntHash; +import com.scurrilous.circe.crc.Sse42Crc32C; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.util.ReferenceCounted; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import org.apache.bookkeeper.proto.BookieProtoEncoding; +import org.apache.bookkeeper.proto.BookieProtocol; +import org.apache.bookkeeper.util.ByteBufList; +import org.apache.bookkeeper.util.ByteBufVisitor; +import org.apache.commons.lang3.RandomUtils; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * This test class was added to reproduce a bug in the checksum calculation when + * the payload is a CompositeByteBuf and this buffer has a reader index state other than 0. + * The reader index state gets lost in the unwrapping process. + * + * There were at least 2 different bugs. One that occurred when the + * payload was >= BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD and the other when + * it was < BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD. + * This test covers both useV2Protocol=true and useV2Protocol=false since the bug was triggered differently. + * + * The bug has been fixed and this test is here to make sure it doesn't happen again. + */
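To make the readerIndex subtlety concrete before the test code: the standalone sketch below (assuming Netty 4.1 on the classpath; the class name is hypothetical) shows how a CompositeByteBuf's readerIndex hides a prefix from normal reads, while component-level access still sees the prefix bytes. Losing that readerIndex during unwrapping is exactly the state the buggy path dropped:

```java
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

// Hypothetical demo class, not part of the patch.
public final class ReaderIndexDemo {
    public static void main(String[] args) {
        CompositeByteBuf composite = Unpooled.compositeBuffer();
        composite.addComponent(true, Unpooled.wrappedBuffer(new byte[]{1, 2, 3})); // prefix
        composite.addComponent(true, Unpooled.wrappedBuffer(new byte[]{4, 5, 6})); // payload
        composite.readerIndex(3); // skip the prefix, as the test scenarios do

        // Readable bytes honor readerIndex: only 4, 5, 6 remain visible.
        byte[] readable = new byte[composite.readableBytes()];
        composite.getBytes(composite.readerIndex(), readable);
        System.out.println(java.util.Arrays.toString(readable)); // [4, 5, 6]

        // Walking components directly ignores readerIndex: component 0 still
        // starts with the prefix byte 1. A digest computed this way would
        // checksum the prefix too, which is the bug being reproduced.
        System.out.println(composite.component(0).getByte(0)); // 1
        composite.release();
    }
}
```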
+@RunWith(Parameterized.class) +public class CompositeByteBufUnwrapBugReproduceTest { + final byte[] testPayLoad; + final int defaultBufferPrefixLength; + private final boolean useV2Protocol; + + // set to 0 to 3 to run a single scenario for debugging purposes + private static final int RUN_SINGLE_SCENARIO_FOR_DEBUGGING = -1; + + @Parameterized.Parameters + public static Collection testScenarios() { + List scenarios = Arrays.asList(new Object[][] { + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, true}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD - 1, false}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, true}, + {BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD, false} + }); + if (RUN_SINGLE_SCENARIO_FOR_DEBUGGING >= 0) { + // pick a single scenario for debugging + scenarios = scenarios.subList(RUN_SINGLE_SCENARIO_FOR_DEBUGGING, RUN_SINGLE_SCENARIO_FOR_DEBUGGING + 1); + } + return scenarios; + } + + public CompositeByteBufUnwrapBugReproduceTest(int payloadSize, boolean useV2Protocol) { + this.testPayLoad = createTestPayLoad(payloadSize); + this.defaultBufferPrefixLength = payloadSize / 7; + this.useV2Protocol = useV2Protocol; + } + + private static byte[] createTestPayLoad(int payloadSize) { + byte[] payload = new byte[payloadSize]; + for (int i = 0; i < payloadSize; i++) { + payload[i] = (byte) i; + } + return payload; + } + + + /** + * A DigestManager that uses the given IntHash implementation for testing. + */ + static class TestIntHashDigestManager extends DigestManager { + private final IntHash intHash; + + public TestIntHashDigestManager(IntHash intHash, long ledgerId, boolean useV2Protocol, + ByteBufAllocator allocator) { + super(ledgerId, useV2Protocol, allocator); + this.intHash = intHash; + } + + @Override + int getMacCodeLength() { + return 4; + } + + @Override + boolean isInt32Digest() { + return true; + } + + @Override + void populateValueAndReset(int digest, ByteBuf buf) { + buf.writeInt(digest); + } + + @Override + int internalUpdate(int digest, ByteBuf data, int offset, int len) { + return intHash.resume(digest, data, offset, len); + } + + @Override + int internalUpdate(int digest, byte[] buffer, int offset, int len) { + return intHash.resume(digest, buffer, offset, len); + } + + @Override + boolean acceptsMemoryAddressBuffer() { + return intHash.acceptsMemoryAddressBuffer(); + } + } + + @Test + public void shouldCalculateChecksumForCompositeBuffer() { + ByteBuf testPayload = Unpooled.wrappedBuffer(testPayLoad); + byte[] referenceOutput = computeDigestAndPackageForSending(new Java8IntHash(), testPayload.retainedDuplicate()); + assertDigestAndPackageMatchesReference(new Java8IntHash(), testPayload, referenceOutput); + assertDigestAndPackageMatchesReference(new Java9IntHash(), testPayload, referenceOutput); + if (Sse42Crc32C.isSupported()) { + assertDigestAndPackageMatchesReference(new JniIntHash(), testPayload, referenceOutput); + } + testPayload.release(); + } + + private void assertDigestAndPackageMatchesReference(IntHash intHash, ByteBuf payload, byte[] referenceOutput) { + assertDigestAndPackageScenario(intHash, payload.retainedDuplicate(), referenceOutput, testPayLoad, + "plain payload, no wrapping"); + + ByteBuf payload2 = wrapWithPrefixAndCompositeByteBufWithReaderIndexState(payload.retainedDuplicate(), + defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload2, referenceOutput, testPayLoad, + "payload with prefix wrapped in CompositeByteBuf with readerIndex state"); + + ByteBuf payload3 = 
wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate( + payload.retainedDuplicate(), defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload3, referenceOutput, testPayLoad, + "payload with prefix wrapped in 2 layers of CompositeByteBuf with readerIndex state in the outer " + + "composite. In addition, the outer composite is duplicated twice."); + + ByteBuf payload4 = wrapInCompositeByteBufAndSlice(payload.retainedDuplicate(), defaultBufferPrefixLength); + assertDigestAndPackageScenario(intHash, payload4, referenceOutput, testPayLoad, + "payload with prefix wrapped in CompositeByteBuf and sliced"); + } + + private void assertDigestAndPackageScenario(IntHash intHash, ByteBuf payload, byte[] referenceOutput, + byte[] testPayLoadArray, + String scenario) { + // this validates that the readable bytes in the payload match the TEST_PAYLOAD content + assertArrayEquals(testPayLoadArray, ByteBufUtil.getBytes(payload.duplicate()), + "input is invalid for scenario '" + scenario + "'"); + + ByteBuf visitedCopy = Unpooled.buffer(payload.readableBytes()); + ByteBufVisitor.visitBuffers(payload, payload.readerIndex(), payload.readableBytes(), + new ByteBufVisitor.ByteBufVisitorCallback() { + @Override + public void visitBuffer(Void context, ByteBuf visitBuffer, int visitIndex, int visitLength) { + visitedCopy.writeBytes(visitBuffer, visitIndex, visitLength); + } + + @Override + public void visitArray(Void context, byte[] visitArray, int visitIndex, int visitLength) { + visitedCopy.writeBytes(visitArray, visitIndex, visitLength); + } + }, null); + + assertArrayEquals(ByteBufUtil.getBytes(visitedCopy), testPayLoadArray, + "visited copy is invalid for scenario '" + scenario + "'. Bug in ByteBufVisitor?"); + + // compute the digest and package + byte[] output = computeDigestAndPackageForSending(intHash, payload.duplicate()); + if (referenceOutput == null) { + referenceOutput = + computeDigestAndPackageForSending(new Java8IntHash(), Unpooled.wrappedBuffer(testPayLoadArray)); + } + // this validates that the output matches the reference output + assertArrayEquals(referenceOutput, output, "output is invalid for scenario '" + scenario + "'"); + } + + private byte[] computeDigestAndPackageForSending(IntHash intHash, ByteBuf data) { + DigestManager digestManager = new TestIntHashDigestManager(intHash, 1, useV2Protocol, ByteBufAllocator.DEFAULT); + ReferenceCounted packagedBuffer = + digestManager.computeDigestAndPackageForSending(1, 0, data.readableBytes(), data, + MacDigestManager.EMPTY_LEDGER_KEY, BookieProtocol.FLAG_NONE); + return packagedBufferToBytes(packagedBuffer); + } + + ByteBuf wrapWithPrefixAndCompositeByteBufWithReaderIndexState(ByteBuf payload, int bufferPrefixLength) { + // create a new buffer with a prefix and the actual payload + ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes()); + prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength)); + prefixedPayload.writeBytes(payload); + + // wrap the buffer in a composite buffer + CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + outerComposite.addComponent(true, prefixedPayload); + + // set reader index state. 
this is the state that gets lost in the unwrapping process + outerComposite.readerIndex(bufferPrefixLength); + + return outerComposite; + } + + ByteBuf wrapWithPrefixAndMultipleCompositeByteBufWithReaderIndexStateAndMultipleLayersOfDuplicate(ByteBuf payload, + int bufferPrefixLength) { + // create a new buffer with a prefix and the actual payload + ByteBuf prefixedPayload = ByteBufAllocator.DEFAULT.buffer(bufferPrefixLength + payload.readableBytes()); + prefixedPayload.writeBytes(RandomUtils.nextBytes(bufferPrefixLength)); + prefixedPayload.writeBytes(payload); + + CompositeByteBuf innerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + innerComposite.addComponent(true, prefixedPayload); + innerComposite.addComponent(true, Unpooled.EMPTY_BUFFER); + + // wrap the buffer in a composite buffer + CompositeByteBuf outerComposite = ByteBufAllocator.DEFAULT.compositeBuffer(); + outerComposite.addComponent(true, innerComposite); + outerComposite.addComponent(true, Unpooled.EMPTY_BUFFER); + + // set reader index state. this is the state that gets lost in the unwrapping process + outerComposite.readerIndex(bufferPrefixLength); + + return outerComposite.duplicate().duplicate(); + } + + ByteBuf wrapInCompositeByteBufAndSlice(ByteBuf payload, int bufferPrefixLength) { + // create a composite buffer + CompositeByteBuf compositeWithPrefix = ByteBufAllocator.DEFAULT.compositeBuffer(); + compositeWithPrefix.addComponent(true, Unpooled.wrappedBuffer(RandomUtils.nextBytes(bufferPrefixLength))); + compositeWithPrefix.addComponent(true, payload); + + // return a slice of the composite buffer so that it returns the payload + return compositeWithPrefix.slice(bufferPrefixLength, payload.readableBytes()); + } + + private static byte[] packagedBufferToBytes(ReferenceCounted packagedBuffer) { + byte[] output; + if (packagedBuffer instanceof ByteBufList) { + ByteBufList bufList = (ByteBufList) packagedBuffer; + output = new byte[bufList.readableBytes()]; + bufList.getBytes(output); + for (int i = 0; i < bufList.size(); i++) { + bufList.getBuffer(i).release(); + } + } else if (packagedBuffer instanceof ByteBuf) { + output = ByteBufUtil.getBytes((ByteBuf) packagedBuffer); + packagedBuffer.release(); + } else { + throw new RuntimeException("Unexpected type: " + packagedBuffer.getClass()); + } + return output; + } +} \ No newline at end of file diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java index 5b96c52a0be..60f89159a04 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/BookieClientTest.java @@ -359,7 +359,7 @@ public void getBookieInfoComplete(int rc, BookieInfo bInfo, Object ctx) { } @Test - public void testBatchedRead() throws Exception { + public void testBatchRead() throws Exception { ClientConfiguration conf = new ClientConfiguration(); conf.setUseV2WireProtocol(true); BookieClient bc = new BookieClientImpl(conf, eventLoopGroup, diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java index ac7aca77226..88de17d0a9d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/ByteBufListTest.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import 
io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; @@ -87,39 +86,6 @@ public void testDouble() throws Exception { assertEquals(b2.refCnt(), 0); } - @Test - public void testComposite() throws Exception { - ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); - b1.writerIndex(b1.capacity()); - ByteBuf b2 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); - b2.writerIndex(b2.capacity()); - - CompositeByteBuf composite = PooledByteBufAllocator.DEFAULT.compositeBuffer(); - composite.addComponent(b1); - composite.addComponent(b2); - - ByteBufList buf = ByteBufList.get(composite); - - // composite is unwrapped into two parts - assertEquals(2, buf.size()); - // and released - assertEquals(composite.refCnt(), 0); - - assertEquals(256, buf.readableBytes()); - assertEquals(b1, buf.getBuffer(0)); - assertEquals(b2, buf.getBuffer(1)); - - assertEquals(buf.refCnt(), 1); - assertEquals(b1.refCnt(), 1); - assertEquals(b2.refCnt(), 1); - - buf.release(); - - assertEquals(buf.refCnt(), 0); - assertEquals(b1.refCnt(), 0); - assertEquals(b2.refCnt(), 0); - } - @Test public void testClone() throws Exception { ByteBuf b1 = PooledByteBufAllocator.DEFAULT.heapBuffer(128, 128); diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java index 65a77b1492b..d90f8b7ea5d 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Crc32cIntChecksum.java @@ -71,12 +71,30 @@ public static int resumeChecksum(int previousChecksum, ByteBuf payload) { /** * Computes incremental checksum with input previousChecksum and input payload * - * @param previousChecksum : previously computed checksum - * @param payload - * @return + * @param previousChecksum the previously computed checksum + * @param payload the data for which the checksum is to be computed + * @param offset the starting position in the payload + * @param len the number of bytes to include in the checksum computation + * @return the updated checksum */ public static int resumeChecksum(int previousChecksum, ByteBuf payload, int offset, int len) { return CRC32C_HASH.resume(previousChecksum, payload, offset, len); } + /** + * Computes incremental checksum with input previousChecksum and input payload + * + * @param previousChecksum the previously computed checksum + * @param payload the data for which the checksum is to be computed + * @param offset the starting position in the payload + * @param len the number of bytes to include in the checksum computation + * @return the updated checksum + */ + public static int resumeChecksum(int previousChecksum, byte[] payload, int offset, int len) { + return CRC32C_HASH.resume(previousChecksum, payload, offset, len); + } + + public static boolean acceptsMemoryAddressBuffer() { + return CRC32C_HASH.acceptsMemoryAddressBuffer(); + } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java index e8922e3a16b..be98ae19be1 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/IntHash.java @@ -28,4 +28,8 @@ public interface IntHash { int resume(int current, ByteBuf 
buffer); int resume(int current, ByteBuf buffer, int offset, int len); + + int resume(int current, byte[] buffer, int offset, int len); + + boolean acceptsMemoryAddressBuffer(); } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java index fd548bc4de4..2825c610b11 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java8IntHash.java @@ -53,4 +53,14 @@ public int resume(int current, ByteBuf buffer, int offset, int len) { return hash.resume(current, buffer.slice(offset, len).nioBuffer()); } } + + @Override + public int resume(int current, byte[] buffer, int offset, int len) { + return hash.resume(current, buffer, offset, len); + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return false; + } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java index 31af153666e..2e779a92766 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/Java9IntHash.java @@ -19,7 +19,6 @@ package com.scurrilous.circe.checksum; import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; import io.netty.util.concurrent.FastThreadLocal; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -77,7 +76,7 @@ public int calculate(ByteBuf buffer, int offset, int len) { return resume(0, buffer, offset, len); } - private int resume(int current, long address, int offset, int length) { + private int updateDirectByteBuffer(int current, long address, int offset, int length) { try { return (int) UPDATE_DIRECT_BYTEBUFFER.invoke(null, current, address, offset, offset + length); } catch (IllegalAccessException | InvocationTargetException e) { @@ -85,7 +84,20 @@ private int resume(int current, long address, int offset, int length) { } } - private int resume(int current, byte[] array, int offset, int length) { + @Override + public int resume(int current, byte[] array, int offset, int length) { + // the bit-wise complementing of the input and output is explained in the resume method below + current = ~current; + current = updateBytes(current, array, offset, length); + return ~current; + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return true; + } + + private static int updateBytes(int current, byte[] array, int offset, int length) { try { return (int) UPDATE_BYTES.invoke(null, current, array, offset, offset + length); } catch (IllegalAccessException | InvocationTargetException e) { @@ -100,33 +112,37 @@ public int resume(int current, ByteBuf buffer) { @Override public int resume(int current, ByteBuf buffer, int offset, int len) { - int negCrc = ~current; + // The input value is bit-wise complemented for two reasons: + // 1. The CRC32C algorithm is designed to start with a seed value where all bits are set to 1 (0xffffffff). + // When 0 is initially passed in, ~0 results in the correct initial value (0xffffffff). + // 2. The CRC32C algorithm complements the final value as the last step. This method will always complement + // the return value. Therefore, when the algorithm is used iteratively, it is necessary to complement + // the input value to continue calculations. 
+ // This allows the algorithm to be used incrementally without needing separate initialization and + // finalization steps. + current = ~current; if (buffer.hasMemoryAddress()) { - negCrc = resume(negCrc, buffer.memoryAddress(), offset, len); + current = updateDirectByteBuffer(current, buffer.memoryAddress(), offset, len); } else if (buffer.hasArray()) { int arrayOffset = buffer.arrayOffset() + offset; - negCrc = resume(negCrc, buffer.array(), arrayOffset, len); - } else if (buffer instanceof CompositeByteBuf) { - CompositeByteBuf compositeByteBuf = (CompositeByteBuf) buffer; - int loopedCurrent = current; - for (int i = 0; i < compositeByteBuf.numComponents(); i ++) { - loopedCurrent = resume(loopedCurrent, compositeByteBuf.component(i)); - } - return loopedCurrent; + current = updateBytes(current, buffer.array(), arrayOffset, len); } else { byte[] b = TL_BUFFER.get(); int toRead = len; int loopOffset = offset; while (toRead > 0) { int length = Math.min(toRead, b.length); - buffer.slice(loopOffset, length).readBytes(b, 0, length); - negCrc = resume(negCrc, b, 0, length); + buffer.getBytes(loopOffset, b, 0, length); + current = updateBytes(current, b, 0, length); toRead -= length; loopOffset += length; } } - return ~negCrc; + // The current value is complemented to align with the finalization step of the CRC32C algorithm. + // If there is a subsequent resume step, the value will be complemented again to initiate the next step + // as described in the comments in the beginning of this method. + return ~current; } } diff --git a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java index e8e87bf6b1a..dc5bed0fc1c 100644 --- a/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java +++ b/circe-checksum/src/main/java/com/scurrilous/circe/checksum/JniIntHash.java @@ -51,4 +51,14 @@ public int resume(int current, ByteBuf buffer, int offset, int len) { return hash.resume(current, buffer.slice(offset, len).nioBuffer()); } } + + @Override + public int resume(int current, byte[] buffer, int offset, int len) { + return hash.resume(current, buffer, offset, len); + } + + @Override + public boolean acceptsMemoryAddressBuffer() { + return true; + } } diff --git a/pom.xml b/pom.xml index 5692c0d2a3d..9aa62e2bea0 100644 --- a/pom.xml +++ b/pom.xml @@ -143,7 +143,7 @@ 9.4.53.v20231009 1.37 2.8.2 - 1.14.3 + 1.15.3 4.13.2 1.3 5.10.1
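As a sanity check on the complement-on-entry/complement-on-exit convention documented in Java9IntHash above: chunked CRC32C updates must compose to the same value as a one-shot pass over the whole buffer. The standalone snippet below (hypothetical class name, JDK 9+ only, no BookKeeper dependencies) demonstrates that property with java.util.zip.CRC32C, which keeps the raw, un-complemented register internally, the same register Java9IntHash reaches via reflection:

```java
import java.util.zip.CRC32C;

// Standalone check: feeding a payload in chunks must produce the same CRC32C
// as a single pass. This mirrors the contract of IntHash.resume(...), where
// the running value is complemented on entry and exit so chunked calls compose.
public final class Crc32cChunkingCheck {
    public static void main(String[] args) {
        byte[] data = new byte[1000];
        for (int i = 0; i < data.length; i++) {
            data[i] = (byte) i;
        }

        // One-shot checksum over the full payload.
        CRC32C oneShot = new CRC32C();
        oneShot.update(data, 0, data.length);
        long expected = oneShot.getValue();

        // Chunked computation on a single instance; the JDK object carries the
        // raw register across update() calls and applies the final complement
        // only in getValue().
        CRC32C incremental = new CRC32C();
        incremental.update(data, 0, 400);
        incremental.update(data, 400, 600);
        long actual = incremental.getValue();

        if (expected != actual) {
            throw new AssertionError("chunked CRC32C must equal one-shot CRC32C");
        }
        System.out.println("OK: " + Long.toHexString(actual));
    }
}
```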