Skip to content
This repository was archived by the owner on Aug 26, 2024. It is now read-only.

Commit e69b80e

Browse files
martin-sucha and Peter Navrátil committed
Add rack-aware load balancing policy
We need to prefer the local rack, as there are higher network costs when communicating with nodes in a remote rack. This policy prefers nodes from the local rack, then the local datacenter, and then other nodes. The new RackAwarePolicy is similar to DCAwarePolicy, but does not have the deprecated options. TokenAwarePolicy and other code needed to be modified so that the local rack is propagated. The TokenAwarePolicy was changed to prefer replicas in a remote rack / remote DC before trying non-replica nodes. It does not make much sense to skip the replicas, and trying them simplifies the code, as we now have three levels: local/remote/remote2. This change might not be backwards-compatible; we don't know what exactly this project guarantees in terms of backwards compatibility. Co-Authored-By: Peter Navrátil <[email protected]>
1 parent 4a98a98 commit e69b80e

31 files changed

+999
-88
lines changed

include/cassandra.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2213,6 +2213,50 @@ cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster,
22132213
unsigned used_hosts_per_remote_dc,
22142214
cass_bool_t allow_remote_dcs_for_local_cl);
22152215

2216+
/**
2217+
* Configures the cluster to use Rack-aware load balancing.
2218+
* For each query, all live nodes in a primary 'local' rack are tried first,
2219+
* followed by nodes from local DC and then nodes from other DCs.
2220+
*
2221+
* With empty local_rack and local_dc, default local_dc and local_rack
2222+
* are chosen from the first connected contact point,
2223+
* and no remote hosts are considered in query plans.
2224+
* If relying on this mechanism, be sure to use only contact
2225+
* points from the local rack.
2226+
*
2227+
* @public @memberof CassCluster
2228+
*
2229+
* @param[in] cluster
2230+
* @param[in] local_dc The primary data center to try first
2231+
* @param[in] local_rack The primary rack to try first
2232+
* @return CASS_OK if successful, otherwise an error occurred
2233+
*/
2234+
CASS_EXPORT CassError
2235+
cass_cluster_set_load_balance_rack_aware(CassCluster* cluster,
2236+
const char* local_dc,
2237+
const char* local_rack);
2238+
2239+
2240+
/**
2241+
* Same as cass_cluster_set_load_balance_rack_aware(), but with lengths for string
2242+
* parameters.
2243+
*
2244+
* @public @memberof CassCluster
2245+
*
2246+
* @param[in] cluster
2247+
* @param[in] local_dc
2248+
* @param[in] local_dc_length
* @param[in] local_rack
* @param[in] local_rack_length
2249+
* @return same as cass_cluster_set_load_balance_rack_aware()
2250+
*
2251+
* @see cass_cluster_set_load_balance_rack_aware()
2252+
*/
2253+
CASS_EXPORT CassError
2254+
cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster,
2255+
const char* local_dc,
2256+
size_t local_dc_length,
2257+
const char* local_rack,
2258+
size_t local_rack_length);
2259+
22162260
/**
22172261
* Configures the cluster to use token-aware request routing or not.
22182262
*

src/cluster.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
#include "constants.hpp"
2020
#include "dc_aware_policy.hpp"
21+
#include "rack_aware_policy.hpp"
2122
#include "external.hpp"
2223
#include "logger.hpp"
2324
#include "resolver.hpp"
@@ -240,6 +241,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
240241
const ControlConnectionSchema& schema,
241242
const LoadBalancingPolicy::Ptr& load_balancing_policy,
242243
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
244+
const String& local_rack,
243245
const StringMultimap& supported_options, const ClusterSettings& settings)
244246
: connection_(connection)
245247
, listener_(listener ? listener : &nop_cluster_listener__)
@@ -251,6 +253,7 @@ Cluster::Cluster(const ControlConnection::Ptr& connection, ClusterListener* list
251253
, connected_host_(connected_host)
252254
, hosts_(hosts)
253255
, local_dc_(local_dc)
256+
, local_rack_(local_rack)
254257
, supported_options_(supported_options)
255258
, is_recording_events_(settings.disable_events_on_startup) {
256259
static const auto optimized_msg = "===== Using optimized driver!!! =====\n";

src/cluster.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ class Cluster
257257
* determining the next control connection host.
258258
* @param load_balancing_policies
259259
* @param local_dc The local datacenter determined by the metadata service for initializing the
* load balancing policies.
260+
* @param local_rack The local rack determined by the metadata service for initializing the
260261
* load balancing policies.
261262
* @param supported_options Supported options discovered during control connection.
262263
* @param settings The control connection settings to use for reconnecting the
@@ -267,6 +268,7 @@ class Cluster
267268
const ControlConnectionSchema& schema,
268269
const LoadBalancingPolicy::Ptr& load_balancing_policy,
269270
const LoadBalancingPolicy::Vec& load_balancing_policies, const String& local_dc,
271+
const String& local_rack,
270272
const StringMultimap& supported_options, const ClusterSettings& settings);
271273

272274
/**
@@ -361,6 +363,7 @@ class Cluster
361363
const Host::Ptr& connected_host() const { return connected_host_; }
362364
const TokenMap::Ptr& token_map() const { return token_map_; }
363365
const String& local_dc() const { return local_dc_; }
366+
const String& local_rack() const { return local_rack_; }
364367
const VersionNumber& dse_server_version() const { return connection_->dse_server_version(); }
365368
const StringMultimap& supported_options() const { return supported_options_; }
366369
const ShardPortCalculator* shard_port_calculator() const { return shard_port_calculator_.get(); }
@@ -449,6 +452,7 @@ class Cluster
449452
PreparedMetadata prepared_metadata_;
450453
TokenMap::Ptr token_map_;
451454
String local_dc_;
455+
String local_rack_;
452456
StringMultimap supported_options_;
453457
Timer timer_;
454458
bool is_recording_events_;

src/cluster_config.cpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,27 @@ CassError cass_cluster_set_load_balance_dc_aware_n(CassCluster* cluster, const c
301301
return CASS_OK;
302302
}
303303

304+
// Configures rack-aware load balancing from two C strings.
//
// Returns CASS_ERROR_LIB_BAD_PARAMS when either local_dc or local_rack is NULL;
// otherwise forwards to cass_cluster_set_load_balance_rack_aware_n() with the
// lengths obtained from SAFE_STRLEN and returns that call's result.
CassError cass_cluster_set_load_balance_rack_aware(CassCluster* cluster, const char* local_dc,
305+
const char* local_rack) {
306+
// Reject missing strings up front, before any length is computed.
if (local_dc == NULL || local_rack == NULL) {
307+
return CASS_ERROR_LIB_BAD_PARAMS;
308+
}
309+
// The length-taking variant performs the remaining validation (it also rejects
// zero-length strings) and installs the policy.
return cass_cluster_set_load_balance_rack_aware_n(cluster, local_dc, SAFE_STRLEN(local_dc),
310+
local_rack, SAFE_STRLEN(local_rack));
311+
}
312+
313+
// Configures rack-aware load balancing from explicit (pointer, length) strings.
//
// Returns CASS_ERROR_LIB_BAD_PARAMS when local_dc or local_rack is NULL or has
// length 0. Otherwise sets the cluster config's load balancing policy to a new
// RackAwarePolicy built from the two strings and returns CASS_OK.
//
// NOTE(review): the public header comment describes behavior "with empty
// local_rack and local_dc", but empty strings are rejected here, so that path
// is not reachable through this function -- confirm which is intended.
CassError cass_cluster_set_load_balance_rack_aware_n(CassCluster* cluster, const char* local_dc,
314+
size_t local_dc_length,
315+
const char* local_rack,
316+
size_t local_rack_length) {
317+
// Both the datacenter and the rack must be present and non-empty.
if (local_dc == NULL || local_dc_length == 0 || local_rack == NULL || local_rack_length == 0) {
318+
return CASS_ERROR_LIB_BAD_PARAMS;
319+
}
320+
// Copy exactly the given number of bytes from each buffer into the new policy.
cluster->config().set_load_balancing_policy(new RackAwarePolicy(
321+
String(local_dc, local_dc_length), String(local_rack, local_rack_length)));
322+
return CASS_OK;
323+
}
324+
304325
void cass_cluster_set_token_aware_routing(CassCluster* cluster, cass_bool_t enabled) {
305326
cluster->config().set_token_aware_routing(enabled == cass_true);
306327
}

src/cluster_connector.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "cluster_connector.hpp"
1818
#include "dc_aware_policy.hpp"
19+
#include "rack_aware_policy.hpp"
1920
#include "protocol.hpp"
2021
#include "random.hpp"
2122
#include "round_robin_policy.hpp"
@@ -177,6 +178,7 @@ void ClusterConnector::on_resolve(ClusterMetadataResolver* resolver) {
177178
}
178179

179180
local_dc_ = resolver->local_dc();
181+
local_rack_ = resolver->local_rack();
180182
remaining_connector_count_ = resolved_contact_points.size();
181183
for (AddressVec::const_iterator it = resolved_contact_points.begin(),
182184
end = resolved_contact_points.end();
@@ -231,7 +233,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
231233
for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end();
232234
it != end; ++it) {
233235
LoadBalancingPolicy::Ptr policy(*it);
234-
policy->init(connected_host, hosts, random_, local_dc_);
236+
policy->init(connected_host, hosts, random_, local_dc_, local_rack_);
235237
policy->register_handles(event_loop_->loop());
236238
}
237239

@@ -248,6 +250,11 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
248250
message = "No hosts available for the control connection using the "
249251
"DC-aware load balancing policy. "
250252
"Check to see if the configured local datacenter is valid";
253+
} else if (dynamic_cast<RackAwarePolicy::RackAwareQueryPlan*>(query_plan.get()) !=
254+
NULL) { // Check if Rack-aware
255+
message = "No hosts available for the control connection using the "
256+
"Rack-aware load balancing policy. "
257+
"Check to see if the configured local datacenter and rack is valid";
251258
} else {
252259
message = "No hosts available for the control connection using the "
253260
"configured load balancing policy";
@@ -258,7 +265,7 @@ void ClusterConnector::on_connect(ControlConnector* connector) {
258265

259266
cluster_.reset(new Cluster(connector->release_connection(), listener_, event_loop_,
260267
connected_host, hosts, connector->schema(), default_policy, policies,
261-
local_dc_, connector->supported_options(), settings_));
268+
local_dc_, local_rack_, connector->supported_options(), settings_));
262269

263270
// Clear any connection errors and set the final negotiated protocol version.
264271
error_code_ = CLUSTER_OK;

src/cluster_connector.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ class ClusterConnector : public RefCounted<ClusterConnector> {
169169
Random* random_;
170170
Metrics* metrics_;
171171
String local_dc_;
172+
String local_rack_;
172173
ClusterSettings settings_;
173174

174175
Callback callback_;

src/cluster_metadata_resolver.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
4848

4949
const AddressVec& resolved_contact_points() const { return resolved_contact_points_; }
5050
const String& local_dc() const { return local_dc_; }
51+
const String& local_rack() const { return local_rack_; }
5152

5253
protected:
5354
virtual void internal_resolve(uv_loop_t* loop, const AddressVec& contact_points) = 0;
@@ -57,6 +58,7 @@ class ClusterMetadataResolver : public RefCounted<ClusterMetadataResolver> {
5758
protected:
5859
AddressVec resolved_contact_points_;
5960
String local_dc_;
61+
String local_rack_;
6062
Callback callback_;
6163
};
6264

src/dc_aware_policy.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ DCAwarePolicy::DCAwarePolicy(const String& local_dc, size_t used_hosts_per_remot
4343
DCAwarePolicy::~DCAwarePolicy() { uv_rwlock_destroy(&available_rwlock_); }
4444

4545
void DCAwarePolicy::init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
46-
const String& local_dc) {
46+
const String& local_dc, const String& local_rack) {
4747
if (local_dc_.empty()) { // Only override if no local DC was specified.
4848
local_dc_ = local_dc;
4949
}

src/dc_aware_policy.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class DCAwarePolicy : public LoadBalancingPolicy {
3737
~DCAwarePolicy();
3838

3939
virtual void init(const Host::Ptr& connected_host, const HostMap& hosts, Random* random,
40-
const String& local_dc);
40+
const String& local_dc, const String& local_rack);
4141

4242
virtual CassHostDistance distance(const Host::Ptr& host) const;
4343

src/execution_profile.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "cassandra.h"
2424
#include "constants.hpp"
2525
#include "dc_aware_policy.hpp"
26+
#include "rack_aware_policy.hpp"
2627
#include "dense_hash_map.hpp"
2728
#include "latency_aware_policy.hpp"
2829
#include "speculative_execution.hpp"

0 commit comments

Comments
 (0)