Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
M(RegionPersisterRunMode) \
M(S3Requests) \
M(S3RandomAccessFile) \
M(S3ActiveGetObjectStreams) \
M(GlobalStorageRunMode) \
M(GlobalThread) \
M(GlobalThreadActive) \
Expand Down
26 changes: 26 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,24 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_head_object, {{"type", "head_object"}}, ExpBuckets{0.001, 2, 20}), \
F(type_read_stream, {{"type", "read_stream"}}, ExpBuckets{0.0001, 2, 20}), \
F(type_read_stream_err, {{"type", "read_stream_err"}}, ExpBuckets{0.0001, 2, 20})) \
M(tiflash_storage_s3_read_limiter, \
"S3 read limiter counters", \
Counter, \
F(type_stream_wait_count, {{"type", "stream_wait_count"}}), \
F(type_byte_wait_count, {{"type", "byte_wait_count"}}), \
F(type_direct_read_bytes, {{"type", "direct_read_bytes"}}), \
F(type_filecache_download_bytes, {{"type", "filecache_download_bytes"}})) \
M(tiflash_storage_s3_read_limiter_wait_seconds, \
"S3 read limiter wait duration in seconds", \
Histogram, \
F(type_stream_wait, {{"type", "stream_wait"}}, ExpBuckets{0.0001, 2, 20}), \
F(type_byte_wait, {{"type", "byte_wait"}}, ExpBuckets{0.0001, 2, 20})) \
M(tiflash_storage_s3_read_limiter_status, \
"S3 read limiter status", \
Gauge, \
F(type_active_get_object_streams, {{"type", "active_get_object_streams"}}), \
F(type_max_get_object_streams, {{"type", "max_get_object_streams"}}), \
F(type_max_read_bytes_per_sec, {{"type", "max_read_bytes_per_sec"}})) \
M(tiflash_storage_s3_http_request_seconds, \
"S3 request duration breakdown in seconds", \
Histogram, \
Expand Down Expand Up @@ -883,6 +901,10 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_dtfile_full, {"type", "dtfile_full"}), \
F(type_dtfile_download, {"type", "dtfile_download"}), \
F(type_dtfile_download_failed, {"type", "dtfile_download_failed"}), \
F(type_wait_on_downloading, {"type", "wait_on_downloading"}), \
F(type_wait_on_downloading_hit, {"type", "wait_on_downloading_hit"}), \
F(type_wait_on_downloading_timeout, {"type", "wait_on_downloading_timeout"}), \
F(type_wait_on_downloading_failed, {"type", "wait_on_downloading_failed"}), \
F(type_page_hit, {"type", "page_hit"}), \
F(type_page_miss, {"type", "page_miss"}), \
F(type_page_evict, {"type", "page_evict"}), \
Expand All @@ -897,6 +919,10 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_page_evict_bytes, {"type", "page_evict_bytes"}), \
F(type_page_download_bytes, {"type", "page_download_bytes"}), \
F(type_page_read_bytes, {"type", "page_read_bytes"})) \
M(tiflash_storage_remote_cache_status, \
"Remote cache status", \
Gauge, \
F(type_bg_downloading_count, {{"type", "bg_downloading_count"}})) \
M(tiflash_storage_io_limiter_pending_seconds, \
"I/O limiter pending duration in seconds", \
Histogram, \
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/IO/BaseFile/IORateLimitConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ void IORateLimitConfig::parse(const String & storage_io_rate_limit, const Logger
readConfig(config, "max_bytes_per_sec", max_bytes_per_sec);
readConfig(config, "max_read_bytes_per_sec", max_read_bytes_per_sec);
readConfig(config, "max_write_bytes_per_sec", max_write_bytes_per_sec);
readConfig(config, "s3_max_read_bytes_per_sec", s3_max_read_bytes_per_sec);
readConfig(config, "s3_max_get_object_streams", s3_max_get_object_streams);
readConfig(config, "foreground_write_weight", fg_write_weight);
readConfig(config, "background_write_weight", bg_write_weight);
readConfig(config, "foreground_read_weight", fg_read_weight);
Expand All @@ -72,6 +74,7 @@ std::string IORateLimitConfig::toString() const
{
return fmt::format(
"IORateLimitConfig{{max_bytes_per_sec={} max_read_bytes_per_sec={} max_write_bytes_per_sec={} "
"s3_max_read_bytes_per_sec={} s3_max_get_object_streams={} "
"use_max_bytes_per_sec={} "
"fg_write_weight={} bg_write_weight={} fg_read_weight={} bg_read_weight={} "
"fg_write_max_bytes_per_sec={} bg_write_max_bytes_per_sec={} "
Expand All @@ -80,6 +83,8 @@ std::string IORateLimitConfig::toString() const
max_bytes_per_sec,
max_read_bytes_per_sec,
max_write_bytes_per_sec,
s3_max_read_bytes_per_sec,
s3_max_get_object_streams,
use_max_bytes_per_sec,
fg_write_weight,
bg_write_weight,
Expand Down Expand Up @@ -165,9 +170,12 @@ UInt64 IORateLimitConfig::getReadMaxBytesPerSec() const
bool IORateLimitConfig::operator==(const IORateLimitConfig & config) const
{
return config.max_bytes_per_sec == max_bytes_per_sec && config.max_read_bytes_per_sec == max_read_bytes_per_sec
&& config.max_write_bytes_per_sec == max_write_bytes_per_sec && config.bg_write_weight == bg_write_weight
&& config.fg_write_weight == fg_write_weight && config.bg_read_weight == bg_read_weight
&& config.fg_read_weight == fg_read_weight && config.emergency_pct == emergency_pct
&& config.max_write_bytes_per_sec == max_write_bytes_per_sec
&& config.s3_max_read_bytes_per_sec == s3_max_read_bytes_per_sec
&& config.s3_max_get_object_streams == s3_max_get_object_streams
&& config.bg_write_weight == bg_write_weight && config.fg_write_weight == fg_write_weight
&& config.bg_read_weight == bg_read_weight && config.fg_read_weight == fg_read_weight
&& config.emergency_pct == emergency_pct
&& config.high_pct == high_pct && config.medium_pct == medium_pct && config.tune_base == tune_base
&& config.min_bytes_per_sec == min_bytes_per_sec && config.auto_tune_sec == auto_tune_sec;
}
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/IO/BaseFile/IORateLimitConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ struct IORateLimitConfig
// For disks whose read bandwidth and write bandwidth are accounted separately, such as GCP's persistent disks.
UInt64 max_read_bytes_per_sec;
UInt64 max_write_bytes_per_sec;
UInt64 s3_max_read_bytes_per_sec;
UInt64 s3_max_get_object_streams;

// only true when both max_read_bytes_per_sec and max_write_bytes_per_sec are 0
bool use_max_bytes_per_sec;
Expand All @@ -54,6 +56,8 @@ struct IORateLimitConfig
: max_bytes_per_sec(0)
, max_read_bytes_per_sec(0)
, max_write_bytes_per_sec(0)
, s3_max_read_bytes_per_sec(0)
, s3_max_get_object_streams(0)
, use_max_bytes_per_sec(true)
// only limit background write by default
, fg_write_weight(0)
Expand Down
21 changes: 21 additions & 0 deletions dbms/src/IO/BaseFile/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Common/TiFlashMetrics.h>
#include <IO/BaseFile/RateLimiter.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Storages/S3/S3ReadLimiter.h>
#include <boost_wrapper/string.h>
#include <common/likely.h>
#include <common/logger_useful.h>
Expand Down Expand Up @@ -482,6 +483,12 @@ ReadLimiterPtr IORateLimiter::getReadLimiter()
return is_background_thread ? bg_read_limiter : fg_read_limiter;
}

std::shared_ptr<S3::S3ReadLimiter> IORateLimiter::getS3ReadLimiter()
{
    // Snapshot the shared_ptr under the limiter mutex so callers never race
    // with updateLimiterByConfig(), which may reset or replace the instance.
    std::lock_guard guard(limiter_mtx);
    auto limiter = s3_read_limiter;
    return limiter;
}

void IORateLimiter::updateConfig(Poco::Util::AbstractConfiguration & config_)
{
if (!reloadConfig(config_))
Expand Down Expand Up @@ -518,6 +525,20 @@ void IORateLimiter::updateLimiterByConfig(const IORateLimitConfig & cfg)
std::lock_guard lock(limiter_mtx);
updateReadLimiter(cfg.getBgReadMaxBytesPerSec(), cfg.getFgReadMaxBytesPerSec());
updateWriteLimiter(cfg.getBgWriteMaxBytesPerSec(), cfg.getFgWriteMaxBytesPerSec());
if (cfg.s3_max_read_bytes_per_sec == 0 && cfg.s3_max_get_object_streams == 0)
{
s3_read_limiter = nullptr;
}
else if (s3_read_limiter == nullptr)
{
s3_read_limiter = std::make_shared<S3::S3ReadLimiter>(
cfg.s3_max_read_bytes_per_sec,
cfg.s3_max_get_object_streams);
}
else
{
s3_read_limiter->updateConfig(cfg.s3_max_read_bytes_per_sec, cfg.s3_max_get_object_streams);
}
}

void IORateLimiter::updateReadLimiter(Int64 bg_bytes, Int64 fg_bytes)
Expand Down
6 changes: 6 additions & 0 deletions dbms/src/IO/BaseFile/RateLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ namespace DB
{
class LimiterStat;
class IOLimitTuner;
namespace S3
{
class S3ReadLimiter;
}

enum class LimiterType
{
Expand Down Expand Up @@ -216,6 +220,7 @@ class IORateLimiter
WriteLimiterPtr getBgWriteLimiter();

ReadLimiterPtr getReadLimiter();
std::shared_ptr<S3::S3ReadLimiter> getS3ReadLimiter();
void init(Poco::Util::AbstractConfiguration & config_);
void updateConfig(Poco::Util::AbstractConfiguration & config_);

Expand Down Expand Up @@ -250,6 +255,7 @@ class IORateLimiter
// Background read and foreground read
ReadLimiterPtr bg_read_limiter;
ReadLimiterPtr fg_read_limiter;
std::shared_ptr<S3::S3ReadLimiter> s3_read_limiter;

std::mutex bg_thread_ids_mtx;
std::vector<pid_t> bg_thread_ids;
Expand Down
29 changes: 29 additions & 0 deletions dbms/src/IO/BaseFile/tests/gtest_rate_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

#include <Common/Exception.h>
#include <IO/BaseFile/RateLimiter.h>
#include <Storages/S3/S3ReadLimiter.h>
#include <TestUtils/TiFlashTestBasic.h>
#include <fcntl.h>
#include <gtest/gtest.h>
#include <unistd.h>

#include <ctime>
#include <future>
#include <random>
#include <thread>

Expand Down Expand Up @@ -374,6 +376,33 @@ TEST(ReadLimiterTest, ReadMany)
ASSERT_EQ(read_limiter.alloc_bytes, 100);
}

// Verifies the stream-token limit: with a capacity of one stream, a second
// acquireStream() must block until the first token is released, and the
// active-stream count must track token lifetime exactly.
TEST(S3ReadLimiterTest, StreamTokenBlocksUntilRelease)
{
    // Args: byte limiting disabled (0 B/s), at most 1 concurrent stream.
    auto limiter = std::make_shared<S3::S3ReadLimiter>(0, 1);
    auto token1 = limiter->acquireStream();
    ASSERT_NE(token1, nullptr);
    ASSERT_EQ(limiter->activeStreams(), 1);

    // While token1 is alive a second acquire must not complete; we give it
    // 50 ms and expect a timeout. NOTE(review): the `50ms` literal assumes
    // std::chrono_literals is in scope in this file — confirm a
    // using-directive exists above.
    auto future = std::async(std::launch::async, [&]() { return limiter->acquireStream(); });
    ASSERT_EQ(future.wait_for(50ms), std::future_status::timeout);

    // Releasing the first token unblocks the waiter; the count stays at 1
    // because the new token immediately occupies the freed slot.
    token1.reset();
    auto token2 = future.get();
    ASSERT_NE(token2, nullptr);
    ASSERT_EQ(limiter->activeStreams(), 1);
    token2.reset();
    ASSERT_EQ(limiter->activeStreams(), 0);
}

// Verifies byte-based throttling: after one request exhausts the current
// token budget, the next request must block until the budget is refilled.
// NOTE(review): wall-clock assertions like this can be flaky on heavily
// loaded CI machines; the 80 ms lower bound leaves slack but is still
// timing-dependent.
TEST(S3ReadLimiterTest, ByteRequestsWaitForRefill)
{
    // ctor args presumed to be (max_read_bytes_per_sec, max_streams,
    // refill_period_ms) — TODO confirm against S3ReadLimiter's declaration;
    // 1000 B/s with a 100 ms period yields 100 bytes per refill window.
    S3::S3ReadLimiter limiter(1000, 0, 100);
    limiter.requestBytes(100, S3::S3ReadSource::DirectRead);
    AtomicStopwatch watch;
    // The window's 100-byte budget is spent, so this call should wait
    // roughly one refill period (~100 ms) before returning.
    limiter.requestBytes(100, S3::S3ReadSource::DirectRead);
    ASSERT_GE(watch.elapsedMilliseconds(), 80);
}

#ifdef __linux__
TEST(IORateLimiterTest, IOStat)
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ struct Settings
M(SettingDouble, dt_filecache_downloading_count_scale, 2.0, "Max concurrency of download task count of FileCache = number of logical cpu cores * dt_filecache_downloading_count_scale.") \
M(SettingDouble, dt_filecache_max_downloading_count_scale, 10.0, "Max queue size of download task count of FileCache = number of logical cpu cores * dt_filecache_max_downloading_count_scale.") \
M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \
M(SettingUInt64, dt_filecache_wait_on_downloading_ms, 0, "When a remote cache lookup sees the same key is already being downloaded, wait up to this many milliseconds for that download to finish. 0 disables the bounded wait.") \
M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \
M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,7 @@ try

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);
S3::ClientFactory::instance().setS3ReadLimiter(global_context->getIORateLimiter().getS3ReadLimiter());

global_context->setServerInfo(server_info);
if (server_info.memory_info.capacity == 0)
Expand Down Expand Up @@ -971,6 +972,7 @@ try
buildLoggers(*config);
global_context->getTMTContext().reloadConfig(*config);
global_context->getIORateLimiter().updateConfig(*config);
S3::ClientFactory::instance().setS3ReadLimiter(global_context->getIORateLimiter().getS3ReadLimiter());
global_context->reloadDeltaTreeConfig(*config);
DM::SegmentReadTaskScheduler::instance().updateConfig(global_context->getSettingsRef());
if (FileCache::instance() != nullptr)
Expand Down
Loading