Skip to content

HADOOP-19609. ABFS: Apache Client Connection Pool Relook #7817

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: trunk
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,20 @@ public class AbfsConfiguration{
FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES, DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;

/**
* Max idle TTL configuration for connection given in
* {@value org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys#FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL}
* with default of
* {@value org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations#DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME}
*/
@LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL,
DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME)
private long maxApacheHttpClientConnectionIdleTime;
/**
* Maximum size of the ApacheHttpClient connection cache, read from the
* {@code FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE} key with a
* default of {@code DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS}.
* The validator annotation bounds the configured value to the range
* [{@code MIN_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS},
* {@code MAX_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS}].
*/
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE, DefaultValue = DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS,
MinValue = MIN_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS, MaxValue = MAX_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS)
private int maxApacheHttpClientCacheConnections;

/**
* Number of connections to establish during the warmup phase of the
* ApacheHttpClient connection cache, read from the
* {@code FS_AZURE_CACHE_WARMUP_CONNECTION_COUNT} key with a default of
* {@code DEFAULT_APACHE_CACHE_WARMUP_CONNECTION_COUNT}.
* The validator annotation bounds the value to
* [0, {@code MIN_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS}].
*/
// NOTE(review): the upper bound is the MIN_* cache-size constant, not MAX_*;
// presumably so the warmup count can never exceed the smallest permitted
// cache size -- confirm this is intentional.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CACHE_WARMUP_CONNECTION_COUNT, DefaultValue = DEFAULT_APACHE_CACHE_WARMUP_CONNECTION_COUNT,
MinValue = 0, MaxValue = MIN_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS)
private int cacheWarmupConnections;

/**
* Number of connections to establish during the refresh phase of the
* ApacheHttpClient connection cache, read from the
* {@code FS_AZURE_CACHE_REFRESH_CONNECTION_COUNT} key with a default of
* {@code DEFAULT_APACHE_CACHE_REFRESH_CONNECTION_COUNT}.
* The validator annotation bounds the value to
* [0, {@code MIN_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS}].
*/
// NOTE(review): the upper bound is the MIN_* cache-size constant, not MAX_*;
// presumably so the refresh count can never exceed the smallest permitted
// cache size -- confirm this is intentional.
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_CACHE_REFRESH_CONNECTION_COUNT, DefaultValue = DEFAULT_APACHE_CACHE_REFRESH_CONNECTION_COUNT,
MinValue = 0, MaxValue = MIN_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS)
private int cacheRefreshConnections;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID,
DefaultValue = DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID)
Expand Down Expand Up @@ -1115,11 +1120,16 @@ public int getMaxApacheHttpClientIoExceptionsRetries() {
return maxApacheHttpClientIoExceptionsRetries;
}

