Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions kvrocks.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1095,3 +1095,28 @@ rocksdb.sst_file_delete_rate_bytes_per_sec 0

################################ NAMESPACE #####################################
# namespace.test change.me

################################ HOTKEY ANALYZE #####################################
# Server will bootstrap hotkey analyze when startup if set yes, default no
hotkey-bootstrap no

# LRU cache init capacity
hotkey-init-lru-capacity 200000

# LRU cache maximum capacity
hotkey-max-lru-capacity 500000

# Hotkey deque init size
hotkey-init-deque-size 500000

# Hotkey deque maximum size
hotkey-max-deque-size 1000000

# Hotkey init threshold
hotkey-init-threshold 5000

# Hotkey maximum threshold
hotkey-max-threshold 50000

# The maximum hotkey entries return when execute fetch commands
hotkey-max-fetch-entries 10000
157 changes: 156 additions & 1 deletion src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1549,6 +1549,160 @@ class CommandFlushBlockCache : public Commander {
}
};

// hotkey enable {capacity} {deque_size} {threshold}
// hotkey disable
// hotkey stats
// hotkey threshold {threshold}
// hotkey getbykey {key} {begin_timestamp_ms} {end_timestamp_ms}
// hotkey getbythreshold {threshold} {begin_timestamp_ms} {end_timestamp_ms}
// hotkey dumplogfile {off|info|warning}
// hotkey timerange {begin_timestamp_ms} {end_timestamp_ms}
class CommandHotkey : public Commander {
public:
Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn,
std::string *output) override {
std::string sub_command = util::ToLower(args_[1]);
if ((sub_command == "enable" && args_.size() != 5) || (sub_command == "disable" && args_.size() != 2) ||
(sub_command == "stats" && args_.size() != 2) || (sub_command == "threshold" && args_.size() != 3) ||
(sub_command == "getbykey" && args_.size() != 3 && args_.size() != 5) ||
(sub_command == "getbythreshold" && args_.size() != 3 && args_.size() != 5) ||
(sub_command == "dumplogfile" && args_.size() != 3) ||
(sub_command == "timerange" && args_.size() != 2 && args_.size() != 4)) {
return {Status::RedisExecErr, errWrongNumOfArguments};
}

Config *config = srv->GetConfig();
int min_capacity = 100000, min_deque_size = 100000, min_threshold = 1;
if (sub_command == "enable") {
auto capacity = ParseInt<int>(args_[2], 10);
if (!capacity || *capacity < min_capacity || *capacity > config->hotkey_max_lru_capacity) {
return {Status::RedisParseErr, fmt::format("capacity must be an integer between {} and {}", min_capacity,
config->hotkey_max_lru_capacity)};
}
auto deque_size = ParseInt<int>(args_[3], 10);
if (!deque_size || *deque_size < min_deque_size || *deque_size > config->hotkey_max_deque_size) {
return {Status::RedisParseErr, fmt::format("deque size must be an integer between {} and {}", min_deque_size,
config->hotkey_max_deque_size)};
}
auto threshold = ParseInt<int>(args_[4], 10);
if (!threshold || *threshold < min_threshold || *threshold > config->hotkey_max_threshold) {
return {Status::RedisParseErr, fmt::format("threshold must be an integer between {} and {}", min_threshold,
config->hotkey_max_threshold)};
}
auto s = srv->hotkey.Enable(*capacity, *deque_size, *threshold);
if (!s.IsOK()) {
return s;
}
*output = redis::RESP_OK;
} else if (sub_command == "disable") {
auto s = srv->hotkey.Disable();
if (!s.IsOK()) {
return s;
}
*output = redis::RESP_OK;
} else if (sub_command == "stats") {
if (!srv->hotkey.enable_analyze) {
return {Status::RedisExecErr, "please enable hotkey analyze at first"};
}
*output = srv->hotkey.GetStats();
} else if (sub_command == "threshold") {
if (!srv->hotkey.enable_analyze) {
return {Status::RedisExecErr, "please enable hotkey analyze at first"};
}
auto threshold = ParseInt<int>(args_[2], 10);
if (!threshold || *threshold < min_threshold || *threshold > config->hotkey_max_threshold) {
return {Status::RedisParseErr, fmt::format("threshold must be an integer between {} and {}", min_threshold,
config->hotkey_max_threshold)};
}
srv->hotkey.SetThreshold(*threshold);
*output = redis::RESP_OK;
} else if (sub_command == "timerange") {
if (!srv->hotkey.enable_analyze) {
return {Status::RedisExecErr, "please enable hotkey analyze at first"};
}
if (args_.size() == 2) {
auto now = util::GetTimeStamp();
uint64_t begin_timestamp_ms = (now - 1) * 1000;
uint64_t end_timestamp_ms = now * 1000;
*output = srv->hotkey.SearchByTimeRange(config->hotkey_max_fetch_entries, begin_timestamp_ms, end_timestamp_ms);
} else {
auto begin_timestamp_ms = ParseInt<int64_t>(args_[2], 10);
if (!begin_timestamp_ms || *begin_timestamp_ms < 1) {
return {Status::RedisExecErr, "begin timestamp invalid"};
}
auto end_timestamp_ms = ParseInt<int64_t>(args_[3], 10);
if (!end_timestamp_ms || *end_timestamp_ms < 1) {
return {Status::RedisExecErr, "end timestamp invalid"};
}
*output =
srv->hotkey.SearchByTimeRange(config->hotkey_max_fetch_entries, *begin_timestamp_ms, *end_timestamp_ms);
}
} else if (sub_command == "getbykey") {
if (!srv->hotkey.enable_analyze) {
return {Status::RedisExecErr, "please enable hotkey analyze at first"};
}
if (args_.size() == 3) {
*output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, args_[2], 0, 0, 0);
} else {
auto begin_timestamp_ms = ParseInt<int64_t>(args_[3], 10);
if (!begin_timestamp_ms || *begin_timestamp_ms < 1) {
return {Status::RedisExecErr, "begin timestamp invalid"};
}
auto end_timestamp_ms = ParseInt<int64_t>(args_[4], 10);
if (!end_timestamp_ms || *end_timestamp_ms < 1) {
return {Status::RedisExecErr, "end timestamp invalid"};
}
*output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, args_[2], 0, *begin_timestamp_ms,
*end_timestamp_ms);
}
} else if (sub_command == "getbythreshold") {
if (!srv->hotkey.enable_analyze) {
return {Status::RedisExecErr, "please enable hotkey analyze at first"};
}
auto threshold = ParseInt<int>(args_[2], 10);
if (!threshold || *threshold < min_threshold || *threshold > config->hotkey_max_threshold) {
return {Status::RedisParseErr, fmt::format("threshold must be an integer between {} and {}", min_threshold,
config->hotkey_max_threshold)};
}
if (args_.size() == 3) {
*output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, "", *threshold, 0, 0);
} else {
auto begin_timestamp_ms = ParseInt<int64_t>(args_[3], 10);
if (!begin_timestamp_ms || *begin_timestamp_ms < 1) {
return {Status::RedisExecErr, "begin timestamp invalid"};
}
auto end_timestamp_ms = ParseInt<int64_t>(args_[4], 10);
if (!end_timestamp_ms || *end_timestamp_ms < 1) {
return {Status::RedisExecErr, "end timestamp invalid"};
}
*output = srv->hotkey.GetByKeyOrThreshold(config->hotkey_max_fetch_entries, "", *threshold, *begin_timestamp_ms,
*end_timestamp_ms);
}
} else if (sub_command == "dumplogfile") {
if (!srv->hotkey.enable_analyze) {
return {Status::RedisExecErr, "please enable hotkey analyze at first"};
}
spdlog::level::level_enum level = spdlog::level::off;
if (args_[2] == "info") {
level = spdlog::level::info;
} else if (args_[2] == "warning") {
level = spdlog::level::warn;
} else if (args_[2] == "off") {
level = spdlog::level::off;
} else {
return {Status::RedisExecErr, "dump logfile level should be one of off,info,warning"};
}
srv->hotkey.SetDumpToLogfileLevel(level);
*output = redis::RESP_OK;
} else {
return {Status::RedisExecErr,
"HOTKEY subcommand must be one of ENABLE, DISABLE, TIMERANGE, THRESHOLD, GETBYKEY, GETBYTHRESHOLD, "
"DUMPLOGFILE, STATS"};
}
return Status::OK();
}
};

