Skip to content

Commit bdb6f9d

Browse files
committed
Use new RA callback
1 parent ab42742 commit bdb6f9d

File tree

8 files changed

+201
-24
lines changed

8 files changed

+201
-24
lines changed

BUILD.bazel

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ rabbitmq_home(
7676
plugins = PLUGINS,
7777
)
7878

79+
rabbitmq_run_command(
80+
name = "add-node",
81+
rabbitmq_run = ":rabbitmq-run",
82+
subcommand = "add-node",
83+
)
84+
7985
rabbitmq_run(
8086
name = "rabbitmq-run",
8187
home = ":broker-home",

MODULE.bazel

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -271,11 +271,17 @@ erlang_package.hex_package(
271271
version = "0.2.1",
272272
)
273273

274-
erlang_package.hex_package(
275-
name = "ra",
274+
#erlang_package.hex_package(
275+
# name = "ra",
276+
# build_file = "@rabbitmq-server//bazel:BUILD.ra",
277+
# sha256 = "13b03f02cf6c1837c527edd4a953f0c09da0abad0af6985b64bfd66943c4c5c3",
278+
# version = "2.5.1",
279+
#)
280+
281+
erlang_package.git_package(
282+
branch = "sunge/handle_status_callback",
283+
repository = "SimonUnge/ra",
276284
build_file = "@rabbitmq-server//bazel:BUILD.ra",
277-
sha256 = "13b03f02cf6c1837c527edd4a953f0c09da0abad0af6985b64bfd66943c4c5c3",
278-
version = "2.5.1",
279285
)
280286

281287
erlang_package.hex_package(

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,6 +776,13 @@ end}.
776776
{datatype, [integer, {list, string}]}
777777
]}.
778778

