Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,41 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_READAHEAD)
private boolean enabledReadAhead;

/** Whether read-ahead V2 is enabled; bound to FS_AZURE_ENABLE_READAHEAD_V2. */
@BooleanConfigurationValidatorAnnotation(
ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2,
DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
private boolean isReadAheadV2Enabled;

/**
 * Configured minimum number of prefetch threads for read-ahead V2.
 * A non-positive value makes getMinReadAheadV2ThreadPoolSize() fall back to
 * 2 * available processors.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
private int minReadAheadV2ThreadPoolSize;

/**
 * Configured maximum number of prefetch threads for read-ahead V2.
 * A non-positive value makes getMaxReadAheadV2ThreadPoolSize() fall back to
 * 4 * available processors.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE,
DefaultValue = DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE)
private int maxReadAheadV2ThreadPoolSize;

/**
 * Configured minimum size of the read-ahead V2 buffer pool.
 * A non-positive value makes getMinReadAheadV2BufferPoolSize() fall back to
 * 2 * available processors.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE,
DefaultValue = DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE)
private int minReadAheadV2BufferPoolSize;

/**
 * Configured maximum size of the read-ahead V2 buffer pool.
 * A non-positive value makes getMaxReadAheadV2BufferPoolSize() fall back to
 * 4 * available processors.
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE,
DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
private int maxReadAheadV2BufferPoolSize;

/**
 * TTL, in milliseconds, for idle threads in the executor service used by
 * read-ahead V2. Returned unmodified by getReadAheadExecutorServiceTTLInMillis().
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
private int readAheadExecutorServiceTTLMillis;

/**
 * TTL, in milliseconds, for cached buffers in the buffer pool used by
 * read-ahead V2. Returned unmodified by getReadAheadV2CachedBufferTTLMillis().
 */
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS,
DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
private int readAheadV2CachedBufferTTLMillis;

@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
Expand Down Expand Up @@ -1368,6 +1403,54 @@ public boolean isReadAheadEnabled() {
return this.enabledReadAhead;
}

/**
 * Returns the minimum thread pool size to use for read-ahead V2.
 *
 * @return the configured value when it is positive; otherwise twice the
 * number of processors available to the JVM.
 */
public int getMinReadAheadV2ThreadPoolSize() {
  // A non-positive value is treated as "not set" and replaced by the
  // processor-based default.
  return minReadAheadV2ThreadPoolSize > 0
      ? minReadAheadV2ThreadPoolSize
      : 2 * Runtime.getRuntime().availableProcessors();
}

/**
 * Returns the maximum thread pool size to use for read-ahead V2.
 *
 * @return the configured value when it is positive; otherwise four times the
 * number of processors available to the JVM.
 */
public int getMaxReadAheadV2ThreadPoolSize() {
  // A non-positive value is treated as "not set" and replaced by the
  // processor-based default.
  return maxReadAheadV2ThreadPoolSize > 0
      ? maxReadAheadV2ThreadPoolSize
      : 4 * Runtime.getRuntime().availableProcessors();
}

/**
 * Returns the minimum buffer pool size to use for read-ahead V2.
 *
 * @return the configured value when it is positive; otherwise twice the
 * number of processors available to the JVM.
 */
public int getMinReadAheadV2BufferPoolSize() {
  // A non-positive value is treated as "not set" and replaced by the
  // processor-based default.
  return minReadAheadV2BufferPoolSize > 0
      ? minReadAheadV2BufferPoolSize
      : 2 * Runtime.getRuntime().availableProcessors();
}

/**
 * Returns the maximum buffer pool size to use for read-ahead V2.
 *
 * @return the configured value when it is positive; otherwise four times the
 * number of processors available to the JVM.
 */
public int getMaxReadAheadV2BufferPoolSize() {
  // A non-positive value is treated as "not set" and replaced by the
  // processor-based default.
  return maxReadAheadV2BufferPoolSize > 0
      ? maxReadAheadV2BufferPoolSize
      : 4 * Runtime.getRuntime().availableProcessors();
}

/**
 * Returns the configured TTL for the read-ahead V2 executor service.
 *
 * @return the TTL in milliseconds, exactly as configured.
 */
public int getReadAheadExecutorServiceTTLInMillis() {
  return this.readAheadExecutorServiceTTLMillis;
}

/**
 * Returns the configured TTL for buffers cached by read-ahead V2.
 *
 * @return the TTL in milliseconds, exactly as configured.
 */
public int getReadAheadV2CachedBufferTTLMillis() {
  return this.readAheadV2CachedBufferTTLMillis;
}

/**
 * Reports whether the user has turned on the read-ahead V2 feature.
 *
 * @return {@code true} when read-ahead V2 is enabled, {@code false} otherwise.
 */
public boolean isReadAheadV2Enabled() {
  return isReadAheadV2Enabled;
}

@VisibleForTesting
void setReadAheadEnabled(final boolean enabledReadAhead) {
this.enabledReadAhead = enabledReadAhead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
.isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,46 @@ public final class ConfigurationKeys {
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";

/**
* Enable or disable readahead buffer in AbfsInputStream.
* Enable or disable readahead V1 in AbfsInputStream.
* Value: {@value}.
*/
public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";
/**
 * Enable or disable readahead V2 in AbfsInputStream. V2 can be enabled
 * independently of V1; when it is enabled, AbfsInputStream gives it
 * precedence over V1.
 * Value: {@value}.
 */
public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2";

/**
 * Minimum number of prefetch threads in the thread pool for readahead V2.
 * A non-positive value makes AbfsConfiguration fall back to
 * 2 * available processors.
 * Value: {@value}.
 */
public static final String FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE = "fs.azure.readahead.v2.min.thread.pool.size";
/**
 * Maximum number of prefetch threads in the thread pool for readahead V2.
 * A non-positive value makes AbfsConfiguration fall back to
 * 4 * available processors.
 * Value: {@value}.
 */
public static final String FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE = "fs.azure.readahead.v2.max.thread.pool.size";
/**
 * Minimum size of the buffer pool for caching prefetched data for readahead V2.
 * A non-positive value makes AbfsConfiguration fall back to
 * 2 * available processors.
 * Value: {@value}.
 */
public static final String FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.min.buffer.pool.size";
/**
 * Maximum size of the buffer pool for caching prefetched data for readahead V2.
 * A non-positive value makes AbfsConfiguration fall back to
 * 4 * available processors.
 * Value: {@value}.
 */
public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.max.buffer.pool.size";

/**
 * TTL in milliseconds for the idle threads in the executor service used by readahead V2.
 * Value: {@value}.
 */
public static final String FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = "fs.azure.readahead.v2.executor.service.ttl.millis";

/**
 * TTL in milliseconds for the cached buffers in the buffer pool used by readahead V2.
 * Value: {@value}.
 */
public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = "fs.azure.readahead.v2.cached.buffer.ttl.millis";

/** Setting this true will make the driver use its own RemoteIterator implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,14 @@ public final class FileSystemConfigurations {
public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;

public static final boolean DEFAULT_ENABLE_READAHEAD = true;
// Read-ahead V2 is disabled by default.
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
// -1 marks the pool sizes as "not set": the AbfsConfiguration getters replace
// any non-positive value with 2x (min) or 4x (max) the available processors.
public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
// Default TTLs, in milliseconds.
public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000;
public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000;

public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private final String eTag; // eTag of the path when InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
private final boolean readAheadV2Enabled; // whether enable readAhead V2;
private final String inputStreamId;
private final boolean alwaysReadBufferSize;
/*
Expand Down Expand Up @@ -130,6 +131,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,

/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;
private ReadBufferManager readBufferManager;

public AbfsInputStream(
final AbfsClient client,
Expand All @@ -150,6 +152,7 @@ public AbfsInputStream(
this.eTag = eTag;
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
this.readAheadV2Enabled = abfsInputStreamContext.isReadAheadV2Enabled();
this.alwaysReadBufferSize
= abfsInputStreamContext.shouldReadBufferSizeAlways();
this.bufferedPreadDisabled = abfsInputStreamContext
Expand All @@ -173,9 +176,19 @@ public AbfsInputStream(
this.fsBackRef = abfsInputStreamContext.getFsBackRef();
contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter();

// Propagate the config values to ReadBufferManager so that the first instance
// to initialize can set the readAheadBlockSize
ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
/*
* Initialize the ReadBufferManager based on whether readAheadV2 is enabled or not.
* Precedence is given to ReadBufferManagerV2.
* If neither V1 nor V2 is enabled, then no read ahead will be done; the V1
* manager is still assigned in that case so that readBufferManager is always set.
*/
if (readAheadV2Enabled) {
ReadBufferManagerV2.setReadBufferManagerConfigs(
readAheadBlockSize, client.getAbfsConfiguration());
readBufferManager = ReadBufferManagerV2.getBufferManager();
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be under `else if (readAheadEnabled)` instead of `else`?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Today the RBM (ReadBufferManager) is always initialized.
I wanted to retain that behavior to avoid an NPE from any unexplored usage.

ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
readBufferManager = ReadBufferManagerV1.getBufferManager();
}
if (streamStatistics != null) {
ioStatistics = streamStatistics.getIOStatistics();
}
Expand Down Expand Up @@ -491,7 +504,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){

private int readInternal(final long position, final byte[] b, final int offset, final int length,
final boolean bypassReadAhead) throws IOException {
if (readAheadEnabled && !bypassReadAhead) {
if (isReadAheadEnabled() && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have non-zero buffer offsets");
Expand All @@ -510,7 +523,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
new TracingContext(readAheadTracingContext));
nextOffset = nextOffset + nextSize;
numReadAheads--;
Expand All @@ -519,7 +532,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
}

// try reading from buffers first
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
receivedBytes = readBufferManager.getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
Expand Down Expand Up @@ -720,7 +733,9 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true;
ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
if (readBufferManager != null) {
readBufferManager.purgeBuffersForStream(this);
}
buffer = null; // de-reference the buffer so it can be GC'ed sooner
if (contextEncryptionAdapter != null) {
contextEncryptionAdapter.destroy();
Expand Down Expand Up @@ -773,9 +788,14 @@ byte[] getBuffer() {
return buffer;
}

/**
* Checks if any version of read ahead is enabled.
* If both are disabled, then skip read ahead logic.
* @return true if read ahead is enabled, false otherwise.
*/
@VisibleForTesting
public boolean isReadAheadEnabled() {
return readAheadEnabled;
return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean isReadAheadEnabled = true;

private boolean isReadAheadV2Enabled;

private boolean alwaysReadBufferSize;

private int readAheadBlockSize;
Expand Down Expand Up @@ -91,6 +93,12 @@ public AbfsInputStreamContext isReadAheadEnabled(
return this;
}

/**
 * Sets whether read-ahead V2 is enabled for streams built from this context.
 *
 * @param isReadAheadV2Enabled true to enable read-ahead V2, false otherwise.
 * @return this context, to allow call chaining.
 */
public AbfsInputStreamContext isReadAheadV2Enabled(
final boolean isReadAheadV2Enabled) {
this.isReadAheadV2Enabled = isReadAheadV2Enabled;
return this;
}

public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
Expand Down Expand Up @@ -181,6 +189,10 @@ public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}

/**
 * Reports whether read-ahead V2 was enabled on this context.
 *
 * @return {@code true} when read-ahead V2 is enabled, {@code false} otherwise.
 */
public boolean isReadAheadV2Enabled() {
  return this.isReadAheadV2Enabled;
}

/**
 * Returns the read-ahead range held by this context.
 *
 * @return the read-ahead range value, exactly as stored.
 */
public int getReadAheadRange() {
  return this.readAheadRange;
}
Expand Down
Loading