Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: raft join failed #181 #188

Open
wants to merge 3 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,9 @@
#include "base_cmd.h"
#include "client.h"
#include "config.h"
#include "env.h"
#include "kiwi.h"
#include "raft/raft.h"
#include "slow_log.h"
#include "std/log.h"
#include "std/std_string.h"

namespace kiwi {

Expand Down Expand Up @@ -303,6 +300,7 @@ bool PClient::isPeerMaster() const {
return repl_addr.GetIP() == PeerIP() && repl_addr.GetPort() == PeerPort();
}

// check if the client is the target of the cluster command
bool PClient::isClusterCmdTarget() const {
return RAFT_INST.GetClusterCmdCtx().GetPeerIp() == PeerIP() && RAFT_INST.GetClusterCmdCtx().GetPort() == PeerPort();
}
Expand Down
39 changes: 25 additions & 14 deletions src/cmd_raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ void RaftNodeCmd::DoCmdRemove(PClient* client) {
auto ret =
RAFT_INST.GetClusterCmdCtx().Set(ClusterCmdType::kRemove, client, std::move(peer_ip), port, std::move(peer_id));
if (!ret) { // other clients have removed
return client->SetRes(CmdRes::kErrOther, "Other clients have removed");
client->SetRes(CmdRes::kErrOther, "Other clients have removed");
return;
}
RAFT_INST.GetClusterCmdCtx().ConnectTargetNode();
INFO("Sent remove request to leader successfully");
Expand Down Expand Up @@ -150,7 +151,8 @@ bool RaftClusterCmd::DoInitial(PClient* client) {

void RaftClusterCmd::DoCmd(PClient* client) {
if (RAFT_INST.IsInitialized()) {
return client->SetRes(CmdRes::kErrOther, "Already cluster member");
client->SetRes(CmdRes::kErrOther, "Already cluster member");
return;
}

auto cmd = client->argv_[1];
Expand All @@ -164,22 +166,25 @@ void RaftClusterCmd::DoCmd(PClient* client) {

void RaftClusterCmd::DoCmdInit(PClient* client) {
if (client->argv_.size() != 2 && client->argv_.size() != 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return;
}

std::string cluster_id;
if (client->argv_.size() == 3) {
cluster_id = client->argv_[2];
if (cluster_id.size() != RAFT_GROUPID_LEN) {
return client->SetRes(CmdRes::kInvalidParameter,
"Cluster id must be " + std::to_string(RAFT_GROUPID_LEN) + " characters");
client->SetRes(CmdRes::kInvalidParameter,
"Cluster id must have " + std::to_string(RAFT_GROUPID_LEN) + " characters");
return;
}
} else {
cluster_id = kstd::RandomHexChars(RAFT_GROUPID_LEN);
}
auto s = RAFT_INST.Init(cluster_id, false);
if (!s.ok()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: ", s.error_str()));
client->SetRes(CmdRes::kErrOther, fmt::format("Failed to init node: ", s.error_str()));
return;
}
client->SetRes(CmdRes::kOK);
}
Expand All @@ -198,35 +203,41 @@ static inline std::optional<std::pair<std::string, int32_t>> GetIpAndPortFromEnd
void RaftClusterCmd::DoCmdJoin(PClient* client) {
// If the node has been initialized, it needs to close the previous initialization and rejoin the other group
if (RAFT_INST.IsInitialized()) {
return client->SetRes(CmdRes::kErrOther,
"A node that has been added to a cluster must be removed \
client->SetRes(CmdRes::kErrOther,
"A node that has been added to a cluster must be removed \
from the old cluster before it can be added to the new cluster");
return;
}

if (client->argv_.size() < 3) {
return client->SetRes(CmdRes::kWrongNum, client->CmdName());
client->SetRes(CmdRes::kWrongNum, client->CmdName());
return;
}

// (KKorpse)TODO: Support multiple nodes join at the same time.
// (KKorpse) TODO: Support multiple nodes join at the same time.
if (client->argv_.size() > 3) {
return client->SetRes(CmdRes::kInvalidParameter, "Too many arguments");
client->SetRes(CmdRes::kInvalidParameter, "Too many arguments");
return;
}

auto addr = client->argv_[2];
if (braft::PeerId(addr).is_empty()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr));
client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr));
return;
}

auto ip_port = GetIpAndPortFromEndPoint(addr);
if (!ip_port.has_value()) {
return client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr));
client->SetRes(CmdRes::kErrOther, fmt::format("Invalid ip::port: {}", addr));
return;
}
auto& [peer_ip, port] = *ip_port;