779+
{mapping, "default_policies.operator.$id.target_group_size", "rabbit.default_policies.operator", [
780+
{include_default, 1},
781+
{commented, 1},
782+
{validators, ["non_zero_positive_integer"]},
783+
{datatype, integer}
784+
]}.
785+
779786
{translation, "rabbit.default_policies.operator", fun(Conf) ->
780787
Props = rabbit_cuttlefish:aggregate_props(
781788
Conf,

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
tick/2,
2727
overview/1,
2828

29+
eval_members/3,
30+
2931
get_checked_out/4,
3032
%% versioning
3133
version/0,
@@ -899,6 +901,10 @@ tick(Ts, #?MODULE{cfg = #cfg{name = _Name,
899901
[{aux, {handle_tick, [QName, overview(State), all_nodes(State)]}}]
900902
end.
901903

904+
%% Delegates the membership evaluation for this queue to rabbit_quorum_queue.
%%
%% The first argument is a two-element tuple of which only the first element
%% (the Ra cluster name) is used. The second argument is passed through
%% unchanged. From the machine state only the queue resource stored in the
%% configuration record is read.
%% NOTE(review): presumably invoked by Ra as a machine callback on the
%% leader - confirm against the Ra branch this commit depends on.
eval_members({RaName, _}, Members,
             #?MODULE{cfg = #cfg{resource = QueueName}}) ->
    rabbit_quorum_queue:eval_members(RaName, Members, QueueName).
907+
902908
-spec overview(state()) -> map().
903909
overview(#?MODULE{consumers = Cons,
904910
enqueuers = Enqs,

deps/rabbit/src/rabbit_process.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,5 @@ is_registered_process_alive(Name) ->
8888

8989
is_process_hibernated(Pid) when is_pid(Pid) ->
9090
{current_function,{erlang,hibernate,3}} == erlang:process_info(Pid, current_function);
91-
is_process_hibernated(_) ->
92-
%% some queue types, eg QQs, have a tuple as a Pid, but they are never hibernated
91+
is_process_hibernated(Pid) when is_tuple(Pid) ->
9392
false.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 142 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
-module(rabbit_quorum_queue).
99

1010
-behaviour(rabbit_queue_type).
11+
-behaviour(rabbit_policy_validator).
12+
-behaviour(rabbit_policy_merge_strategy).
1113

1214
-export([init/1,
1315
close/1,
@@ -30,7 +32,7 @@
3032
-export([cluster_state/1, status/2]).
3133
-export([update_consumer_handler/8, update_consumer/9]).
3234
-export([cancel_consumer_handler/2, cancel_consumer/3]).
33-
-export([become_leader/2, handle_tick/3, spawn_deleter/1]).
35+
-export([become_leader/2, handle_tick/3, spawn_deleter/1, eval_members/3]).
3436
-export([rpc_delete_metrics/1]).
3537
-export([format/1]).
3638
-export([open_files/1]).
@@ -65,6 +67,7 @@
6567
is_compatible/3,
6668
declare/2,
6769
is_stateful/0]).
70+
-export([validate_policy/1, merge_policy_value/3]).
6871

6972
-import(rabbit_queue_type_util, [args_policy_lookup/3,
7073
qname_to_internal_name/1]).
@@ -111,6 +114,34 @@
111114
-define(ADD_MEMBER_TIMEOUT, 5000).
112115
-define(SNAPSHOT_INTERVAL, 8192). %% the ra default is 4096
113116

117+
-define(EVAL_MEMBERS_TIMEOUT, 60000).
118+
-define(EVAL_MEMBERS_EVENT_TIMEOUT, 30000).
119+
120+
121+
%%----------- QQ policies ---------------------------------------------------
122+
123+
-rabbit_boot_step(
124+
{?MODULE,
125+
[{description, "QQ policy validation"},
126+
{mfa, {rabbit_registry, register,
127+
[policy_validator, <<"target-group-size">>, ?MODULE]}},
128+
{mfa, {rabbit_registry, register,
129+
[operator_policy_validator, <<"target-group-size">>, ?MODULE]}},
130+
{mfa, {rabbit_registry, register,
131+
[policy_merge_strategy, <<"target-group-size">>, ?MODULE]}},
132+
{requires, rabbit_registry},
133+
{enables, recovery}]}).
134+
135+
%% Policy validator for the <<"target-group-size">> key.
%%
%% Args is the policy definition as a proplist. The value stored under
%% <<"target-group-size">> must be a strictly positive integer; a missing key
%% is treated as the atom 'none' and therefore rejected.
%%
%% Returns 'ok' when the value is acceptable, otherwise
%% {error, FormatString, FormatArgs} carrying the offending value.
validate_policy(Args) ->
    Size = proplists:get_value(<<"target-group-size">>, Args, none),
    if
        is_integer(Size), Size > 0 ->
            ok;
        true ->
            {error, "~tp is not a valid qq target count value", [Size]}
    end.
141+
142+
%% Merge strategy for the <<"target-group-size">> policy key.
%%
%% The second argument is ignored and the third is returned unchanged.
%% NOTE(review): the third argument is presumably the operator policy's
%% value, so that value always takes precedence - confirm against
%% rabbit_policy_merge_strategy.
merge_policy_value(<<"target-group-size">>, _PolicyValue, OperatorValue) ->
    OperatorValue.
144+
114145
%%----------- rabbit_queue_type ---------------------------------------------
115146

116147
-spec is_enabled() -> boolean().
@@ -177,7 +208,7 @@ start_cluster(Q) ->
177208
Arguments = amqqueue:get_arguments(Q),
178209
Opts = amqqueue:get_options(Q),
179210
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
180-
QuorumSize = get_default_quorum_initial_group_size(Arguments),
211+
QuorumSize = get_default_quorum_initial_group_size(Arguments, Q),
181212
RaName = case qname_to_internal_name(QName) of
182213
{ok, A} ->
183214
A;
@@ -193,11 +224,7 @@ start_cluster(Q) ->
193224
[QuorumSize, rabbit_misc:rs(QName), Leader]),
194225
case rabbit_amqqueue:internal_declare(NewQ1, false) of
195226
{created, NewQ} ->
196-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
197-
?TICK_TIMEOUT),
198-
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
199-
?SNAPSHOT_INTERVAL),
200-
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout, SnapshotInterval)
227+
RaConfs = [make_ra_conf(NewQ, ServerId)
201228
|| ServerId <- members(NewQ)],
202229
try erpc_call(Leader, ra, start_cluster,
203230
[?RA_SYSTEM, RaConfs, ?START_CLUSTER_TIMEOUT],
@@ -550,6 +577,90 @@ reductions(Name) ->
550577
0
551578
end.
552579

580+
%% Decides whether the Ra cluster backing queue QName should lose or gain
%% members.
%%
%% Cluster is a list of two-element tuples whose second element is treated
%% as a node name. Any such node that is not returned by
%% rabbit_nodes:list_members/0 is scheduled for removal. Removal takes
%% priority: only when nothing has to be removed is adding members
%% considered.
eval_members(ClusterName, Cluster, QName) ->
    CurrentNodes = [Node || {_Name, Node} <- Cluster],
    case CurrentNodes -- rabbit_nodes:list_members() of
        [] ->
            add_member_effects(ClusterName, Cluster, QName, CurrentNodes);
        [_ | _] = Stale ->
            remove_member_effects(ClusterName, Cluster, QName, Stale)
    end.
590+
591+
%% Works out whether new members should be added to the queue's Ra cluster.
%%
%% ClusterName - first element of the server ids to build for new members
%% Cluster     - current member list, returned unchanged in the result
%% QName       - queue resource used to look up the queue record
%% MemberNodes - nodes that already host a member
%%
%% Returns:
%%   {add_member, NewMembers, Cluster} when the queue is below its target
%%                                     size and candidate nodes exist
%%   undefined                         when there is nothing to do, or the
%%                                     queue record no longer exists
%%   eval_members_backoff              when the running RabbitMQ nodes do not
%%                                     match the connected Erlang nodes
add_member_effects(ClusterName, Cluster, QName, MemberNodes) ->
    Running = rabbit_nodes:list_running(),
    %% Only act when every connected Erlang node is also a running RabbitMQ
    %% node (and vice versa); otherwise report a back-off.
    case lists:sort(Running) == lists:sort([node() | nodes()]) of
        true ->
            case rabbit_amqqueue:lookup(QName) of
                {ok, Q} ->
                    maybe_grow_cluster(ClusterName, Cluster, Q, QName,
                                       Running -- MemberNodes,
                                       length(MemberNodes));
                {error, not_found} ->
                    %% The queue record may already be gone (e.g. the queue
                    %% is being deleted). That is not a reason to crash the
                    %% caller: there is simply nothing to add.
                    undefined
            end;
        false ->
            rabbit_log:debug("CALLED: BACKOFF~n",[]),
            eval_members_backoff
    end.

%% Builds the add-member result when the cluster is smaller than its target
%% size and at least one candidate node exists; returns 'undefined' otherwise.
maybe_grow_cluster(ClusterName, Cluster, Q, QName, Candidates, CurrentSize) ->
    TargetSize = get_default_quorum_initial_group_size(
                   amqqueue:get_arguments(Q), Q),
    case Candidates of
        [_ | _] when CurrentSize < TargetSize ->
            %% Prefer the candidates ranked first by grow_order_sort/1 and
            %% never add more than the number of missing members.
            NodesToAdd = lists:sublist(grow_order_sort(Candidates),
                                       TargetSize - CurrentSize),
            create_add_member_effects(ClusterName, Cluster,
                                      Q, QName, NodesToAdd);
        _ ->
            rabbit_log:debug("CALLED: NOOP ~n",[]),
            undefined
    end.
614+
615+
%% Builds the {add_member, NewMembers, Cluster} result for the nodes in New.
%%
%% Each node is paired with ClusterName to form a server id and turned into
%% an entry by make_add_member_effect/3. Cluster is returned as received.
create_add_member_effects(ClusterName, Cluster, Q, QName, New) ->
    rabbit_log:debug("CALLED: WILL ADD ~p~n",[New]),
    ToEffect = fun(Node) ->
                       make_add_member_effect(Q, QName, {ClusterName, Node})
               end,
    {add_member, lists:map(ToEffect, New), Cluster}.
619+
620+
%% Describes one member to add: {{ServerId, RaConf}, ResultFun}.
%%
%% RaConf is the Ra server configuration built by make_ra_conf/2 for the new
%% server id. ResultFun accepts only {ok, _, Leader}; it prepends the new
%% node to the 'nodes' entry of the queue's type state and sets the queue
%% pid to Leader, via rabbit_amqqueue:update/2.
%% NOTE(review): presumably Ra calls ResultFun with the outcome of the
%% membership change - confirm; any other result shape raises
%% function_clause.
make_add_member_effect(Q, QName, {_RaName, NewNode} = ServerId) ->
    PrependNode = fun(#{nodes := Existing} = TypeState) ->
                          TypeState#{nodes => [NewNode | Existing]}
                  end,
    OnAdded = fun({ok, _, Leader}) ->
                      rabbit_amqqueue:update(
                        QName,
                        fun(Queue) ->
                                amqqueue:set_pid(
                                  update_type_state(Queue, PrependNode),
                                  Leader)
                        end)
              end,
    {{ServerId, make_ra_conf(Q, ServerId)}, OnAdded}.
633+
634+
%% Orders Nodes by how many non-crashed quorum queues already list each node
%% among their quorum nodes, fewest first. Nodes with equal counts keep their
%% relative input order.
%%
%% The quorum queue list and each queue's node list are read once and reused
%% for every candidate, instead of being fetched again per node.
grow_order_sort([]) ->
    [];
grow_order_sort(Nodes) ->
    MemberLists = [rabbit_amqqueue:get_quorum_nodes(Q)
                   || Q <- rabbit_amqqueue:list_by_type(quorum),
                      amqqueue:get_state(Q) =/= crashed],
    CountFor = fun(Node) ->
                       length([ok || Members <- MemberLists,
                                     lists:member(Node, Members)])
               end,
    Ranked = lists:keysort(2, [{Node, CountFor(Node)} || Node <- Nodes]),
    [Node || {Node, _Count} <- Ranked].
645+
646+
%% Builds the {remove_member, Members, Cluster} result for the nodes in
%% RemovedFromCluster.
%%
%% Each node is paired with ClusterName to form a server id and turned into
%% an entry by make_remove_member_effect/2. Cluster is returned as received.
remove_member_effects(ClusterName, Cluster, QName, RemovedFromCluster) ->
    rabbit_log:debug("CALLED: WILL REMOVE ~p~n",[RemovedFromCluster]),
    ToEffect = fun(Node) ->
                       make_remove_member_effect(QName, {ClusterName, Node})
               end,
    {remove_member, lists:map(ToEffect, RemovedFromCluster), Cluster}.
650+
651+
%% Describes one member to remove: {ServerId, ResultFun}.
%%
%% ResultFun accepts only a three-element {ok, _, _} tuple; it deletes the
%% node from the 'nodes' entry of the queue's type state via
%% rabbit_amqqueue:update/2.
%% NOTE(review): presumably Ra calls ResultFun with the outcome of the
%% membership change - confirm; any other result shape raises
%% function_clause.
make_remove_member_effect(QName, {_RaName, GoneNode} = ServerId) ->
    DropNode = fun(#{nodes := Existing} = TypeState) ->
                       TypeState#{nodes => lists:delete(GoneNode, Existing)}
               end,
    OnRemoved = fun({ok, _, _}) ->
                        rabbit_amqqueue:update(
                          QName,
                          fun(Queue) ->
                                  update_type_state(Queue, DropNode)
                          end)
                end,
    {ServerId, OnRemoved}.
663+
553664
is_recoverable(Q) ->
554665
Node = node(),
555666
Nodes = get_nodes(Q),
@@ -1089,11 +1200,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
10891200
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
10901201
ServerId = {RaName, Node},
10911202
Members = members(Q),
1092-
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1093-
?TICK_TIMEOUT),
1094-
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
1095-
?SNAPSHOT_INTERVAL),
1096-
Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval),
1203+
Conf = make_ra_conf(Q, ServerId),
10971204
case ra:start_server(?RA_SYSTEM, Conf) of
10981205
ok ->
10991206
case ra:add_member(Members, ServerId, Timeout) of
@@ -1573,12 +1680,18 @@ quorum_ctag(Other) ->
15731680
maybe_send_reply(_ChPid, undefined) -> ok;
15741681
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
15751682

1576-
get_default_quorum_initial_group_size(Arguments) ->
1577-
case rabbit_misc:table_lookup(Arguments, <<"x-quorum-initial-group-size">>) of
1578-
undefined ->
1683+
get_default_quorum_initial_group_size(Arguments, Q) ->
1684+
PolicyValue = rabbit_policy:get(<<"target-group-size">>, Q),
1685+
ArgValue = rabbit_misc:table_lookup(Arguments, <<"x-quorum-initial-group-size">>),
1686+
case {ArgValue, PolicyValue} of
1687+
{undefined, undefined} ->
15791688
application:get_env(rabbit, quorum_cluster_size, 3);
1580-
{_Type, Val} ->
1581-
Val
1689+
{undefined, V} ->
1690+
V;
1691+
{{_Type, V}, undefined} ->
1692+
V;
1693+
{{_Type, ArgV}, PolV} ->
1694+
max(ArgV, PolV)
15821695
end.
15831696

15841697
%% member with the current leader first
@@ -1590,7 +1703,16 @@ members(Q) when ?amqqueue_is_quorum(Q) ->
15901703
format_ra_event(ServerId, Evt, QRef) ->
15911704
{'$gen_cast', {queue_event, QRef, {ServerId, Evt}}}.
15921705

1593-
make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
1706+
make_ra_conf(Q, ServerId) ->
1707+
TickTimeout = application:get_env(rabbit, quorum_tick_interval,
1708+
?TICK_TIMEOUT),
1709+
SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval,
1710+
?SNAPSHOT_INTERVAL),
1711+
%% Do we want these values to be configurable?
1712+
MemberEvalTimeout = application:get_env(rabbit, quorum_eval_members_timeout,
1713+
?EVAL_MEMBERS_TIMEOUT),
1714+
MemberEvalEventTimeout = application:get_env(rabbit, quorum_eval_members_event_timeout,
1715+
?EVAL_MEMBERS_EVENT_TIMEOUT),
15941716
QName = amqqueue:get_name(Q),
15951717
RaMachine = ra_machine(Q),
15961718
[{ClusterName, _} | _] = Members = members(Q),
@@ -1606,6 +1728,8 @@ make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval) ->
16061728
log_init_args => #{uid => UId,
16071729
snapshot_interval => SnapshotInterval},
16081730
tick_timeout => TickTimeout,
1731+
eval_members_timeout => MemberEvalTimeout,
1732+
eval_members_event_timeout => MemberEvalEventTimeout,
16091733
machine => RaMachine,
16101734
ra_event_formatter => Formatter}.
16111735