REDIS_REGISTER_COMMANDS(
Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading auth", NO_KEY),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", NO_KEY),
Expand Down Expand Up @@ -1592,5 +1746,6 @@ REDIS_REGISTER_COMMANDS(
MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", NO_KEY),
MakeCmdAttr<CommandSST>("sst", -3, "write exclusive admin", 1, 1, 1),
MakeCmdAttr<CommandFlushMemTable>("flushmemtable", -1, "exclusive write", NO_KEY),
MakeCmdAttr<CommandFlushBlockCache>("flushblockcache", 1, "exclusive write", NO_KEY), )
MakeCmdAttr<CommandFlushBlockCache>("flushblockcache", 1, "exclusive write", NO_KEY),
MakeCmdAttr<CommandHotkey>("hotkey", -2, "read-only", NO_KEY), )
} // namespace redis
39 changes: 39 additions & 0 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,14 @@ Config::Config() {
{"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)},
{"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)},
{"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")},
{"hotkey-bootstrap", false, new YesNoField(&hotkey_bootstrap, false)},
{"hotkey-init-lru-capacity", false, new IntField(&hotkey_init_lru_capacity, 200000, 100000, INT_MAX)},
{"hotkey-max-lru-capacity", false, new IntField(&hotkey_max_lru_capacity, 500000, 100000, INT_MAX)},
{"hotkey-init-deque-size", false, new IntField(&hotkey_init_deque_size, 500000, 100000, INT_MAX)},
{"hotkey-max-deque-size", false, new IntField(&hotkey_max_deque_size, 1000000, 100000, INT_MAX)},
{"hotkey-init-threshold", false, new IntField(&hotkey_init_threshold, 5000, 1, INT_MAX)},
{"hotkey-max-threshold", false, new IntField(&hotkey_max_threshold, 50000, 1, INT_MAX)},
{"hotkey-max-fetch-entries", false, new IntField(&hotkey_max_fetch_entries, 10000, 1, INT_MAX)},

/* rocksdb options */
{"rocksdb.compression", false,
Expand Down Expand Up @@ -470,6 +478,31 @@ void Config::initFieldCallback() {
return Status::OK();
};

auto set_hotkey_cb = [this]([[maybe_unused]] Server *srv, [[maybe_unused]] const std::string &k,
[[maybe_unused]] const std::string &v) -> Status {
if ((k == "hotkey-init-lru-capacity" || k == "hotkey-max-lru-capacity") &&
hotkey_init_lru_capacity > hotkey_max_lru_capacity) {
if (k == "hotkey-init-lru-capacity") {
return {Status::NotOK, "hotkey-init-lru-capacity should <= hotkey-max-lru-capacity"};
}
return {Status::NotOK, "hotkey-max-lru-capacity should >= hotkey-init-lru-capacity"};
}
if ((k == "hotkey-init-deque-size" || k == "hotkey-max-deque-size") &&
hotkey_init_deque_size > hotkey_max_deque_size) {
if (k == "hotkey-init-deque-size") {
return {Status::NotOK, "hotkey-init-deque-size should <= hotkey-max-deque-size"};
}
return {Status::NotOK, "hotkey-max-deque-size should >= hotkey-init-deque-size"};
}
if ((k == "hotkey-init-threshold" || k == "hotkey-max-threshold") && hotkey_init_threshold > hotkey_max_threshold) {
if (k == "hotkey-init-threshold") {
return {Status::NotOK, "hotkey-init-threshold should <= hotkey-max-threshold"};
}
return {Status::NotOK, "hotkey-max-threshold should >= hotkey-init-threshold"};
}
return Status::OK();
};

std::map<std::string, CallbackFn> callbacks =
{
{"workers",
Expand Down Expand Up @@ -803,6 +836,12 @@ void Config::initFieldCallback() {
}
return Status::OK();
}},
{"hotkey-init-lru-capacity", set_hotkey_cb},
{"hotkey-max-lru-capacity", set_hotkey_cb},
{"hotkey-init-deque-size", set_hotkey_cb},
{"hotkey-max-deque-size", set_hotkey_cb},
{"hotkey-init-threshold", set_hotkey_cb},
{"hotkey-max-threshold", set_hotkey_cb},
};
for (const auto &iter : callbacks) {
auto field_iter = fields_.find(iter.first);
Expand Down
10 changes: 10 additions & 0 deletions src/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,16 @@ struct Config {

std::vector<double> histogram_bucket_boundaries;

// hotkey analyze
bool hotkey_bootstrap = false;
int hotkey_init_lru_capacity = 200000;
int hotkey_max_lru_capacity = 500000;
int hotkey_init_deque_size = 500000;
int hotkey_max_deque_size = 1000000;
int hotkey_init_threshold = 5000;
int hotkey_max_threshold = 50000;
int hotkey_max_fetch_entries = 10000;

struct RocksDB {
int block_size;
bool cache_index_and_filter_blocks;
Expand Down
41 changes: 41 additions & 0 deletions src/server/redis_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,39 @@ static bool IsCmdAllowedInStaleData(const std::string &cmd_name) {
return cmd_name == "info" || cmd_name == "slaveof" || cmd_name == "config";
}

static std::string GetRedisTypeOfUserKey([[maybe_unused]] const std::string &cmd_name, CommandCategory cmd_cat) {
switch (cmd_cat) {
case CommandCategory::Bit:
return "BitMap";
case CommandCategory::BloomFilter:
return "BloomFilter";
case CommandCategory::Geo:
return "Geo";
case CommandCategory::Hash:
return "Hash";
case CommandCategory::HLL:
return "HyperLogLog";
case CommandCategory::JSON:
return "JSON";
case CommandCategory::List:
return "List";
case CommandCategory::Set:
return "Set";
case CommandCategory::SortedInt:
return "SortedInt";
case CommandCategory::Stream:
return "Stream";
case CommandCategory::String:
return "String";
case CommandCategory::ZSet:
return "ZSet";
case CommandCategory::TDigest:
return "TDigest";
default:
return "none";
}
}

void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {
const Config *config = srv_->GetConfig();
std::string reply;
Expand Down Expand Up @@ -596,6 +629,14 @@ void Connection::ExecuteCommands(std::deque<CommandTokens> *to_process_cmds) {

if (!reply.empty()) Reply(reply);
reply.clear();

// Hotkey analyze
if (srv_->hotkey.enable_analyze) {
std::string redis_type = GetRedisTypeOfUserKey(cmd_name, attributes->category);
if (redis_type != "none") {
srv_->hotkey.UpdateCounter(cmd_tokens[1], redis_type);
}
}
}
}

Expand Down
13 changes: 13 additions & 0 deletions src/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@ Server::Server(engine::Storage *storage, Config *config)
slow_log_.SetMaxEntries(config->slowlog_max_len);
slow_log_.SetDumpToLogfileLevel(config->slowlog_dump_logfile_level);
perf_log_.SetMaxEntries(config->profiling_sample_record_max_len);

if (config->hotkey_bootstrap) {
uint32_t capacity = config->hotkey_init_lru_capacity, deque_size = config->hotkey_init_deque_size,
threshold = config->hotkey_init_threshold;
Status s = hotkey.Enable(capacity, deque_size, threshold);
if (!s.IsOK()) {
error("[server] Failed to enable hotkey analyze with capacity: {}, deque size: {}, threshold: {}", capacity,
deque_size, threshold);
exit(1);
}
info("[server] Enable hotkey analyze with capacity: {}, deque size: {}, threshold: {}", capacity, deque_size,
threshold);
}
}

Server::~Server() {
Expand Down
3 changes: 3 additions & 0 deletions src/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
#include "search/index_manager.h"
#include "search/indexer.h"
#include "server/redis_connection.h"
#include "stats/hot_key.h"
#include "stats/log_collector.h"
#include "stats/stats.h"
#include "storage/redis_metadata.h"
Expand Down Expand Up @@ -327,6 +328,8 @@ class Server {
std::shared_lock<std::shared_mutex> WorkConcurrencyGuard();
std::unique_lock<std::shared_mutex> WorkExclusivityGuard();

Hotkey hotkey;

Stats stats;
engine::Storage *storage;
MemoryProfiler memory_profiler;
Expand Down
Loading