Skip to content

Commit c390c67

Browse files
authored
Add support for priority load balancing (whatyouhide#89)
1 parent 3fd0ede commit c390c67

File tree

4 files changed

+93
-30
lines changed

4 files changed

+93
-30
lines changed

README.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ This library is in its early stages when it comes to features, but we're already
2020
* batch queries
2121
* page streaming
2222
* compression
23-
* clustering (only random load balancing for now)
23+
* clustering (random and priority load balancing for now)
2424
* customizable retry strategies for failed queries
2525
* user-defined types
2626
* authentication
2727

28-
In the future, we plan to add more features, like more strategies for clustering. See [the documentation][documentation] for detailed explanation of how the supported features work.
28+
In the future, we plan to add more features, like more load balancing strategies for clustering. See [the documentation][documentation] for detailed explanation of how the supported features work.
2929

3030
## Installation
3131

lib/xandra/cluster.ex

+66-22
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,13 @@ defmodule Xandra.Cluster do
4444
4545
## Load balancing strategies
4646
47-
For now, the only load balancing "strategy" implemented is random choice of
48-
nodes: when you execute a query against a `Xandra.Cluster` connection, it will
49-
choose one of connected nodes at random and execute the query on that node.
47+
For now, there are two load balancing "strategies" implemented:
48+
49+
* `:random` - it will choose one of the connected nodes at random and
50+
execute the query on that node.
51+
52+
* `:priority` - it will choose a node to execute the query according
53+
to the order nodes appear in `:nodes`.
5054
5155
## Disconnections and reconnections
5256
@@ -67,6 +71,8 @@ defmodule Xandra.Cluster do
6771
* `:underlying_pool` - (module) the `DBConnection.Pool` pool used to pool
6872
connections to each of the specified nodes.
6973
74+
* `:load_balancing` - (atom) load balancing "strategy".
75+
7076
To pass options to the underlying pool, you can just pass them alongside other
7177
options to `Xandra.start_link/1`.
7278
"""
@@ -76,13 +82,21 @@ defmodule Xandra.Cluster do
7682
@behaviour DBConnection.Pool
7783

7884
@default_pool_module DBConnection.Connection
85+
@default_load_balancing :random
7986

8087
alias __MODULE__.{ControlConnection, StatusChange}
8188
alias Xandra.ConnectionError
8289

8390
require Logger
8491

85-
defstruct [:options, :pool_supervisor, :pool_module, pools: %{}]
92+
defstruct [
93+
:options,
94+
:node_refs,
95+
:load_balancing,
96+
:pool_supervisor,
97+
:pool_module,
98+
pools: %{},
99+
]
86100

87101
def ensure_all_started(options, type) do
88102
{pool_module, options} = Keyword.pop(options, :underlying_pool, @default_pool_module)
@@ -95,17 +109,22 @@ defmodule Xandra.Cluster do
95109

96110
def start_link(Xandra.Connection, options) do
97111
{pool_module, options} = Keyword.pop(options, :underlying_pool, @default_pool_module)
112+
{load_balancing, options} = Keyword.pop(options, :load_balancing, @default_load_balancing)
98113
{nodes, options} = Keyword.pop(options, :nodes)
99114
{name, options} = Keyword.pop(options, :name)
100115

101-
state = %__MODULE__{options: Keyword.delete(options, :pool), pool_module: pool_module}
116+
state = %__MODULE__{
117+
options: Keyword.delete(options, :pool),
118+
load_balancing: load_balancing,
119+
pool_module: pool_module,
120+
}
102121
GenServer.start_link(__MODULE__, {state, nodes}, name: name)
103122
end
104123

105124
def init({%__MODULE__{options: options} = state, nodes}) do
106125
{:ok, pool_supervisor} = Supervisor.start_link([], strategy: :one_for_one, max_restarts: 0)
107-
start_control_connections(nodes, options)
108-
{:ok, %{state | pool_supervisor: pool_supervisor}}
126+
node_refs = start_control_connections(nodes, options)
127+
{:ok, %{state | node_refs: node_refs, pool_supervisor: pool_supervisor}}
109128
end
110129

111130
def checkout(cluster, options) do
@@ -132,25 +151,30 @@ defmodule Xandra.Cluster do
132151
pool_module.stop(pool_ref, error, state, options)
133152
end
134153

135-
def activate(cluster, address, port) do
136-
GenServer.cast(cluster, {:activate, address, port})
154+
def activate(cluster, node_ref, address, port) do
155+
GenServer.cast(cluster, {:activate, node_ref, address, port})
137156
end
138157

139158
def update(cluster, status_change) do
140159
GenServer.cast(cluster, {:update, status_change})
141160
end
142161

143-
def handle_call(:checkout, _from, %__MODULE__{pools: pools} = state) do
162+
def handle_call(:checkout, _from, %__MODULE__{} = state) do
163+
%{node_refs: node_refs,
164+
load_balancing: load_balancing,
165+
pool_module: pool_module,
166+
pools: pools} = state
167+
144168
if Enum.empty?(pools) do
145169
{:reply, {:error, :empty}, state}
146170
else
147-
{_address, pool} = Enum.random(pools)
148-
{:reply, {:ok, state.pool_module, pool}, state}
171+
pool = select_pool(load_balancing, pools, node_refs)
172+
{:reply, {:ok, pool_module, pool}, state}
149173
end
150174
end
151175

152-
def handle_cast({:activate, address, port}, %__MODULE__{} = state) do
153-
{:noreply, start_pool(state, address, port)}
176+
def handle_cast({:activate, node_ref, address, port}, %__MODULE__{} = state) do
177+
{:noreply, start_pool(state, node_ref, address, port)}
154178
end
155179

156180
def handle_cast({:update, %StatusChange{} = status_change}, %__MODULE__{} = state) do
@@ -159,19 +183,26 @@ defmodule Xandra.Cluster do
159183

160184
defp start_control_connections(nodes, options) do
161185
cluster = self()
162-
Enum.each(nodes, fn({address, port}) ->
163-
ControlConnection.start_link(cluster, address, port, options)
186+
Enum.map(nodes, fn {address, port} ->
187+
node_ref = make_ref()
188+
ControlConnection.start_link(cluster, node_ref, address, port, options)
189+
{node_ref, nil}
164190
end)
165191
end
166192

167-
defp start_pool(state, address, port) do
168-
%{options: options, pool_module: pool_module,
169-
pools: pools, pool_supervisor: pool_supervisor} = state
193+
defp start_pool(state, node_ref, address, port) do
194+
%{options: options,
195+
node_refs: node_refs,
196+
pool_module: pool_module,
197+
pool_supervisor: pool_supervisor,
198+
pools: pools} = state
199+
170200
options = [address: address, port: port] ++ options
171201
child_spec = pool_module.child_spec(Xandra.Connection, options, id: address)
172202
case Supervisor.start_child(pool_supervisor, child_spec) do
173203
{:ok, pool} ->
174-
%{state | pools: Map.put(pools, address, pool)}
204+
node_refs = List.keystore(node_refs, node_ref, 0, {node_ref, address})
205+
%{state | node_refs: node_refs, pools: Map.put(pools, address, pool)}
175206
{:error, {:already_started, _pool}} ->
176207
Logger.warn(fn ->
177208
"Xandra cluster #{inspect(name())} " <>
@@ -190,7 +221,8 @@ defmodule Xandra.Cluster do
190221
end
191222

192223
defp toggle_pool(state, %{effect: "UP", address: address}) do
193-
%{pools: pools, pool_supervisor: pool_supervisor} = state
224+
%{pool_supervisor: pool_supervisor, pools: pools} = state
225+
194226
case Supervisor.restart_child(pool_supervisor, address) do
195227
{:error, reason} when reason in [:not_found, :running, :restarting] ->
196228
state
@@ -200,8 +232,20 @@ defmodule Xandra.Cluster do
200232
end
201233

202234
defp toggle_pool(state, %{effect: "DOWN", address: address}) do
203-
%{pools: pools, pool_supervisor: pool_supervisor} = state
235+
%{pool_supervisor: pool_supervisor, pools: pools} = state
236+
204237
Supervisor.terminate_child(pool_supervisor, address)
205238
%{state | pools: Map.delete(pools, address)}
206239
end
240+
241+
defp select_pool(:random, pools, _node_refs) do
242+
{_address, pool} = Enum.random(pools)
243+
pool
244+
end
245+
246+
defp select_pool(:priority, pools, node_refs) do
247+
Enum.find_value(node_refs, fn {_node_ref, address} ->
248+
Map.get(pools, address)
249+
end)
250+
end
207251
end

lib/xandra/cluster/control_connection.ex

+12-5
Original file line numberDiff line numberDiff line change
@@ -10,17 +10,24 @@ defmodule Xandra.Cluster.ControlConnection do
1010
@socket_options [packet: :raw, mode: :binary, active: false]
1111

1212
defstruct [
13+
:cluster,
14+
:node_ref,
1315
:address,
1416
:port,
15-
:cluster,
1617
:socket,
1718
:options,
1819
new: true,
1920
buffer: <<>>,
2021
]
2122

22-
def start_link(cluster, address, port, options) do
23-
state = %__MODULE__{cluster: cluster, address: address, port: port, options: options}
23+
def start_link(cluster, node_ref, address, port, options) do
24+
state = %__MODULE__{
25+
cluster: cluster,
26+
node_ref: node_ref,
27+
address: address,
28+
port: port,
29+
options: options,
30+
}
2431
Connection.start_link(__MODULE__, state)
2532
end
2633

@@ -70,9 +77,9 @@ defmodule Xandra.Cluster.ControlConnection do
7077
{:ok, state}
7178
end
7279

73-
defp report_active(%{new: true, cluster: cluster, socket: socket} = state) do
80+
defp report_active(%{new: true, cluster: cluster, node_ref: node_ref, socket: socket} = state) do
7481
with {:ok, {address, port}} <- :inet.peername(socket) do
75-
Xandra.Cluster.activate(cluster, address, port)
82+
Xandra.Cluster.activate(cluster, node_ref, address, port)
7683
{:ok, %{state | new: false, address: address}}
7784
end
7885
end

test/integration/clustering_test.exs

+13-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ defmodule ClusteringTest do
2222
statement = "USE #{keyspace}"
2323

2424
log = capture_log(fn ->
25-
start_options = [nodes: ["127.0.0.1", "127.0.0.1", "127.0.0.2"], name: TestCluster]
25+
start_options = [
26+
nodes: ["127.0.0.1", "127.0.0.1", "127.0.0.2"],
27+
name: TestCluster,
28+
load_balancing: :random,
29+
]
2630
{:ok, cluster} = Xandra.start_link(call_options ++ start_options)
2731

2832
assert await_connected(cluster, call_options, &Xandra.execute!(&1, statement))
@@ -31,4 +35,12 @@ defmodule ClusteringTest do
3135

3236
assert Xandra.execute!(TestCluster, statement, [], call_options)
3337
end
38+
39+
test "priority load balancing", %{keyspace: keyspace} do
40+
call_options = [pool: Xandra.Cluster]
41+
start_options = [load_balancing: :priority]
42+
{:ok, cluster} = Xandra.start_link(call_options ++ start_options)
43+
44+
assert await_connected(cluster, call_options, &Xandra.execute!(&1, "USE #{keyspace}"))
45+
end
3446
end

0 commit comments

Comments
 (0)