/**
* @return {@link #maxApacheHttpClientConnectionIdleTime}.
*/
public long getMaxApacheHttpClientConnectionIdleTime() {
return maxApacheHttpClientConnectionIdleTime;
public int getMaxApacheHttpClientCacheConnections() {
return maxApacheHttpClientCacheConnections;
}

/**
 * Returns the configured warmup connection count.
 *
 * @return {@link #cacheWarmupConnections}.
 */
public int getCacheWarmupConnections() {
  return this.cacheWarmupConnections;
}

/**
 * Returns the configured refresh connection count.
 *
 * @return {@link #cacheRefreshConnections}.
 */
public int getCacheRefreshConnections() {
  return this.cacheRefreshConnections;
}

public boolean getIsClientTransactionIdEnabled() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -819,8 +819,7 @@ public synchronized void close() throws IOException {
IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
}
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager,
getAbfsClient());
IOUtils.cleanupWithLogger(LOG, abfsStore, delegationTokenManager);
this.isClosed = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Closing Abfs: {}", toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ public void close() throws IOException {
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
IOUtils.cleanupWithLogger(LOG, getClient());
IOUtils.cleanupWithLogger(LOG, getClientHandler().getDfsClient(),
getClientHandler().getBlobClient());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,16 @@ public static String containerProperty(String property, String fsName, String ac
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = "fs.azure.apache.http.client.max.io.exception.retries";
/**Maximum ApacheHttpClient-connection cache size at filesystem level: {@value}*/
public static final String FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_CONNECTION_SIZE = "fs.azure.apache.http.client.max.cache.connection.size";
/**Maximum idle time for an ApacheHttpClient-connection: {@value}*/
public static final String FS_AZURE_APACHE_HTTP_CLIENT_IDLE_CONNECTION_TTL = "fs.azure.apache.http.client.idle.connection.ttl";
/**
* Defines the number of connections to establish during the warmup phase
* of the ApacheHttpClient connection cache: {@value}
*/
public static final String FS_AZURE_CACHE_WARMUP_CONNECTION_COUNT = "fs.azure.apache.http.client.cache.warmup.connection.count";
/**
* Defines the number of connections to establish during the refresh phase
* of the ApacheHttpClient connection cache: {@value}
*/
public static final String FS_AZURE_CACHE_REFRESH_CONNECTION_COUNT = "fs.azure.apache.http.client.cache.refresh.connection.count";
/**
* Blob copy API is an async API, this configuration defines polling duration
* for checking copy status: {@value}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,19 @@ public final class FileSystemConfigurations {
public static final long THOUSAND = 1000L;

public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
= HttpOperationType.JDK_HTTP_URL_CONNECTION;
= HttpOperationType.APACHE_HTTP_CLIENT;

public static final int DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES = 3;

public static final long DEFAULT_HTTP_CLIENT_CONN_MAX_IDLE_TIME = 5_000L;
public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 10;

public static final int DEFAULT_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 5;
/** Smallest value accepted for the maximum ApacheHttpClient cached-connection count. */
public static final int MIN_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 5;

/** Largest value accepted for the maximum ApacheHttpClient cached-connection count. */
public static final int MAX_HTTP_CLIENT_CONN_MAX_CACHED_CONNECTIONS = 20;

/** Default number of connections to establish during the connection-cache warmup phase. */
public static final int DEFAULT_APACHE_CACHE_WARMUP_CONNECTION_COUNT = 5;

/** Default number of connections to establish during the connection-cache refresh phase. */
public static final int DEFAULT_APACHE_CACHE_REFRESH_CONNECTION_COUNT = 5;

public static final long DEFAULT_AZURE_BLOB_COPY_PROGRESS_WAIT_MILLIS = 1_000L;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.URL;

import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
Expand Down Expand Up @@ -71,15 +73,18 @@ static boolean usable() {
}

AbfsApacheHttpClient(DelegatingSSLSocketFactory delegatingSSLSocketFactory,
final int readTimeout, final KeepAliveCache keepAliveCache) {
final AbfsConfiguration abfsConfiguration, final KeepAliveCache keepAliveCache,
URL baseUrl) {
final AbfsConnectionManager connMgr = new AbfsConnectionManager(
createSocketFactoryRegistry(
new SSLConnectionSocketFactory(delegatingSSLSocketFactory,
getDefaultHostnameVerifier())),
new AbfsHttpClientConnectionFactory(), keepAliveCache);
new AbfsHttpClientConnectionFactory(), keepAliveCache,
abfsConfiguration, baseUrl);
final HttpClientBuilder builder = HttpClients.custom();
builder.setConnectionManager(connMgr)
.setRequestExecutor(new AbfsManagedHttpRequestExecutor(readTimeout))
.setRequestExecutor(
new AbfsManagedHttpRequestExecutor(abfsConfiguration.getHttpReadTimeout()))
.disableContentCompression()
.disableRedirectHandling()
.disableAutomaticRetries()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,7 @@ private AbfsClient(final URL baseUrl,

abfsApacheHttpClient = new AbfsApacheHttpClient(
DelegatingSSLSocketFactory.getDefaultFactory(),
abfsConfiguration.getHttpReadTimeout(),
keepAliveCache);
abfsConfiguration, keepAliveCache, baseUrl);
}

this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
Expand Down
Loading