Skip to content

Commit 0eaf83b

Browse files
authored
Merge pull request ClickHouse#56994 from ClickHouse/revert-56992-revert-56314-s3-aggressive-timeouts
Revert "Revert "s3 adaptive timeouts""
2 parents 7f30fb7 + a7fc8d4 commit 0eaf83b

38 files changed

+431
-202
lines changed

base/poco/Net/src/HTTPServerSession.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ HTTPServerSession::HTTPServerSession(const StreamSocket& socket, HTTPServerParam
2626
_maxKeepAliveRequests(pParams->getMaxKeepAliveRequests())
2727
{
2828
setTimeout(pParams->getTimeout());
29-
this->socket().setReceiveTimeout(pParams->getTimeout());
3029
}
3130

3231

base/poco/Net/src/HTTPSession.cpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,34 @@ void HTTPSession::setTimeout(const Poco::Timespan& timeout)
9393

9494
void HTTPSession::setTimeout(const Poco::Timespan& connectionTimeout, const Poco::Timespan& sendTimeout, const Poco::Timespan& receiveTimeout)
9595
{
96-
_connectionTimeout = connectionTimeout;
97-
_sendTimeout = sendTimeout;
98-
_receiveTimeout = receiveTimeout;
96+
try
97+
{
98+
_connectionTimeout = connectionTimeout;
99+
100+
if (_sendTimeout.totalMicroseconds() != sendTimeout.totalMicroseconds()) {
101+
_sendTimeout = sendTimeout;
102+
103+
if (connected())
104+
_socket.setSendTimeout(_sendTimeout);
105+
}
106+
107+
if (_receiveTimeout.totalMicroseconds() != receiveTimeout.totalMicroseconds()) {
108+
_receiveTimeout = receiveTimeout;
109+
110+
if (connected())
111+
_socket.setReceiveTimeout(_receiveTimeout);
112+
}
113+
}
114+
catch (NetException &)
115+
{
116+
#ifndef NDEBUG
117+
throw;
118+
#else
119+
// mute exceptions in release
120+
// just in case when changing settings on socket is not allowed
121+
// however it should be OK for timeouts
122+
#endif
123+
}
99124
}
100125

101126

docs/en/operations/settings/settings.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4826,3 +4826,10 @@ When set to `true` the metadata files are written with `VERSION_FULL_OBJECT_KEY`
48264826
When set to `false` the metadata files are written with the previous format version, `VERSION_INLINE_DATA`. With that format only suffixes of object storage key names are are written to the metadata files. The prefix for all of object storage key names is set in configurations files at `storage_configuration.disks` section.
48274827

48284828
Default value: `false`.
4829+
4830+
## s3_use_adaptive_timeouts {#s3_use_adaptive_timeouts}
4831+
4832+
When set to `true` than for all s3 requests first two attempts are made with low send and receive timeouts.
4833+
When set to `false` than all attempts are made with identical timeouts.
4834+
4835+
Default value: `true`.

src/Backups/BackupIO_S3.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ namespace
5555
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
5656
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
5757
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
58-
/* for_disk_s3 = */ false, request_settings.get_request_throttler, request_settings.put_request_throttler,
58+
/* for_disk_s3 = */ false,
59+
request_settings.get_request_throttler,
60+
request_settings.put_request_throttler,
5961
s3_uri.uri.getScheme());
6062

6163
client_configuration.endpointOverride = s3_uri.endpoint;
@@ -167,7 +169,6 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
167169
blob_path.size(), mode);
168170

