feat: add node health status for CLUSTER SLOTS and SHARDS #4767

Merged: 3 commits (Mar 17, 2025)
46 changes: 24 additions & 22 deletions src/server/cluster/cluster_config.cc
@@ -31,12 +31,12 @@ bool HasValidNodeIds(const ClusterShardInfos& new_config) {

for (const auto& shard : new_config) {
if (!CheckAndInsertNode(shard.master.id)) {
LOG(WARNING) << "Master " << shard.master.id << " appears more than once";
LOG(ERROR) << "Master " << shard.master.id << " appears more than once";
return false;
}
for (const auto& replica : shard.replicas) {
if (!CheckAndInsertNode(replica.id)) {
LOG(WARNING) << "Replica " << replica.id << " appears more than once";
LOG(ERROR) << "Replica " << replica.id << " appears more than once";
return false;
}
}
@@ -56,21 +56,21 @@ bool IsConfigValid(const ClusterShardInfos& new_config) {
for (const auto& shard : new_config) {
for (const auto& slot_range : shard.slot_ranges) {
if (slot_range.start > slot_range.end) {
LOG(WARNING) << "Invalid cluster config: start=" << slot_range.start
<< " is larger than end=" << slot_range.end;
LOG(ERROR) << "Invalid cluster config: start=" << slot_range.start
<< " is larger than end=" << slot_range.end;
return false;
}

for (SlotId slot = slot_range.start; slot <= slot_range.end; ++slot) {
if (slot >= slots_found.size()) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " is bigger than allowed max=" << slots_found.size();
LOG(ERROR) << "Invalid cluster config: slot=" << slot
<< " is bigger than allowed max=" << slots_found.size();
return false;
}

if (slots_found[slot]) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " was already configured by another slot range.";
LOG(ERROR) << "Invalid cluster config: slot=" << slot
<< " was already configured by another slot range.";
return false;
}

@@ -80,7 +80,7 @@ bool IsConfigValid(const ClusterShardInfos& new_config) {
}

if (!all_of(slots_found.begin(), slots_found.end(), [](bool b) { return b; }) > 0UL) {
LOG(WARNING) << "Invalid cluster config: some slots were missing.";
LOG(ERROR) << "Invalid cluster config: some slots were missing.";
return false;
}

@@ -129,7 +129,7 @@ constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;

template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
if (!obj.is_number()) {
LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj;
LOG(ERROR) << kInvalidConfigPrefix << "object is not a number " << obj;
return nullopt;
}

@@ -138,15 +138,15 @@ template <typename T> optional<T> ReadNumeric(const JsonType& obj) {

optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
if (!slots.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
}

std::vector<SlotRange> ranges;

for (const auto& range : slots.array_range()) {
if (!range.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
return nullopt;
}

@@ -164,7 +164,7 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {

optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
if (!json.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
LOG(ERROR) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt;
}

@@ -173,7 +173,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
{
auto id = json.at_or_null("id");
if (!id.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid id for node " << json;
return nullopt;
}
node.id = std::move(id).as_string();
@@ -182,7 +182,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
{
auto ip = json.at_or_null("ip");
if (!ip.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid ip for node " << json;
return nullopt;
}
node.ip = std::move(ip).as_string();
@@ -200,7 +200,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
auto health = json.at_or_null("health");
if (!health.is_null()) {
if (!health.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node " << json;
} else {
auto health_str = std::move(health).as_string();
if (absl::EqualsIgnoreCase(health_str, "FAIL")) {
@@ -209,8 +209,10 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
node.health = NodeHealth::LOADING;
} else if (absl::EqualsIgnoreCase(health_str, "ONLINE")) {
node.health = NodeHealth::ONLINE;
} else if (absl::EqualsIgnoreCase(health_str, "HIDDEN")) {
node.health = NodeHealth::HIDDEN;
} else {
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
}
}
}
@@ -237,7 +239,7 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));

if (!node_id.is_string() || !ip.is_string() || !port || !slots) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid migration json " << json;
return nullopt;
}

@@ -253,15 +255,15 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
std::vector<ClusterShardInfo> config;

if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
LOG(ERROR) << kInvalidConfigPrefix << "not an array " << json;
return nullopt;
}

for (const auto& element : json.array_range()) {
ClusterShardInfo shard;

if (!element.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
LOG(ERROR) << kInvalidConfigPrefix << "shard element is not an object " << element;
return nullopt;
}

@@ -279,7 +281,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {

auto replicas = element.at_or_null("replicas");
if (!replicas.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
LOG(ERROR) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
return nullopt;
}

@@ -309,7 +311,7 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
std::string_view json_str) {
optional<JsonType> json_config = JsonFromString(json_str, PMR_NS::get_default_resource());
if (!json_config.has_value()) {
LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str;
LOG(ERROR) << "Can't parse JSON for ClusterConfig " << json_str;
return nullptr;
}

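Note on the parsing changes above: the sketch below is a hypothetical usage example, not part of the diff. Only the "health" handling ("online", "fail", "loading", "hidden"; case-insensitive and optional) is taken from this PR; the surrounding field names ("port", "slot_ranges", "master", "replicas") are assumed to follow the existing cluster-config JSON schema.

// Usage sketch (assumption, not in this PR): a config carrying the new "health" field.
const std::string_view kConfigJson = R"json([
  {
    "slot_ranges": [{"start": 0, "end": 16383}],
    "master": {"id": "node-a", "ip": "10.0.0.1", "port": 6379, "health": "online"},
    "replicas": [
      {"id": "node-b", "ip": "10.0.0.2", "port": 6379, "health": "loading"},
      {"id": "node-c", "ip": "10.0.0.3", "port": 6379, "health": "hidden"}
    ]
  }
])json";

auto config = ClusterConfig::CreateFromConfig("node-a", kConfigJson);
// Returns nullptr on malformed JSON; an unrecognized health string is logged with
// LOG(ERROR) and the node keeps its default health (ONLINE after this PR).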
18 changes: 18 additions & 0 deletions src/server/cluster/cluster_defs.cc
@@ -7,6 +7,7 @@
#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>

#include "base/logging.h"
#include "cluster_config.h"
#include "facade/error.h"
#include "slot_set.h"
@@ -73,4 +74,21 @@ facade::ErrorReply SlotOwnershipError(SlotId slot_id) {
}
return facade::ErrorReply{facade::OpStatus::OK};
}

std::string_view ToString(NodeHealth nh) {
switch (nh) {
case NodeHealth::FAIL:
return "fail";
case NodeHealth::LOADING:
return "loading";
case NodeHealth::ONLINE:
return "online";
case NodeHealth::HIDDEN:
DCHECK(false); // shouldn't be used
[Review thread on the DCHECK above]
Collaborator: why?
Author: because we shouldn't show it, I've added it for consistency.
return "hidden";
}
DCHECK(false);
return "undefined_health";
}

} // namespace dfly::cluster
9 changes: 7 additions & 2 deletions src/server/cluster/cluster_defs.h
@@ -94,10 +94,11 @@ struct ClusterNodeInfo {
}
};

enum class NodeHealth : std::uint8_t { NONE, FAIL, LOADING, ONLINE };
enum class NodeHealth : std::uint8_t { FAIL, LOADING, ONLINE, HIDDEN };
std::string_view ToString(NodeHealth nh);

struct ClusterExtendedNodeInfo : ClusterNodeInfo {
NodeHealth health = NodeHealth::NONE;
NodeHealth health = NodeHealth::ONLINE;
};

struct MigrationInfo {
@@ -159,6 +160,10 @@ class ClusterShardInfos {
return infos_ != r.infos_;
}

auto Unwrap() const {
return infos_;
}

private:
std::vector<ClusterShardInfo> infos_;
};
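Note on the header change above: with NONE removed from NodeHealth and ONLINE used as the default member value, a node entry whose "health" field is absent (or unrecognized) is now reported as online. A minimal sketch of the new default:

// Sketch: default health after this change.
ClusterExtendedNodeInfo node;               // parser leaves "health" untouched
DCHECK(node.health == NodeHealth::ONLINE);  // previously defaulted to NodeHealth::NONE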
59 changes: 38 additions & 21 deletions src/server/cluster/cluster_family.cc
@@ -72,16 +72,12 @@ ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) {
return config;
}

// We can't mutate `config` so we copy it over
std::vector<ClusterShardInfo> infos;
infos.reserve(config.size());

for (auto& node : config) {
infos.push_back(node);
infos.rbegin()->replicas.clear();
auto shards_info = config.Unwrap();
for (auto& node : shards_info) {
node.replicas.clear();
}

return ClusterShardInfos{std::move(infos)};
return shards_info;
}

} // namespace
@@ -149,14 +145,15 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
? static_cast<uint16_t>(absl::GetFlag(FLAGS_port))
: cluster_announce_port;

info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, NodeHealth::NONE};
info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port},
NodeHealth::ONLINE};

if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({{.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)},
NodeHealth::NONE});
NodeHealth::ONLINE});
}
}
} else {
@@ -166,7 +163,7 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
info.replicas.push_back({{.id = id_,
.ip = cntx->conn()->LocalBindAddress(),
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))},
NodeHealth::NONE});
NodeHealth::ONLINE});
}

return info;
@@ -196,7 +193,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
constexpr unsigned int kEntrySize = 4;
auto* rb = static_cast<RedisReplyBuilder*>(builder);

auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) {
auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role) {
constexpr unsigned int kNodeSize = 14;
rb->StartArray(kNodeSize);
rb->SendBulkString("id");
@@ -212,7 +209,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
rb->SendBulkString("replication-offset");
rb->SendLong(0);
rb->SendBulkString("health");
rb->SendBulkString("online");
rb->SendBulkString(ToString(node.health));
};

rb->StartArray(config.size());
@@ -237,15 +234,22 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
} // namespace

void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) {
auto shard_infos = GetShardInfos(cntx);
if (shard_infos) {
return ClusterShardsImpl(*shard_infos, builder);
auto config = GetShardInfos(cntx);
if (config) {
// we need to remove hidden replicas
auto shards_info = config->Unwrap();
[Review thread on the copy above]
Contributor: We make so many unnecessary copies of a relatively large data structure. We copy it here by value and then another time on line 246. And it's not only here: going over the cluster code, we seem to copy by value a lot for no good reason (when const& is perfectly fine on those accessors).
Author: yes, we have some resource wasting in the cluster code, but it is not important now.
Contributor: Well, there is no reason to copy by value and it's an easy fix, so...
Author: config is constant and shouldn't be changed.
Contributor: I am not objecting to that. Return const& to avoid copies then 😄
for (auto& shard : shards_info) {
auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(),
[](const auto& r) { return r.health == NodeHealth::HIDDEN; });
shard.replicas.erase(new_end, shard.replicas.end());
}
return ClusterShardsImpl({shards_info}, builder);
}
return builder->SendError(kClusterNotConfigured);
}

namespace {
void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) {
void ClusterSlotsImpl(ClusterShardInfos config, SinkReplyBuilder* builder) {
// For more details https://redis.io/commands/cluster-slots/
auto* rb = static_cast<RedisReplyBuilder*>(builder);

@@ -258,10 +262,20 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
};

unsigned int slot_ranges = 0;
for (const auto& shard : config) {

// we need to remove hidden, failed, and loading replicas
auto shards_info = config.Unwrap();
for (auto& shard : shards_info) {
slot_ranges += shard.slot_ranges.Size();
auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(), [](const auto& r) {
return r.health == NodeHealth::HIDDEN || r.health == NodeHealth::FAIL ||
r.health == NodeHealth::LOADING;
});
shard.replicas.erase(new_end, shard.replicas.end());
}

config = {shards_info};

rb->StartArray(slot_ranges);
for (const auto& shard : config) {
for (const auto& slot_range : shard.slot_ranges) {
@@ -294,7 +308,7 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,

string result;

auto WriteNode = [&](const ClusterNodeInfo& node, string_view role, string_view master_id,
auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role, string_view master_id,
const SlotRanges& ranges) {
absl::StrAppend(&result, node.id, " ");

@@ -307,7 +321,8 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,

absl::StrAppend(&result, master_id, " ");

absl::StrAppend(&result, "0 0 0 connected");
absl::StrAppend(&result,
node.health != NodeHealth::FAIL ? "0 0 0 connected" : "0 0 0 disconnected");

for (const auto& range : ranges) {
absl::StrAppend(&result, " ", range.start);
@@ -324,7 +339,9 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
WriteNode(shard.master, "master", "-", shard.slot_ranges);
for (const auto& replica : shard.replicas) {
// Only the master prints ranges, so we send an empty set for replicas.
WriteNode(replica, "slave", shard.master.id, {});
if (replica.health != NodeHealth::HIDDEN) {
WriteNode(replica, "slave", shard.master.id, {});
}
}
}

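Following up on the review thread above about copying by value: a minimal sketch, as an assumption rather than something this PR changes, of the suggested const& accessor. Read-only callers would avoid the copy, while CLUSTER SHARDS/SLOTS keep taking an explicit copy before filtering replicas.

// Sketch only: Unwrap() returning a const reference instead of a copy.
class ClusterShardInfos {
 public:
  const std::vector<ClusterShardInfo>& Unwrap() const {
    return infos_;
  }

 private:
  std::vector<ClusterShardInfo> infos_;
};

// Read-only use, no copy:
//   const auto& shards = config.Unwrap();
// Mutating use (as in ClusterShardsImpl/ClusterSlotsImpl), explicit copy:
//   auto shards_info = config.Unwrap();  // copies, then hidden replicas are filtered out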