rabbitmq_run.bzl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ rabbitmq_run_command_private = rule(
126126
"start-background-broker",
127127
"stop-node",
128128
"start-cluster",
129+
"add-node",
129130
"stop-cluster",
130131
]),
131132
},

scripts/bazel/rabbitmq-run.sh

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ for arg in "$@"; do
170170
start-cluster)
171171
CMD="$arg"
172172
;;
173+
add-node)
174+
CMD="$arg"
175+
;;
173176
stop-cluster)
174177
CMD="$arg"
175178
;;
@@ -279,6 +282,31 @@ case $CMD in
279282
fi
280283
done
281284
;;
285+
add-node)
286+
n=${NODE_NUM:=3}
287+
NODE0=${NODE_LEADER:="rabbit-0"}
288+
setup_node_env "$n"
289+
RABBITMQ_NODE_PORT=$((5672 + n)) \
290+
RABBITMQ_SERVER_START_ARGS=" \
291+
-rabbit loopback_users [] \
292+
-rabbitmq_management listener [{port,$((15672 + n))}] \
293+
-rabbitmq_mqtt tcp_listeners [$((1883 + n))] \
294+
-rabbitmq_web_mqtt tcp_config [{port,$((1893 + n))}] \
295+
-rabbitmq_web_mqtt_examples listener [{port,$((1903 + n))}] \
296+
-rabbitmq_stomp tcp_listeners [$((61613 + n))] \
297+
-rabbitmq_web_stomp tcp_config [{port,$((61623 + n))}] \
298+
-rabbitmq_web_stomp_examples listener [{port,$((61633 + n))}] \
299+
-rabbitmq_prometheus tcp_config [{port,$((15692 + n))}] \
300+
-rabbitmq_stream tcp_listeners [$((5552 + n))]" \
301+
"$RABBITMQ_SERVER" \
302+
> "$RABBITMQ_LOG_BASE"/startup_log \
303+
2> "$RABBITMQ_LOG_BASE"/startup_err &
304+
305+
await_startup
306+
"$RABBITMQCTL" -n "$RABBITMQ_NODENAME" stop_app
307+
"$RABBITMQCTL" -n "$RABBITMQ_NODENAME" join_cluster "$NODE0"
308+
"$RABBITMQCTL" -n "$RABBITMQ_NODENAME" start_app
309+
;;
282310
stop-cluster)
283311
nodes=${NODES:=3}
284312
for ((n=nodes-1; n >= 0; n--))

0 commit comments

Comments
 (0)