169171
copyS3File(
170-
client,
171172
client,
172173
s3_uri.bucket,
173174
fs::path(s3_uri.key) / path_in_backup,
@@ -229,7 +230,6 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
229230
{
230231
LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName());
231232
copyS3File(
232-
client,
233233
client,
234234
/* src_bucket */ blob_path[1],
235235
/* src_key= */ blob_path[0],
@@ -268,7 +268,7 @@ void BackupWriterS3::copyFile(const String & destination, const String & source,
268268

269269
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
270270
{
271-
copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
271+
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
272272
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
273273
}
274274

@@ -298,7 +298,6 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
298298
{
299299
return std::make_unique<WriteBufferFromS3>(
300300
client,
301-
client, // already has long timeout
302301
s3_uri.bucket,
303302
fs::path(s3_uri.key) / file_name,
304303
DBMS_DEFAULT_BUFFER_SIZE,

src/Coordination/KeeperSnapshotManagerS3.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,6 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
148148
const auto create_writer = [&](const auto & key)
149149
{
150150
return WriteBufferFromS3(
151-
s3_client->client,
152151
s3_client->client,
153152
s3_client->uri.bucket,
154153
key,

src/Core/Settings.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ class IColumn;
9494
M(UInt64, s3_max_put_rps, 0, "Limit on S3 PUT request per second rate before throttling. Zero means unlimited.", 0) \
9595
M(UInt64, s3_max_put_burst, 0, "Max number of requests that can be issued simultaneously before hitting request per second limit. By default (0) equals to `s3_max_put_rps`", 0) \
9696
M(UInt64, s3_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
97+
M(Bool, s3_use_adaptive_timeouts, true, "When adaptive timeouts are enabled first two attempts are made with low receive and send timeout", 0) \
9798
M(UInt64, azure_list_object_keys_size, 1000, "Maximum number of files that could be returned in batch by ListObject request", 0) \
9899
M(Bool, s3_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables.", 0) \
99100
M(Bool, azure_truncate_on_insert, false, "Enables or disables truncate before insert in azure engine tables.", 0) \
@@ -104,7 +105,7 @@ class IColumn;
104105
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
105106
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
106107
M(UInt64, s3_retry_attempts, 100, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
107-
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
108+
M(UInt64, s3_request_timeout_ms, 30000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
108109
M(UInt64, s3_http_connection_pool_size, 1000, "How many reusable open connections to keep per S3 endpoint. Only applies to the S3 table engine and table function, not to S3 disks (for disks, use disk config instead). Global setting, can only be set in config, overriding it per session or per query has no effect.", 0) \
109110
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
110111
M(String, s3queue_default_zookeeper_path, "/clickhouse/s3queue/", "Default zookeeper path prefix for S3Queue engine", 0) \

src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class S3IteratorAsync final : public IObjectStorageIteratorAsync
155155
bool S3ObjectStorage::exists(const StoredObject & object) const
156156
{
157157
auto settings_ptr = s3_settings.get();
158-
return S3::objectExists(*clients.get()->client, bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
158+
return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
159159
}
160160

161161
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@@ -174,7 +174,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
174174
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
175175
{
176176
return std::make_unique<ReadBufferFromS3>(
177-
clients.get()->client,
177+
client.get(),
178178
bucket,
179179
path,
180180
version_id,
@@ -224,7 +224,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
224224
{
225225
auto settings_ptr = s3_settings.get();
226226
return std::make_unique<ReadBufferFromS3>(
227-
clients.get()->client,
227+
client.get(),
228228
bucket,
229229
object.remote_path,
230230
version_id,
@@ -249,10 +249,8 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
249249
if (write_settings.s3_allow_parallel_part_upload)
250250
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
251251

252-
auto clients_ = clients.get();
253252
return std::make_unique<WriteBufferFromS3>(
254-
clients_->client,
255-
clients_->client_with_long_timeout,
253+
client.get(),
256254
bucket,
257255
object.remote_path,
258256
buf_size,
@@ -266,15 +264,12 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
266264
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
267265
{
268266
auto settings_ptr = s3_settings.get();
269-
auto client_ptr = clients.get()->client;
270-
271-
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size);
267+
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client.get(), settings_ptr->list_object_keys_size);
272268
}
273269

274270
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
275271
{
276272
auto settings_ptr = s3_settings.get();
277-
auto client_ptr = clients.get()->client;
278273

279274
S3::ListObjectsV2Request request;
280275
request.SetBucket(bucket);
@@ -289,7 +284,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
289284
{
290285
ProfileEvents::increment(ProfileEvents::S3ListObjects);
291286
ProfileEvents::increment(ProfileEvents::DiskS3ListObjects);
292-
outcome = client_ptr->ListObjectsV2(request);
287+
outcome = client.get()->ListObjectsV2(request);
293288
throwIfError(outcome);
294289

295290
auto result = outcome.GetResult();
@@ -320,14 +315,12 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
320315

321316
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
322317
{
323-
auto client_ptr = clients.get()->client;
324-
325318
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
326319
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
327320
S3::DeleteObjectRequest request;
328321
request.SetBucket(bucket);
329322
request.SetKey(object.remote_path);
330-
auto outcome = client_ptr->DeleteObject(request);
323+
auto outcome = client.get()->DeleteObject(request);
331324

332325
throwIfUnexpectedError(outcome, if_exists);
333326

@@ -346,7 +339,6 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
346339
}
347340
else
348341
{
349-
auto client_ptr = clients.get()->client;
350342
auto settings_ptr = s3_settings.get();
351343

352344
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
@@ -375,7 +367,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
375367
S3::DeleteObjectsRequest request;
376368
request.SetBucket(bucket);
377369
request.SetDelete(delkeys);
378-
auto outcome = client_ptr->DeleteObjects(request);
370+
auto outcome = client.get()->DeleteObjects(request);
379371

380372
throwIfUnexpectedError(outcome, if_exists);
381373

@@ -407,7 +399,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
407399
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
408400
{
409401
auto settings_ptr = s3_settings.get();
410-
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
402+
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
411403

412404
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
413405
return {};
@@ -423,7 +415,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
423415
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
424416
{
425417
auto settings_ptr = s3_settings.get();
426-
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
418+
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
427419

428420
ObjectMetadata result;
429421
result.size_bytes = object_info.size;
@@ -444,12 +436,12 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
444436
/// Shortcut for S3
445437
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
446438
{
447-
auto clients_ = clients.get();
439+
auto client_ = client.get();
448440
auto settings_ptr = s3_settings.get();
449-
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
441+
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
450442
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
451-
copyS3File(clients_->client,
452-
clients_->client_with_long_timeout,
443+
copyS3File(
444+
client.get(),
453445
bucket,
454446
object_from.remote_path,
455447
0,
@@ -473,12 +465,11 @@ void S3ObjectStorage::copyObject( // NOLINT
473465
const WriteSettings &,
474466
std::optional<ObjectAttributes> object_to_attributes)
475467
{
476-
auto clients_ = clients.get();
468+
auto client_ = client.get();
477469
auto settings_ptr = s3_settings.get();
478-
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
470+
auto size = S3::getObjectSize(*client_, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
479471
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
480-
copyS3File(clients_->client,
481-
clients_->client_with_long_timeout,
472+
copyS3File(client_,
482473
bucket,
483474
object_from.remote_path,
484475
0,
@@ -499,31 +490,25 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> &&
499490

500491
void S3ObjectStorage::shutdown()
501492
{
502-
auto clients_ptr = clients.get();
503493
/// This call stops any next retry attempts for ongoing S3 requests.
504494
/// If S3 request is failed and the method below is executed S3 client immediately returns the last failed S3 request outcome.
505495
/// If S3 is healthy nothing wrong will be happened and S3 requests will be processed in a regular way without errors.
506496
/// This should significantly speed up shutdown process if S3 is unhealthy.
507-
const_cast<S3::Client &>(*clients_ptr->client).DisableRequestProcessing();
508-
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).DisableRequestProcessing();
497+
const_cast<S3::Client &>(*client.get()).DisableRequestProcessing();
509498
}
510499

511500
void S3ObjectStorage::startup()
512501
{
513-
auto clients_ptr = clients.get();
514-
515502
/// Need to be enabled if it was disabled during shutdown() call.
516-
const_cast<S3::Client &>(*clients_ptr->client).EnableRequestProcessing();
517-
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).EnableRequestProcessing();
503+
const_cast<S3::Client &>(*client.get()).EnableRequestProcessing();
518504
}
519505

520506
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
521507
{
522508
auto new_s3_settings = getSettings(config, config_prefix, context);
523509
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
524-
auto new_clients = std::make_unique<Clients>(std::move(new_client), *new_s3_settings);
525510
s3_settings.set(std::move(new_s3_settings));
526-
clients.set(std::move(new_clients));
511+
client.set(std::move(new_client));
527512
}
528513

529514
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
@@ -538,9 +523,6 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
538523
endpoint, object_key_prefix);
539524
}
540525

541-
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
542-
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
543-
544526
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string &) const
545527
{
546528
/// Path to store the new S3 object.

src/Disks/ObjectStorages/S3/S3ObjectStorage.h

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -39,16 +39,6 @@ struct S3ObjectStorageSettings
3939

4040
class S3ObjectStorage : public IObjectStorage
4141
{
42-
public:
43-
struct Clients
44-
{
45-
std::shared_ptr<S3::Client> client;
46-
std::shared_ptr<S3::Client> client_with_long_timeout;
47-
48-
Clients() = default;
49-
Clients(std::shared_ptr<S3::Client> client, const S3ObjectStorageSettings & settings);
50-
};
51-
5242
private:
5343
friend class S3PlainObjectStorage;
5444

@@ -63,7 +53,7 @@ class S3ObjectStorage : public IObjectStorage
6353
String object_key_prefix_)
6454
: bucket(std::move(bucket_))
6555
, object_key_prefix(std::move(object_key_prefix_))
66-
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
56+
, client(std::move(client_))
6757
, s3_settings(std::move(s3_settings_))
6858
, s3_capabilities(s3_capabilities_)
6959
, version_id(std::move(version_id_))
@@ -184,7 +174,8 @@ class S3ObjectStorage : public IObjectStorage
184174
std::string bucket;
185175
String object_key_prefix;
186176

187-
MultiVersion<Clients> clients;
177+
178+
MultiVersion<S3::Client> client;
188179
MultiVersion<S3ObjectStorageSettings> s3_settings;
189180
S3Capabilities s3_capabilities;
190181

src/Disks/ObjectStorages/S3/diskSettings.cpp

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,13 +60,15 @@ std::unique_ptr<S3::Client> getClient(
6060
uri.uri.getScheme());
6161

6262
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 1000);
63-
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000);
63+
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 30000);
6464
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
6565
client_configuration.endpointOverride = uri.endpoint;
66-
client_configuration.http_keep_alive_timeout_ms
67-
= config.getUInt(config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
66+
client_configuration.http_keep_alive_timeout_ms = config.getUInt(
67+
config_prefix + ".http_keep_alive_timeout_ms", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT * 1000);
6868
client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000);
6969
client_configuration.wait_on_pool_size_limit = false;
70+
client_configuration.s3_use_adaptive_timeouts = config.getBool(
71+
config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);
7072

7173
/*
7274
* Override proxy configuration for backwards compatibility with old configuration format.

0 commit comments

Comments
 (0)