Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.util.DirectBufferPool;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,13 +36,21 @@ public final class BufferUtils {

private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = {};

/** Pool for reusing direct {@link ByteBuffer}s allocated by {@link #assignByteBuffers}. */
public static final DirectBufferPool BUFFER_POOL = new DirectBufferPool();

/** Utility classes should not be constructed. **/
private BufferUtils() {

}

/**
* Assign an array of ByteBuffers.
* Allocate an array of direct {@link ByteBuffer}s drawn from
* {@link #BUFFER_POOL}, sized to hold {@code totalLen} bytes split into
* slices of at most {@code bufferCapacity} bytes each. Callers are
* responsible for returning each buffer via
* {@link DirectBufferPool#returnBuffer} when finished.
*
* @param totalLen total length of all ByteBuffers
* @param bufferCapacity max capacity of each ByteBuffer
*/
Expand All @@ -58,13 +67,15 @@ public static ByteBuffer[] assignByteBuffers(long totalLen,
long allocatedLen = 0;
// For each ByteBuffer (except the last) allocate bufferLen of capacity
for (int i = 0; i < numBuffers - 1; i++) {
dataBuffers[i] = ByteBuffer.allocate(bufferCapacity);
dataBuffers[i] = BUFFER_POOL.getBuffer(bufferCapacity);
dataBuffers[i].limit(bufferCapacity);
allocatedLen += bufferCapacity;
}
// For the last ByteBuffer, allocate as much space as is needed to fit
// remaining bytes
dataBuffers[numBuffers - 1] = ByteBuffer.allocate(
Math.toIntExact(totalLen - allocatedLen));
final int lastLen = Math.toIntExact(totalLen - allocatedLen);
dataBuffers[numBuffers - 1] = BUFFER_POOL.getBuffer(lastLen);
dataBuffers[numBuffers - 1].limit(lastLen);
return dataBuffers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,18 +202,25 @@ private static long writeDataToChannel(FileChannel channel, ChunkBuffer data,
@SuppressWarnings("checkstyle:parameternumber")
public static ChunkBuffer readData(long len, int bufferCapacity,
File file, long off, HddsVolume volume, int readMappedBufferThreshold, boolean mmapEnabled,
MappedBufferManager mappedBufferManager) throws StorageContainerException {
MappedBufferManager mappedBufferManager, DispatcherContext dispatcherContext)
throws StorageContainerException {
if (mmapEnabled && len > readMappedBufferThreshold && bufferCapacity > readMappedBufferThreshold) {
return readData(file, bufferCapacity, off, len, volume, mappedBufferManager);
return readData(file, bufferCapacity, off, len, volume, mappedBufferManager, dispatcherContext);
} else if (len == 0) {
return ChunkBuffer.wrap(Collections.emptyList());
} else {
final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len, bufferCapacity);
readData(file, off, len, c -> c.position(off).read(buffers), volume);
Arrays.stream(buffers).forEach(ByteBuffer::flip);
if (dispatcherContext != null && dispatcherContext.isReleaseSupported()) {
dispatcherContext.setReleaseMethod(() -> {
for (ByteBuffer buf : buffers) {
BufferUtils.BUFFER_POOL.returnBuffer(buf);
}
});
}
return ChunkBuffer.wrap(Arrays.asList(buffers));
}

final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len,
bufferCapacity);
readData(file, off, len, c -> c.position(off).read(buffers), volume);
Arrays.stream(buffers).forEach(ByteBuffer::flip);
return ChunkBuffer.wrap(Arrays.asList(buffers));
}

private static void readData(File file, long offset, long len,
Expand Down Expand Up @@ -253,16 +260,23 @@ private static void readData(File file, long offset, long len,
* @return a list of {@link MappedByteBuffer} containing the data.
*/
private static ChunkBuffer readData(File file, int chunkSize,
long offset, long length, HddsVolume volume, MappedBufferManager mappedBufferManager)
long offset, long length, HddsVolume volume, MappedBufferManager mappedBufferManager,
DispatcherContext dispatcherContext)
throws StorageContainerException {

final int bufferNum = Math.toIntExact((length - 1) / chunkSize) + 1;
if (!mappedBufferManager.getQuota(bufferNum)) {
// proceed with normal buffer
final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(length,
chunkSize);
final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(length, chunkSize);
readData(file, offset, length, c -> c.position(offset).read(buffers), volume);
Arrays.stream(buffers).forEach(ByteBuffer::flip);
if (dispatcherContext != null && dispatcherContext.isReleaseSupported()) {
dispatcherContext.setReleaseMethod(() -> {
for (ByteBuffer buf : buffers) {
BufferUtils.BUFFER_POOL.returnBuffer(buf);
}
});
}
return ChunkBuffer.wrap(Arrays.asList(buffers));
} else {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,8 @@ public ChunkBufferToByteString readChunk(Container container, BlockID blockID,
return ChunkUtils.readData(chunkFile, bufferCapacity, offset, len, volume, dispatcherContext);
}
return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume,
readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager);
readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager,
dispatcherContext);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ public ChunkBufferToByteString readChunk(Container container, BlockID blockID,
return ChunkUtils.readData(file, bufferCapacity, offset, len, volume, dispatcherContext);
}
return ChunkUtils.readData(len, bufferCapacity, file, offset, volume,
readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager);
readMappedBufferThreshold, readMappedBufferMaxCount > 0, mappedBufferManager,
dispatcherContext);
}
} catch (StorageContainerException ex) {
//UNABLE TO FIND chunk is not a problem as we will try with the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ static ChunkBuffer readData(File file, long off, long len)
throws StorageContainerException {
LOG.info("off={}, len={}", off, len);
return ChunkUtils.readData(len, BUFFER_CAPACITY, file, off, null,
MAPPED_BUFFER_THRESHOLD, true, MAPPED_BUFFER_MANAGER);
MAPPED_BUFFER_THRESHOLD, true, MAPPED_BUFFER_MANAGER, null);
}

@Test
Expand Down Expand Up @@ -169,12 +169,15 @@ void concurrentReadWriteOfSameFile() {
assertEquals(1, buffers.size());
final ByteBuffer readBuffer = buffers.get(0);

int remaining = readBuffer.remaining();
byte[] readArray = new byte[remaining];
readBuffer.get(readArray);
LOG.info("Read data ({}): {}", threadNumber,
new String(readBuffer.array(), UTF_8));
if (!Arrays.equals(array, readBuffer.array())) {
new String(readArray, UTF_8));
if (!Arrays.equals(array, readArray)) {
fail.getAndIncrement();
}
assertEquals(len, readBuffer.remaining());
assertEquals(len, remaining);
} catch (Exception ee) {
LOG.error("Failed to read data ({})", threadNumber, ee);
fail.getAndIncrement();
Expand Down