Skip to content

Commit 6afe123

Browse files
committed
feat: add node health status for CLUSTER SLOTS and SHARDS
1 parent 8d6a184 commit 6afe123

File tree

6 files changed

+146
-55
lines changed

6 files changed

+146
-55
lines changed

src/server/cluster/cluster_config.cc

+24-22
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ bool HasValidNodeIds(const ClusterShardInfos& new_config) {
3131

3232
for (const auto& shard : new_config) {
3333
if (!CheckAndInsertNode(shard.master.id)) {
34-
LOG(WARNING) << "Master " << shard.master.id << " appears more than once";
34+
LOG(ERROR) << "Master " << shard.master.id << " appears more than once";
3535
return false;
3636
}
3737
for (const auto& replica : shard.replicas) {
3838
if (!CheckAndInsertNode(replica.id)) {
39-
LOG(WARNING) << "Replica " << replica.id << " appears more than once";
39+
LOG(ERROR) << "Replica " << replica.id << " appears more than once";
4040
return false;
4141
}
4242
}
@@ -56,21 +56,21 @@ bool IsConfigValid(const ClusterShardInfos& new_config) {
5656
for (const auto& shard : new_config) {
5757
for (const auto& slot_range : shard.slot_ranges) {
5858
if (slot_range.start > slot_range.end) {
59-
LOG(WARNING) << "Invalid cluster config: start=" << slot_range.start
60-
<< " is larger than end=" << slot_range.end;
59+
LOG(ERROR) << "Invalid cluster config: start=" << slot_range.start
60+
<< " is larger than end=" << slot_range.end;
6161
return false;
6262
}
6363

6464
for (SlotId slot = slot_range.start; slot <= slot_range.end; ++slot) {
6565
if (slot >= slots_found.size()) {
66-
LOG(WARNING) << "Invalid cluster config: slot=" << slot
67-
<< " is bigger than allowed max=" << slots_found.size();
66+
LOG(ERROR) << "Invalid cluster config: slot=" << slot
67+
<< " is bigger than allowed max=" << slots_found.size();
6868
return false;
6969
}
7070

7171
if (slots_found[slot]) {
72-
LOG(WARNING) << "Invalid cluster config: slot=" << slot
73-
<< " was already configured by another slot range.";
72+
LOG(ERROR) << "Invalid cluster config: slot=" << slot
73+
<< " was already configured by another slot range.";
7474
return false;
7575
}
7676

@@ -80,7 +80,7 @@ bool IsConfigValid(const ClusterShardInfos& new_config) {
8080
}
8181

8282
if (!all_of(slots_found.begin(), slots_found.end(), [](bool b) { return b; }) > 0UL) {
83-
LOG(WARNING) << "Invalid cluster config: some slots were missing.";
83+
LOG(ERROR) << "Invalid cluster config: some slots were missing.";
8484
return false;
8585
}
8686

@@ -129,7 +129,7 @@ constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;
129129

130130
template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
131131
if (!obj.is_number()) {
132-
LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj;
132+
LOG(ERROR) << kInvalidConfigPrefix << "object is not a number " << obj;
133133
return nullopt;
134134
}
135135

@@ -138,15 +138,15 @@ template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
138138

139139
optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
140140
if (!slots.is_array()) {
141-
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
141+
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
142142
return nullopt;
143143
}
144144

145145
std::vector<SlotRange> ranges;
146146

147147
for (const auto& range : slots.array_range()) {
148148
if (!range.is_object()) {
149-
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
149+
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
150150
return nullopt;
151151
}
152152

@@ -164,7 +164,7 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
164164

165165
optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
166166
if (!json.is_object()) {
167-
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
167+
LOG(ERROR) << kInvalidConfigPrefix << "node config is not an object " << json;
168168
return nullopt;
169169
}
170170

@@ -173,7 +173,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
173173
{
174174
auto id = json.at_or_null("id");
175175
if (!id.is_string()) {
176-
LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json;
176+
LOG(ERROR) << kInvalidConfigPrefix << "invalid id for node " << json;
177177
return nullopt;
178178
}
179179
node.id = std::move(id).as_string();
@@ -182,7 +182,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
182182
{
183183
auto ip = json.at_or_null("ip");
184184
if (!ip.is_string()) {
185-
LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json;
185+
LOG(ERROR) << kInvalidConfigPrefix << "invalid ip for node " << json;
186186
return nullopt;
187187
}
188188
node.ip = std::move(ip).as_string();
@@ -200,7 +200,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
200200
auto health = json.at_or_null("health");
201201
if (!health.is_null()) {
202202
if (!health.is_string()) {
203-
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node " << json;
203+
LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node " << json;
204204
} else {
205205
auto health_str = std::move(health).as_string();
206206
if (absl::EqualsIgnoreCase(health_str, "FAIL")) {
@@ -209,8 +209,10 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
209209
node.health = NodeHealth::LOADING;
210210
} else if (absl::EqualsIgnoreCase(health_str, "ONLINE")) {
211211
node.health = NodeHealth::ONLINE;
212+
} else if (absl::EqualsIgnoreCase(health_str, "HIDDEN")) {
213+
node.health = NodeHealth::HIDDEN;
212214
} else {
213-
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
215+
LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
214216
}
215217
}
216218
}
@@ -237,7 +239,7 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
237239
auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
238240

239241
if (!node_id.is_string() || !ip.is_string() || !port || !slots) {
240-
LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json;
242+
LOG(ERROR) << kInvalidConfigPrefix << "invalid migration json " << json;
241243
return nullopt;
242244
}
243245

@@ -253,15 +255,15 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
253255
std::vector<ClusterShardInfo> config;
254256

255257
if (!json.is_array()) {
256-
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
258+
LOG(ERROR) << kInvalidConfigPrefix << "not an array " << json;
257259
return nullopt;
258260
}
259261

260262
for (const auto& element : json.array_range()) {
261263
ClusterShardInfo shard;
262264

263265
if (!element.is_object()) {
264-
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
266+
LOG(ERROR) << kInvalidConfigPrefix << "shard element is not an object " << element;
265267
return nullopt;
266268
}
267269

@@ -279,7 +281,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
279281

280282
auto replicas = element.at_or_null("replicas");
281283
if (!replicas.is_array()) {
282-
LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
284+
LOG(ERROR) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
283285
return nullopt;
284286
}
285287

@@ -309,7 +311,7 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
309311
std::string_view json_str) {
310312
optional<JsonType> json_config = JsonFromString(json_str, PMR_NS::get_default_resource());
311313
if (!json_config.has_value()) {
312-
LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str;
314+
LOG(ERROR) << "Can't parse JSON for ClusterConfig " << json_str;
313315
return nullptr;
314316
}
315317

src/server/cluster/cluster_defs.cc

+18
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <absl/strings/str_cat.h>
88
#include <absl/strings/str_join.h>
99

10+
#include "base/logging.h"
1011
#include "cluster_config.h"
1112
#include "facade/error.h"
1213
#include "slot_set.h"
@@ -73,4 +74,21 @@ facade::ErrorReply SlotOwnershipError(SlotId slot_id) {
7374
}
7475
return facade::ErrorReply{facade::OpStatus::OK};
7576
}
77+
78+
std::string_view ToString(NodeHealth nh) {
79+
switch (nh) {
80+
case NodeHealth::FAIL:
81+
return "fail";
82+
case NodeHealth::LOADING:
83+
return "loading";
84+
case NodeHealth::ONLINE:
85+
return "online";
86+
case NodeHealth::HIDDEN:
87+
DCHECK(false); // shouldn't be used
88+
return "hidden";
89+
}
90+
DCHECK(false);
91+
return "undefined_health";
92+
}
93+
7694
} // namespace dfly::cluster

src/server/cluster/cluster_defs.h

+7-2
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,11 @@ struct ClusterNodeInfo {
9494
}
9595
};
9696

97-
enum class NodeHealth : std::uint8_t { NONE, FAIL, LOADING, ONLINE };
97+
enum class NodeHealth : std::uint8_t { FAIL, LOADING, ONLINE, HIDDEN };
98+
std::string_view ToString(NodeHealth nh);
9899

99100
struct ClusterExtendedNodeInfo : ClusterNodeInfo {
100-
NodeHealth health = NodeHealth::NONE;
101+
NodeHealth health = NodeHealth::FAIL;
101102
};
102103

103104
struct MigrationInfo {
@@ -159,6 +160,10 @@ class ClusterShardInfos {
159160
return infos_ != r.infos_;
160161
}
161162

163+
auto Unwrap() const {
164+
return infos_;
165+
}
166+
162167
private:
163168
std::vector<ClusterShardInfo> infos_;
164169
};

src/server/cluster/cluster_family.cc

+34-19
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,12 @@ ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) {
7272
return config;
7373
}
7474

75-
// We can't mutate `config` so we copy it over
76-
std::vector<ClusterShardInfo> infos;
77-
infos.reserve(config.size());
78-
79-
for (auto& node : config) {
80-
infos.push_back(node);
81-
infos.rbegin()->replicas.clear();
75+
auto shards_info = config.Unwrap();
76+
for (auto& node : shards_info) {
77+
node.replicas.clear();
8278
}
8379

84-
return ClusterShardInfos{std::move(infos)};
80+
return shards_info;
8581
}
8682

8783
} // namespace
@@ -149,14 +145,15 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
149145
? static_cast<uint16_t>(absl::GetFlag(FLAGS_port))
150146
: cluster_announce_port;
151147

152-
info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, NodeHealth::NONE};
148+
info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port},
149+
NodeHealth::ONLINE};
153150

154151
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
155152
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
156153
info.replicas.push_back({{.id = replica.id,
157154
.ip = replica.address,
158155
.port = static_cast<uint16_t>(replica.listening_port)},
159-
NodeHealth::NONE});
156+
NodeHealth::ONLINE});
160157
}
161158
}
162159
} else {
@@ -166,7 +163,7 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
166163
info.replicas.push_back({{.id = id_,
167164
.ip = cntx->conn()->LocalBindAddress(),
168165
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))},
169-
NodeHealth::NONE});
166+
NodeHealth::ONLINE});
170167
}
171168

