feat: add node health status for CLUSTER SLOTS and SHARDS #4767
```diff
@@ -72,16 +72,12 @@ ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) {
     return config;
   }
 
-  // We can't mutate `config` so we copy it over
-  std::vector<ClusterShardInfo> infos;
-  infos.reserve(config.size());
-
-  for (auto& node : config) {
-    infos.push_back(node);
-    infos.rbegin()->replicas.clear();
+  auto shards_info = config.Unwrap();
+  for (auto& node : shards_info) {
+    node.replicas.clear();
   }
 
-  return ClusterShardInfos{std::move(infos)};
+  return shards_info;
 }
 
 }  // namespace
```
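For context: the hunk above leans on `ClusterShardInfos::Unwrap()`, whose definition is not part of this diff. Judging by the call sites, it hands back the underlying shard vector by value so the caller can freely mutate its own copy. A minimal sketch of the assumed shape (not the actual Dragonfly definition):

```cpp
#include <vector>

// Stub for illustration; the real type lives elsewhere in the cluster code.
struct ClusterShardInfo { /* master, replicas, slot_ranges, ... */ };

// Assumed shape of the wrapper, inferred from how this diff uses it.
class ClusterShardInfos {
 public:
  // Appears to return a copy of the underlying vector, so callers such as
  // GetConfigForStats can mutate it (e.g. clear replica lists) safely.
  std::vector<ClusterShardInfo> Unwrap() const {
    return infos_;
  }

 private:
  std::vector<ClusterShardInfo> infos_;
};
```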
```diff
@@ -149,14 +145,15 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const
                                   ? static_cast<uint16_t>(absl::GetFlag(FLAGS_port))
                                   : cluster_announce_port;
 
-    info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, NodeHealth::NONE};
+    info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port},
+                   NodeHealth::ONLINE};
 
     if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
       for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
         info.replicas.push_back({{.id = replica.id,
                                   .ip = replica.address,
                                   .port = static_cast<uint16_t>(replica.listening_port)},
-                                 NodeHealth::NONE});
+                                 NodeHealth::ONLINE});
       }
     }
   } else {
```
```diff
@@ -166,7 +163,7 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const
       info.replicas.push_back({{.id = id_,
                                 .ip = cntx->conn()->LocalBindAddress(),
                                 .port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))},
-                               NodeHealth::NONE});
+                               NodeHealth::ONLINE});
   }
 
   return info;
```
```diff
@@ -196,7 +193,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
   constexpr unsigned int kEntrySize = 4;
   auto* rb = static_cast<RedisReplyBuilder*>(builder);
 
-  auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) {
+  auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role) {
     constexpr unsigned int kNodeSize = 14;
     rb->StartArray(kNodeSize);
     rb->SendBulkString("id");
```
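A note on the rename: `ClusterExtendedNodeInfo` is presumably `ClusterNodeInfo` plus the new health field, matching the two-level brace initialization (`{{.id, .ip, .port}, NodeHealth::ONLINE}`) used earlier in the diff. A hedged sketch of the assumed types; the exact enumerator set is inferred from the states this PR mentions, not taken from the source:

```cpp
#include <cstdint>
#include <string>

// Health states referenced in this PR: NONE, ONLINE, HIDDEN, FAIL, plus the
// "loading" state debated in the review thread further down.
enum class NodeHealth : std::uint8_t { NONE, FAIL, LOADING, ONLINE, HIDDEN };

struct ClusterNodeInfo {
  std::string id;
  std::string ip;
  std::uint16_t port = 0;
};

// Aggregate with a base class (C++17) plus designated initializers (C++20):
// `{{.id = ..., .ip = ..., .port = ...}, health}` fills the ClusterNodeInfo
// subobject first, then the health member.
struct ClusterExtendedNodeInfo : ClusterNodeInfo {
  NodeHealth health = NodeHealth::NONE;
};
```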
```diff
@@ -212,7 +209,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
     rb->SendBulkString("replication-offset");
     rb->SendLong(0);
     rb->SendBulkString("health");
-    rb->SendBulkString("online");
+    rb->SendBulkString(ToString(node.health));
   };
 
   rb->StartArray(config.size());
```
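`ToString(node.health)` is also not shown in this diff. Redis documents the CLUSTER SHARDS health values as `online`, `failed`, and `loading`, so a plausible mapping (a sketch over the NodeHealth enum assumed above, not the PR's actual implementation) would be:

```cpp
#include <string_view>

// Sketch only: maps the assumed NodeHealth enum to the strings Redis
// documents for CLUSTER SHARDS ("online", "failed", "loading").
std::string_view ToString(NodeHealth health) {
  switch (health) {
    case NodeHealth::ONLINE:
      return "online";
    case NodeHealth::FAIL:
      return "failed";
    case NodeHealth::LOADING:
      return "loading";
    default:
      // HIDDEN/NONE replicas are filtered out before replying, so this
      // branch should be unreachable in practice.
      return "unknown";
  }
}
```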
```diff
@@ -237,15 +234,22 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
 }  // namespace
 
 void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) {
-  auto shard_infos = GetShardInfos(cntx);
-  if (shard_infos) {
-    return ClusterShardsImpl(*shard_infos, builder);
+  auto config = GetShardInfos(cntx);
+  if (config) {
+    // we need to remove hidden replicas
+    auto shards_info = config->Unwrap();
+    for (auto& shard : shards_info) {
+      auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(),
+                                    [](const auto& r) { return r.health == NodeHealth::HIDDEN; });
+      shard.replicas.erase(new_end, shard.replicas.end());
+    }
+    return ClusterShardsImpl({shards_info}, builder);
   }
   return builder->SendError(kClusterNotConfigured);
 }
 
 namespace {
-void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) {
+void ClusterSlotsImpl(ClusterShardInfos config, SinkReplyBuilder* builder) {
   // For more details https://redis.io/commands/cluster-slots/
   auto* rb = static_cast<RedisReplyBuilder*>(builder);
```

Review thread on `auto shards_info = config->Unwrap();`:

- Reviewer: We do so many unnecessary copies on a relatively large data structure. We copy it here by value and then another time on line 246. And it's not only here; I was going over the cluster code and we seem to copy by value a lot for no good reason (when `const&` is perfectly fine on those accessors).
- Author: yes, we have some resource wasting in cluster code, but it is not important now
- Reviewer: Well, there is no reason to copy by value and it's an easy fix, so...
- Author: `config` is constant and shouldn't be changed
- Reviewer: I am not objecting to that.
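The copy discussion above is about accessor style: an accessor that returns by value makes every caller pay for a copy, while a `const&` return lets read-only callers skip it and still allows a deliberate copy where mutation is needed. An illustration with hypothetical names (not the actual Dragonfly accessors):

```cpp
#include <vector>

struct ClusterShardInfo { /* ... */ };

class ClusterConfigHolder {  // hypothetical name, for illustration only
 public:
  // By-value accessor: every call copies the whole shard vector.
  std::vector<ClusterShardInfo> ShardsByValue() const { return shards_; }

  // Const-reference accessor: read-only callers pay nothing, and callers
  // that need to mutate can still copy explicitly:
  //   auto local = holder.Shards();  // one deliberate copy
  const std::vector<ClusterShardInfo>& Shards() const { return shards_; }

 private:
  std::vector<ClusterShardInfo> shards_;
};
```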
```diff
@@ -258,10 +262,19 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
   };
 
   unsigned int slot_ranges = 0;
-  for (const auto& shard : config) {
+
+  // we need to remove hidden and failed replicas
+  auto shards_info = config.Unwrap();
+  for (auto& shard : shards_info) {
     slot_ranges += shard.slot_ranges.Size();
+    auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(), [](const auto& r) {
+      return r.health == NodeHealth::HIDDEN || r.health == NodeHealth::FAIL;
+    });
+    shard.replicas.erase(new_end, shard.replicas.end());
   }
 
+  config = {shards_info};
+
   rb->StartArray(slot_ranges);
   for (const auto& shard : config) {
     for (const auto& slot_range : shard.slot_ranges) {
```

Review thread on the `NodeHealth::HIDDEN || NodeHealth::FAIL` predicate:

- Reviewer: I don't think we should include loading replicas here. We can't have clients connecting to replicas that are still syncing.
- Author: it's just a status, to be compatible with Redis
- Reviewer: Though the original goal for adding this node health state was to avoid clients connecting to replicas syncing with the master (which aren't reachable in Dragonfly Cloud), and that won't be fixed if we include them. So should we mark those replicas as hidden when they aren't yet synced with the master? (In which case we'll never use the loading state?)
- Author: You control this info from the config. So if you decide that the loading state isn't needed, you can send hidden.
- Reviewer: @BorysTheDev I think that when the replica is in the loading state, there are some cluster client commands which should return the node and other commands which should not return it. So I believe this logic should be in Dragonfly and not in the cluster manager.
- Author: if the client is using the CLUSTER SHARDS command, it should see the loading state and know not to redirect traffic to it
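Since CLUSTER SHARDS and CLUSTER SLOTS now run the same erase-remove pass and differ only in which health states they drop, the filter could be factored into a single helper. A hypothetical refactor sketch building on the type sketches above (names are illustrative, not from the PR):

```cpp
#include <algorithm>
#include <vector>

// Hypothetical helper: copies the shards out of the config and drops every
// replica whose health matches the supplied predicate.
template <typename Pred>
std::vector<ClusterShardInfo> FilterReplicas(const ClusterShardInfos& config, Pred drop) {
  auto shards = config.Unwrap();
  for (auto& shard : shards) {
    auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(), drop);
    shard.replicas.erase(new_end, shard.replicas.end());
  }
  return shards;
}

// CLUSTER SHARDS would drop only hidden replicas:
//   auto shards = FilterReplicas(config, [](const auto& r) {
//     return r.health == NodeHealth::HIDDEN;
//   });
// CLUSTER SLOTS would drop hidden and failed ones:
//   auto shards = FilterReplicas(config, [](const auto& r) {
//     return r.health == NodeHealth::HIDDEN || r.health == NodeHealth::FAIL;
//   });
```

With C++20, the two-step erase-remove inside the loop could also be collapsed into `std::erase_if(shard.replicas, drop)`.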
```diff
@@ -324,7 +337,9 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
     WriteNode(shard.master, "master", "-", shard.slot_ranges);
     for (const auto& replica : shard.replicas) {
       // Only the master prints ranges, so we send an empty set for replicas.
-      WriteNode(replica, "slave", shard.master.id, {});
+      if (replica.health != NodeHealth::HIDDEN) {
+        WriteNode(replica, "slave", shard.master.id, {});
+      }
     }
   }
```
Review thread on the `replica.health != NodeHealth::HIDDEN` check (marked resolved by BorysTheDev):

- Reviewer: why?
- Author: because we shouldn't show it; I've added it for consistency