Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update slot mapping with time interval #595

Merged
merged 2 commits into from
Sep 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2299,6 +2299,8 @@ If master is down, the cluster will promote one of its replicas to be the new ma
- When the master is down, *redis-plus-plus* losts connection to it. In this case, if you try to send commands to this master, *redis-plus-plus* will try to update slot-node mapping from other nodes. If the mapping remains unchanged, i.e. new master hasn't been elected yet, it fails to send command to Redis Cluster and throws exception.
- When the new master has been elected, the slot-node mapping will be updated by the cluster. In this case, if you send commands to the cluster, *redis-plus-plus* can get an update-to-date mapping, and sends commands to the new master.

Since redis-plus-plus 1.3.13, it also updates the slot-node mapping every `ClusterOptions::slot_map_refresh_interval` time interval (by default, it updates every 10 seconds).

### Redis Sentinel

[Redis Sentinel provides high availability for Redis](https://redis.io/topics/sentinel). If Redis master is down, Redis Sentinels will elect a new master from slaves, i.e. failover. Besides, Redis Sentinel can also act like a configuration provider for clients, and clients can query master or slave address from Redis Sentinel. So that if a failover occurs, clients can ask the new master address from Redis Sentinel.
Expand Down
2 changes: 0 additions & 2 deletions src/sw/redis++/async_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,6 @@ class ClusterEvent : public CommandEvent<Result, ResultParser> {
// i.e. ClosedError or IoError, we need to update node-slot mapping.
try {
std::rethrow_exception(err);
} catch (const SlotUncoveredError &) {
detail::update_shards(_key, _pool, AsyncEventUPtr(new UpdateShardsEvent));
} catch (const IoError &) {
detail::update_shards(_key, _pool, AsyncEventUPtr(new UpdateShardsEvent));
} catch (const ClosedError &) {
Expand Down
20 changes: 13 additions & 7 deletions src/sw/redis++/async_redis_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,24 @@ AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts,
_loop = std::make_shared<EventLoop>();
}

_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role);
_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role, ClusterOptions{});
}

AsyncRedisCluster::AsyncRedisCluster(const ConnectionOptions &opts,
const ConnectionPoolOptions &pool_opts,
Role role,
const ClusterOptions &cluster_opts,
const EventLoopSPtr &loop) : _loop(loop) {
if (!_loop) {
_loop = std::make_shared<EventLoop>();
}

_pool = std::make_shared<AsyncShardsPool>(_loop, pool_opts, opts, role, cluster_opts);
}

AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connection) {
assert(_pool);

_pool->update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool.
Expand All @@ -52,8 +62,6 @@ AsyncRedis AsyncRedisCluster::redis(const StringView &hash_tag, bool new_connect
AsyncSubscriber AsyncRedisCluster::subscriber() {
assert(_pool);

_pool->update();

auto opts = _pool->connection_options();

auto connection = std::make_shared<AsyncConnection>(opts, _loop.get());
Expand All @@ -65,8 +73,6 @@ AsyncSubscriber AsyncRedisCluster::subscriber() {
AsyncSubscriber AsyncRedisCluster::subscriber(const StringView &hash_tag) {
assert(_pool);

_pool->update();

auto opts = _pool->connection_options(hash_tag);

auto connection = std::make_shared<AsyncConnection>(opts, _loop.get());
Expand Down
6 changes: 6 additions & 0 deletions src/sw/redis++/async_redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class AsyncRedisCluster {
Role role = Role::MASTER,
const EventLoopSPtr &loop = nullptr);

AsyncRedisCluster(const ConnectionOptions &opts,
const ConnectionPoolOptions &pool_opts,
Role role,
const ClusterOptions &cluster_opts,
const EventLoopSPtr &loop = nullptr);

explicit AsyncRedisCluster(const std::string &uri) : AsyncRedisCluster(Uri(uri)) {}

AsyncRedisCluster(const AsyncRedisCluster &) = delete;
Expand Down
14 changes: 10 additions & 4 deletions src/sw/redis++/async_shards_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ const std::size_t AsyncShardsPool::SHARDS;
AsyncShardsPool::AsyncShardsPool(const EventLoopSPtr &loop,
const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role) :
Role role,
const ClusterOptions &cluster_opts) :
_pool_opts(pool_opts),
_connection_opts(connection_opts),
_role(role),
_cluster_opts(cluster_opts),
_loop(loop) {
assert(loop);

Expand Down Expand Up @@ -208,8 +210,12 @@ auto AsyncShardsPool::_fetch_events() -> std::queue<RedeliverEvent> {
std::queue<RedeliverEvent> events;

std::unique_lock<std::mutex> lock(_mutex);
if (_events.empty()) {
_cv.wait(lock, [this]() { return !(this->_events).empty(); } );

if (!_cv.wait_for(lock,
_cluster_opts.slot_map_refresh_interval,
[this]() { return !(this->_events).empty(); })) {
// Reach timeout, but there's still no event, put an update event.
_events.push(RedeliverEvent{{}, AsyncEventUPtr(new UpdateShardsEvent)});
}

events.swap(_events);
Expand Down Expand Up @@ -238,7 +244,7 @@ Shards AsyncShardsPool::_get_shards(const std::string &host, int port) {
auto opts = _connection_opts;
opts.host = host;
opts.port = port;
ShardsPool pool(_pool_opts, opts, _role);
ShardsPool pool(_pool_opts, opts, _role, _cluster_opts);

return pool.shards();
}
Expand Down
5 changes: 4 additions & 1 deletion src/sw/redis++/async_shards_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class AsyncShardsPool {
AsyncShardsPool(const EventLoopSPtr &loop,
const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role);
Role role,
const ClusterOptions &cluster_opts);

AsyncConnectionPoolSPtr fetch(const StringView &key);

Expand Down Expand Up @@ -101,6 +102,8 @@ class AsyncShardsPool {

Role _role = Role::MASTER;

ClusterOptions _cluster_opts;

Shards _shards;

NodeMap _pools;
Expand Down
2 changes: 1 addition & 1 deletion src/sw/redis++/crc16.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
* Output for "123456789" : 31C3
*/

#include "utils.h"
#include "sw/redis++/utils.h"
#include <cstdint>

namespace sw {
Expand Down
10 changes: 0 additions & 10 deletions src/sw/redis++/redis_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ RedisCluster::RedisCluster(const Uri &uri) :
Redis RedisCluster::redis(const StringView &hash_tag, bool new_connection) {
assert(_pool);

_pool->async_update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool
Expand All @@ -46,8 +44,6 @@ Redis RedisCluster::redis(const StringView &hash_tag, bool new_connection) {
Pipeline RedisCluster::pipeline(const StringView &hash_tag, bool new_connection) {
assert(_pool);

_pool->async_update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool
Expand All @@ -60,8 +56,6 @@ Pipeline RedisCluster::pipeline(const StringView &hash_tag, bool new_connection)
Transaction RedisCluster::transaction(const StringView &hash_tag, bool piped, bool new_connection) {
assert(_pool);

_pool->async_update();

auto pool = _pool->fetch(hash_tag);
if (new_connection) {
// Create a new pool
Expand All @@ -74,17 +68,13 @@ Transaction RedisCluster::transaction(const StringView &hash_tag, bool piped, bo
Subscriber RedisCluster::subscriber() {
assert(_pool);

_pool->async_update();

auto opts = _pool->connection_options();
return Subscriber(Connection(opts));
}

Subscriber RedisCluster::subscriber(const StringView &hash_tag) {
assert(_pool);

_pool->async_update();

auto opts = _pool->connection_options(hash_tag);
return Subscriber(Connection(opts));
}
Expand Down
3 changes: 2 additions & 1 deletion src/sw/redis++/redis_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class RedisCluster {
public:
explicit RedisCluster(const ConnectionOptions &connection_opts,
const ConnectionPoolOptions &pool_opts = {},
Role role = Role::MASTER) : _pool(new ShardsPool(pool_opts, connection_opts, role)) {}
Role role = Role::MASTER,
const ClusterOptions &cluster_opts = {}) : _pool(new ShardsPool(pool_opts, connection_opts, role, cluster_opts)) {}

// Construct RedisCluster with URI:
// "tcp://127.0.0.1" or "tcp://127.0.0.1:6379"
Expand Down
5 changes: 0 additions & 5 deletions src/sw/redis++/redis_cluster.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1351,11 +1351,6 @@ ReplyUPtr RedisCluster::_command(Cmd cmd, const StringView &key, Args &&...args)
SafeConnection safe_connection(*pool);

return _command(cmd, safe_connection.connection(), std::forward<Args>(args)...);
} catch (const SlotUncoveredError &) {
// Some slot is not covered, update asynchronously to see if new node added.
// Check https://github.com/sewenew/redis-plus-plus/issues/255 for detail.
// TODO: should we replace other 'update's with 'async_update's?
_pool->async_update();
} catch (const IoError &) {
// When master is down, one of its replicas will be promoted to be the new master.
// If we try to send command to the old master, we'll get an *IoError*.
Expand Down
54 changes: 13 additions & 41 deletions src/sw/redis++/shards_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const std::size_t ShardsPool::SHARDS;

ShardsPool::ShardsPool(const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role) :
Role role,
const ClusterOptions &cluster_opts) :
_pool_opts(pool_opts),
_connection_opts(connection_opts),
_role(role) {
_role(role),
_cluster_opts(cluster_opts) {
if (_connection_opts.type != ConnectionType::TCP) {
throw Error("Only support TCP connection for Redis Cluster");
}
Expand All @@ -47,7 +49,7 @@ ShardsPool::~ShardsPool() {
{
std::lock_guard<std::mutex> lock(_mutex);

_update_status = UpdateStatus::STOP;
_stop = true;
}

_cv.notify_one();
Expand Down Expand Up @@ -172,22 +174,6 @@ std::vector<ConnectionPoolSPtr> ShardsPool::pools() {
return nodes;
}

void ShardsPool::async_update() {
bool should_update = false;
{
std::lock_guard<std::mutex> lock(_mutex);

if (_update_status == UpdateStatus::UPDATED) {
should_update = true;
_update_status = UpdateStatus::STALE;
}
}

if (should_update) {
_cv.notify_one();
}
}

void ShardsPool::_init_pool(const Shards &shards) {
for (const auto &shard : shards) {
_add_node(shard.second);
Expand Down Expand Up @@ -385,34 +371,20 @@ auto ShardsPool::_add_node(const Node &node) -> NodeMap::iterator {
void ShardsPool::_run() {
while (true) {
std::unique_lock<std::mutex> lock(_mutex);
if (_update_status == UpdateStatus::UPDATED) {
_cv.wait(lock, [this]() { return this->_update_status != UpdateStatus::UPDATED; });
}

if (_update_status == UpdateStatus::STOP) {
if (_cv.wait_for(lock,
_cluster_opts.slot_map_refresh_interval,
[this]() { return this->_stop; })) {
break;
} else if (_update_status == UpdateStatus::STALE) {
lock.unlock();

_do_async_update();
} else {
assert("invalid UpdateStatus");
}
}
}

void ShardsPool::_do_async_update() {
try {
update();

std::lock_guard<std::mutex> lock(_mutex);
lock.unlock();

if (_update_status != UpdateStatus::STOP) {
_update_status = UpdateStatus::UPDATED;
try {
update();
} catch (...) {
// Ignore exceptions.
}
} catch (...) {
// Ignore exceptions.
// TODO: should we sleep a while?
}
}

Expand Down
22 changes: 11 additions & 11 deletions src/sw/redis++/shards_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define SEWENEW_REDISPLUSPLUS_SHARDS_POOL_H

#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>
Expand All @@ -34,10 +35,13 @@ namespace sw {

namespace redis {

struct ClusterOptions {
// Automatically update slot map every `slot_map_refresh_interval`.
std::chrono::milliseconds slot_map_refresh_interval = std::chrono::seconds(10);
};

class ShardsPool {
public:
ShardsPool() = default;

ShardsPool(const ShardsPool &that) = delete;
ShardsPool& operator=(const ShardsPool &that) = delete;

Expand All @@ -48,7 +52,8 @@ class ShardsPool {

ShardsPool(const ConnectionPoolOptions &pool_opts,
const ConnectionOptions &connection_opts,
Role role);
Role role,
const ClusterOptions &cluster_opts);

// Fetch a connection by key.
ConnectionPoolSPtr fetch(const StringView &key);
Expand All @@ -69,8 +74,6 @@ class ShardsPool {

std::vector<ConnectionPoolSPtr> pools();

void async_update();

private:
void _init_pool(const Shards &shards);

Expand Down Expand Up @@ -117,12 +120,7 @@ class ShardsPool {

NodeMap _pools;

enum class UpdateStatus {
STALE = 0,
UPDATED,
STOP
};
UpdateStatus _update_status = UpdateStatus::UPDATED;
bool _stop = false;

std::thread _worker;

Expand All @@ -132,6 +130,8 @@ class ShardsPool {

Role _role = Role::MASTER;

ClusterOptions _cluster_opts;

static const std::size_t SHARDS = 16383;
};

Expand Down
Loading