172169
return info;
@@ -196,7 +193,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
196193
constexpr unsigned int kEntrySize = 4;
197194
auto* rb = static_cast<RedisReplyBuilder*>(builder);
198195

199-
auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) {
196+
auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role) {
200197
constexpr unsigned int kNodeSize = 14;
201198
rb->StartArray(kNodeSize);
202199
rb->SendBulkString("id");
@@ -212,7 +209,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
212209
rb->SendBulkString("replication-offset");
213210
rb->SendLong(0);
214211
rb->SendBulkString("health");
215-
rb->SendBulkString("online");
212+
rb->SendBulkString(ToString(node.health));
216213
};
217214

218215
rb->StartArray(config.size());
@@ -237,15 +234,22 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
237234
} // namespace
238235

239236
void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) {
240-
auto shard_infos = GetShardInfos(cntx);
241-
if (shard_infos) {
242-
return ClusterShardsImpl(*shard_infos, builder);
237+
auto config = GetShardInfos(cntx);
238+
if (config) {
239+
// we need to remove hidden replicas
240+
auto shards_info = config->Unwrap();
241+
for (auto& shard : shards_info) {
242+
auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(),
243+
[](const auto& r) { return r.health == NodeHealth::HIDDEN; });
244+
shard.replicas.erase(new_end, shard.replicas.end());
245+
}
246+
return ClusterShardsImpl({shards_info}, builder);
243247
}
244248
return builder->SendError(kClusterNotConfigured);
245249
}
246250