// Connect target
auto ret = RAFT_INST.GetClusterCmdCtx().Set(ClusterCmdType::kJoin, client, std::move(peer_ip), port);
if (!ret) { // other clients have joined
return client->SetRes(CmdRes::kErrOther, "Other clients have joined");
client->SetRes(CmdRes::kErrOther, "Other clients have joined");
return;
}
RAFT_INST.GetClusterCmdCtx().ConnectTargetNode();
INFO("Sent join request to leader successfully");
Expand Down
3 changes: 3 additions & 0 deletions src/net/thread_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,9 @@ uint64_t ThreadManager<T>::DoTCPConnect(T &t, int fd, const std::shared_ptr<Conn
}

readThread_->AddNewEvent(connId, fd, BaseEvent::EVENT_READ);
if (writeThread_) {
writeThread_->AddNewEvent(connId, fd, BaseEvent::EVENT_NULL); // add null event to write_thread epoll
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

想问一下向写线程添加空事件的目的是啥

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raft join failed是因为当读写线程分离时,尝试通过写线程output,结果发现写事件里没有对应的Id,导致write错误,真正解决问题的就是这里

}
return connId;
}

Expand Down
9 changes: 4 additions & 5 deletions src/raft/raft.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <cassert>

#include "braft/raft.h"
#include "braft/snapshot.h"
#include "braft/util.h"
#include "brpc/server.h"
#include "gflags/gflags.h"
Expand Down Expand Up @@ -218,7 +217,7 @@ std::string Raft::GetLeaderAddress() const {
auto id = node_->leader_id();
// The cluster does not have a leader.
if (id.is_empty()) {
return std::string();
return "";
}

id.addr.port -= g_config.raft_port_offset;
Expand Down Expand Up @@ -311,7 +310,7 @@ void Raft::SendNodeRequest(PClient* client) {
void Raft::SendNodeInfoRequest(PClient* client, const std::string& info_type) {
assert(client);

client->AppendArrayLen(int64_t(2));
client->AppendArrayLen(2);
client->AppendString("INFO");
client->AppendString(info_type);
client->SendPacket();
Expand All @@ -326,7 +325,7 @@ void Raft::SendNodeAddRequest(PClient* client) {
auto port = g_config.port + kiwi::g_config.raft_port_offset;
auto raw_addr = g_config.ip + ":" + std::to_string(port);

client->AppendArrayLen(int64_t(4));
client->AppendArrayLen((4));
client->AppendString("RAFT.NODE");
client->AppendString("ADD");
client->AppendString(std::to_string(unused_node_id));
Expand All @@ -337,7 +336,7 @@ void Raft::SendNodeAddRequest(PClient* client) {

void Raft::SendNodeRemoveRequest(PClient* client) {
assert(client);
client->AppendArrayLen(int64_t(3));
client->AppendArrayLen(3);
client->AppendString("RAFT.NODE");
client->AppendString("REMOVE");
client->AppendString(cluster_cmd_ctx_.GetPeerID());
Expand Down
4 changes: 1 addition & 3 deletions src/raft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,9 @@

#pragma once

#include <filesystem>
#include <future>
#include <mutex>
#include <string>
#include <tuple>
#include <vector>

#include "braft/file_system_adaptor.h"
Expand Down Expand Up @@ -39,7 +37,7 @@ namespace kiwi {
// class EventLoop;
class Binlog;

enum ClusterCmdType {
enum ClusterCmdType : int8_t {
kNone,
kJoin,
kRemove,
Expand Down
1 change: 0 additions & 1 deletion src/raft/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "std/log.h"
#include "std/std_string.h"

#include "config.h"
#include "raft.h"
#include "store.h"

Expand Down
3 changes: 3 additions & 0 deletions src/replication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ void PReplication::SendToSlaves(const std::vector<PString>& params) {
void PReplication::Cron() {
static unsigned pingCron = 0;

// Every 50 calls to Cron, this method will traverse the slaves_ list,
// sending PING requests to the online slave nodes to confirm their status.
// If the reference to a slave node has become invalid, it will be removed from the list.
if (pingCron++ % 50 == 0) {
for (auto it = slaves_.begin(); it != slaves_.end();) {
auto cli = it->lock();
Expand Down
4 changes: 2 additions & 2 deletions src/replication.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ inline void SaveCommand(const std::vector<PString>& params, DEST& dst) {
}

// master side
enum PSlaveState {
enum PSlaveState : int8_t {
kPSlaveStateNone,
kPSlaveStateWaitBgsaveStart, // 有非sync的bgsave进行 要等待
kPSlaveStateWaitBgsaveEnd, // sync bgsave正在进行
Expand All @@ -77,7 +77,7 @@ struct PSlaveInfo {
};

// slave side
enum PReplState {
enum PReplState : int8_t {
kPReplStateNone,
kPReplStateConnecting,
kPReplStateConnected,
Expand Down
Loading