Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class StreamBlockInputStream extends BlockExtendedInputStream {
private XceiverClientFactory xceiverClientFactory;
private XceiverClientGrpc xceiverClient;

private ByteBuffer buffer;
private ReadBuffer readBuffer;
private long position = 0;
private long requestedLength = 0;
private StreamingReader streamingReader;
Expand Down Expand Up @@ -110,6 +110,8 @@ public StreamBlockInputStream(
this.responseDataSize = config.getStreamReadResponseDataSize();
this.readTimeout = config.getStreamReadTimeout();
this.readTimeoutNanos = readTimeout.toNanos();

LOG.debug("{}: new StreamBlockInputStream", name);
}

@Override
Expand All @@ -130,11 +132,12 @@ public synchronized long getPos() {
@Override
public synchronized int read() throws IOException {
checkOpen();
if (!dataAvailableToRead(1, true)) {
final boolean preRead = true;
if (!dataAvailableToRead(1, preRead)) {
return EOF;
}
int value = buffer.get();
advancePosition(1);
final int value = readBuffer.getByteBuffer().get();
advancePosition(1, preRead);
return value;
}

Expand All @@ -156,12 +159,14 @@ synchronized int readFully(ByteBuffer targetBuf, boolean preRead) throws IOExcep
if (!dataAvailableToRead(targetBuf.remaining(), preRead)) {
break;
}

final ByteBuffer buffer = readBuffer.getByteBuffer();
int toCopy = Math.min(buffer.remaining(), targetBuf.remaining());
ByteBuffer tmpBuf = buffer.duplicate();
tmpBuf.limit(tmpBuf.position() + toCopy);
targetBuf.put(tmpBuf);
buffer.position(tmpBuf.position());
advancePosition(toCopy);
advancePosition(toCopy, preRead);
read += toCopy;
}
return read > 0 ? read : EOF;
Expand All @@ -173,22 +178,23 @@ private synchronized boolean dataAvailableToRead(int length, boolean preRead) th
}
initialize();

if (bufferHasRemaining()) {
return true;
if (!hasRemaining(readBuffer)) {
readBuffer = streamingReader.read(length, preRead);
}
buffer = streamingReader.read(length, preRead);
return bufferHasRemaining();
Preconditions.assertTrue(hasRemaining(readBuffer));
return true;
}

// Moves the stream position forward by delta and logs the old and new positions.
// NOTE(review): two method headers and two end-of-block checks appear back to back below,
// which looks like both sides of a diff hunk rendered together — confirm which pair is current.
private synchronized void advancePosition(long delta) {
private synchronized void advancePosition(long delta, boolean preRead) {
LOG.debug("{}: advance {} -> {}", getName(streamingReader), position, position + delta);
position += delta;
// First check: close the stream once position reaches blockLength while a reader exists.
if (position >= blockLength && streamingReader != null) {
closeStream();
// Second check: close the reader at blockLength only when preRead is set.
if (preRead && position >= blockLength) {
closeReader("advancePosition");
}
}

// Reports whether there are unread bytes left in a buffer.
// NOTE(review): two headers and two return statements are shown together here, which looks
// like both sides of a diff hunk rendered together — confirm which variant is current.
private synchronized boolean bufferHasRemaining() {
// Instance form: checks the buffer field, treating null as "nothing remaining".
return buffer != null && buffer.hasRemaining();
private static boolean hasRemaining(ReadBuffer read) {
// Static form: checks the ByteBuffer wrapped by the given ReadBuffer, treating null as "nothing remaining".
return read != null && read.getByteBuffer().hasRemaining();
}

@Override
Expand All @@ -208,12 +214,40 @@ public synchronized void seek(long pos) throws IOException {
if (pos == position) {
return;
}
LOG.debug("{}: seek {} -> {}", this, position, pos);
closeStream();
LOG.debug("{}: seek {} -> {}", getName(streamingReader), position, pos);
readBuffer = reuseReadBuffer(readBuffer, pos);
position = pos;
requestedLength = pos;
}

/**
 * Tries to keep using an already received response after the read position moved.
 *
 * @param previous the buffer currently held, may be {@code null}
 * @param blockOffset the block offset that the next read should start from
 * @return {@code previous}, repositioned to {@code blockOffset}, when its response still has
 *         unread bytes at that offset; {@code null} otherwise
 */
static ReadBuffer reuseReadBuffer(ReadBuffer previous, long blockOffset) {
  if (previous == null) {
    return null;
  }
  // Recompute where blockOffset falls inside the response that backs the previous buffer.
  final ByteBuffer repositioned = getByteBuffer(previous.getProto(), blockOffset);
  if (repositioned == null || !repositioned.hasRemaining()) {
    return null;
  }
  final ByteBuffer cached = previous.getByteBuffer();
  cached.position(repositioned.position());
  // Both buffers are views of the same response data, so they must agree after repositioning.
  Preconditions.assertSame(repositioned.remaining(), cached.remaining(), "remaining");
  return previous;
}

/**
 * Creates a read-only view of a response's data, positioned at the given block offset.
 *
 * @param proto the response holding the data and the block offset at which that data starts
 * @param blockOffset the block offset the caller wants to read from
 * @return a read-only buffer positioned at {@code blockOffset} (clamped to the buffer limit),
 *         or {@code null} when {@code blockOffset} lies before the start of the response
 */
static ByteBuffer getByteBuffer(ReadBlockResponseProto proto, long blockOffset) {
  final ByteBuffer data = proto.getData().asReadOnlyByteBuffer();
  final long responseStart = proto.getOffset();
  if (responseStart > blockOffset) {
    // Possible after a seek; the response is simply dropped for now.
    // TODO: consider caching the proto, which would help when seeking backwards.
    return null;
  }
  // The server always returns data starting at a checksum boundary, so the requested
  // offset may fall inside the response rather than at its first byte.
  final long skip = blockOffset - responseStart;
  if (skip <= 0) {
    return data;
  }
  final long clamped = Math.min(skip, data.limit());
  data.position(Math.toIntExact(clamped));
  return data;
}

@Override
// The seekable interface indicates that seekToNewSource should seek to a new source of the data,
// i.e. a different datanode. This is not supported for now.
Expand All @@ -226,19 +260,15 @@ public synchronized void unbuffer() {
releaseClient();
}

private synchronized void closeStream() {
private synchronized void closeReader(String reason) {
readBuffer = null;
if (streamingReader == null) {
buffer = null;
return;
}

final StreamingReader reader = streamingReader;
streamingReader = null;
buffer = null;

if (LOG.isDebugEnabled()) {
LOG.debug("Closing {}", reader);
}
LOG.debug("{} closeReader for {}", getName(reader), reason);

reader.onCompleted();

Expand Down Expand Up @@ -293,6 +323,7 @@ private synchronized void initialize() throws IOException {
try {
acquireClient();
final StreamingReader reader = new StreamingReader();
LOG.debug("{}: new StreamingReader", getName(reader));
xceiverClient.initStreamRead(blockID, reader);
streamingReader = reader;
} catch (IOException ioe) {
Expand Down Expand Up @@ -342,7 +373,7 @@ private void handleExceptions(IOException cause) throws IOException {

protected synchronized void releaseClient() {
if (xceiverClientFactory != null && xceiverClient != null) {
closeStream();
closeReader("releaseClient");
xceiverClientFactory.releaseClientForReadData(xceiverClient, false);
xceiverClient = null;
}
Expand Down Expand Up @@ -382,6 +413,35 @@ public Duration getReadTimeout() {
return readTimeout;
}

/**
 * Picks the object used to identify this stream in log messages.
 *
 * @param reader the current reader, may be {@code null}
 * @return {@code reader} when it is non-null, otherwise the {@code name} field
 */
private Object getName(StreamingReader reader) {
  if (reader == null) {
    return name;
  }
  return reader;
}

/**
 * Pairs a read response with the {@link ByteBuffer} used to consume its data.
 */
static class ReadBuffer {
  /** The response this buffer was created from. */
  private final ReadBlockResponseProto response;
  /** The buffer handed out to readers. */
  private final ByteBuffer data;

  ReadBuffer(ReadBlockResponseProto proto, ByteBuffer buffer) {
    this.response = proto;
    this.data = buffer;
  }

  /** @return the response passed to the constructor. */
  ReadBlockResponseProto getProto() {
    return response;
  }

  /** @return the buffer passed to the constructor. */
  ByteBuffer getByteBuffer() {
    return data;
  }

  /** @return a description with the response offset, its data size and the buffer state. */
  @Override
  public String toString() {
    final StringBuilder text = new StringBuilder("ReadBuffer: offset=");
    text.append(response.getOffset());
    text.append(", dataSize=").append(response.getData().size());
    text.append(", buffer=").append(data);
    return text.toString();
  }
}

/**
* Implementation of a StreamObserver used to receive and buffer streaming GRPC reads.
*/
Expand Down Expand Up @@ -427,15 +487,18 @@ ReadBlockResponseProto poll() throws IOException {
}

final long elapsedNanos = System.nanoTime() - startTime;
if (elapsedNanos >= readTimeoutNanos) {
setFailedAndThrow(new TimeoutIOException(
"Timed out waiting for response after " + readTimeout));
if (elapsedNanos >= readTimeoutNanos && !future.isDone()) {
final TimeoutIOException e = new TimeoutIOException(
this + ": Failed to poll a response, timed out " + readTimeout);
if (setFailed(e)) {
throw e;
}
return null;
}
}
}

private ByteBuffer read(int length, boolean preRead) throws IOException {
private ReadBuffer read(int length, boolean preRead) throws IOException {
checkError();
if (future.isDone()) {
return null; // Stream ended
Expand All @@ -444,36 +507,16 @@ private ByteBuffer read(int length, boolean preRead) throws IOException {
readBlock(length, preRead);

while (true) {
final ByteBuffer buf = readFromQueue();
if (buf != null && buf.hasRemaining()) {
return buf;
final ReadBlockResponseProto proto = poll();
final ByteBuffer buffer = getByteBuffer(proto, getPos());
final ReadBuffer read = buffer != null ? new ReadBuffer(proto, buffer) : null;
if (hasRemaining(read)) {
LOG.debug("{}: read(length={}, preRead={}) returns {}", name, length, preRead, read);
return read;
}
}
}

/**
 * Polls the next response and returns its data positioned at the current read position.
 *
 * @return a read-only buffer over the response data, advanced to the current position
 *         (clamped to the buffer limit)
 * @throws IOException propagated from {@code poll()}
 * @throws IllegalStateException if the current position is before the offset of the
 *         received response
 */
ByteBuffer readFromQueue() throws IOException {
  final ReadBlockResponseProto readBlock = poll();
  // The server always returns data starting from the last checksum boundary. Therefore, if the
  // reader position is ahead of the offset received from the server, the buffer position has to
  // be advanced accordingly. If the reader position is behind that offset, the response cannot
  // be used and the read fails.
  final ByteString data = readBlock.getData();
  final ByteBuffer dataBuffer = data.asReadOnlyByteBuffer();
  final long blockOffset = readBlock.getOffset();
  final long pos = getPos();
  if (pos < blockOffset) {
    // This should not happen, and if it does, we have a bug.
    final IllegalStateException outOfOrder = new IllegalStateException(
        this + ": out of order, position " + pos + " < block offset " + blockOffset);
    setFailed(outOfOrder);
    // Always throw, even when the failure could not be recorded: falling through would hand
    // back a buffer that starts at the wrong block offset.
    throw outOfOrder;
  }
  final long offset = pos - blockOffset;
  if (offset > 0) {
    dataBuffer.position(Math.toIntExact(Math.min(offset, dataBuffer.limit())));
  }
  LOG.debug("{}: return response position {}, length {} (block offset {}, length {})",
      name, pos, dataBuffer.remaining(), blockOffset, data.size());
  return dataBuffer;
}

private void releaseResources() {
if (semaphoreReleased.compareAndSet(false, true)) {
releaseStreamResources();
Expand Down Expand Up @@ -527,12 +570,6 @@ StreamingReadResponse getResponse() {
return response.get();
}

/**
 * Records the given throwable through {@code setFailed} and rethrows it only when
 * {@code setFailed} returned {@code true}.
 *
 * @param throwable the failure to record
 * @param <T> the type of the failure
 * @throws T the given throwable, when {@code setFailed} returned {@code true}
 */
private <T extends Throwable> void setFailedAndThrow(T throwable) throws T {
  final boolean recorded = setFailed(throwable);
  if (!recorded) {
    return;
  }
  throw throwable;
}

private boolean setFailed(Throwable throwable) {
final boolean completed = future.completeExceptionally(throwable);
if (!completed) {
Expand Down