247251
namespace {
248-
void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) {
252+
void ClusterSlotsImpl(ClusterShardInfos config, SinkReplyBuilder* builder) {
249253
// For more details https://redis.io/commands/cluster-slots/
250254
auto* rb = static_cast<RedisReplyBuilder*>(builder);
251255

@@ -258,10 +262,19 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
258262
};
259263

260264
unsigned int slot_ranges = 0;
261-
for (const auto& shard : config) {
265+
266+
// we need to remove hidden and failed replicas
267+
auto shards_info = config.Unwrap();
268+
for (auto& shard : shards_info) {
262269
slot_ranges += shard.slot_ranges.Size();
270+
auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(), [](const auto& r) {
271+
return r.health == NodeHealth::HIDDEN || r.health == NodeHealth::FAIL;
272+
});
273+
shard.replicas.erase(new_end, shard.replicas.end());
263274
}
264275

276+
config = {shards_info};
277+
265278
rb->StartArray(slot_ranges);
266279
for (const auto& shard : config) {
267280
for (const auto& slot_range : shard.slot_ranges) {
@@ -324,7 +337,9 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
324337
WriteNode(shard.master, "master", "-", shard.slot_ranges);
325338
for (const auto& replica : shard.replicas) {
326339
// Only the master prints ranges, so we send an empty set for replicas.
327-
WriteNode(replica, "slave", shard.master.id, {});
340+
if (replica.health != NodeHealth::HIDDEN) {
341+
WriteNode(replica, "slave", shard.master.id, {});
342+
}
328343
}
329344
}
330345

0 commit comments

Comments
 (0)