Skip to content

Commit f5f1212

Browse files
committed
wip
1 parent 17aa9b7 commit f5f1212

File tree

7 files changed

+84
-49
lines changed

7 files changed

+84
-49
lines changed

deps/rabbit/src/rabbit_channel.erl

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -366,8 +366,9 @@ declare_fast_reply_to_v1(EncodedBin) ->
366366
-spec list() -> [pid()].
367367

368368
list() ->
369-
Nodes = rabbit_nodes:list_running(),
370-
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local, [], ?RPC_TIMEOUT).
369+
Nodes = rabbit_presence:list_present(),
370+
rabbit_misc:append_rpc_all_nodes(Nodes, rabbit_channel, list_local,
371+
[], ?RPC_TIMEOUT).
371372

372373
-spec list_local() -> [pid()].
373374

deps/rabbit/src/rabbit_queue_type_util.erl

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@
1212
check_auto_delete/1,
1313
check_exclusive/1,
1414
check_non_durable/1,
15-
run_checks/2]).
15+
run_checks/2,
16+
erpc_call/5]).
1617

1718
-include_lib("rabbit_common/include/rabbit.hrl").
1819
-include("amqqueue.hrl").
@@ -70,3 +71,30 @@ run_checks([C | Checks], Q) ->
7071
Err ->
7172
Err
7273
end.
74+
75+
%% @doc Calls `M:F(A)' on `Node' via `erpc:call/5', converting any exception
%% of class `error' into an `{error, Reason}' return value. Exceptions of
%% class `exit' and `throw' are not caught and propagate to the caller.
%%
%% When `Node' is the local node the supplied timeout is ignored and
%% `infinity' is used instead: only that timeout value lets `erpc:call/5'
%% (OTP 23-25) take its local fast path, which avoids spawning a new process:
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
%%
%% For any other node the call is only attempted when the node is listed in
%% `nodes()'; otherwise `{error, noconnection}' is returned straight away
%% and no remote call is made.
-spec erpc_call(node(), module(), atom(), list(), timeout()) ->
    term() | {error, term()}.
erpc_call(Node, M, F, A, _Timeout) when Node =:= node() ->
    erpc_call_catching(Node, M, F, A, infinity);
erpc_call(Node, M, F, A, Timeout) ->
    case lists:member(Node, nodes()) of
        true ->
            erpc_call_catching(Node, M, F, A, Timeout);
        false ->
            {error, noconnection}
    end.

%% Runs erpc:call/5 and maps `error'-class exceptions to {error, Reason}.
erpc_call_catching(Node, M, F, A, Timeout) ->
    try
        erpc:call(Node, M, F, A, Timeout)
    catch
        error:Err ->
            {error, Err}
    end.

deps/rabbit/src/rabbit_quorum_queue.erl

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@
7878
force_all_queues_shrink_member_to_current_member/0]).
7979

8080
-import(rabbit_queue_type_util, [args_policy_lookup/3,
81-
qname_to_internal_name/1]).
81+
qname_to_internal_name/1,
82+
erpc_call/5
83+
]).
8284

8385
-include_lib("stdlib/include/qlc.hrl").
8486
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -1676,8 +1678,7 @@ format(Q, Ctx) when ?is_amqqueue(Q) ->
16761678
#{running_nodes := Running0} ->
16771679
Running0;
16781680
_ ->
1679-
%% WARN: slow
1680-
rabbit_nodes:list_running()
1681+
rabbit_presence:list_present()
16811682
end,
16821683
Online = [N || N <- Nodes, lists:member(N, Running)],
16831684
{_, LeaderNode} = amqqueue:get_pid(Q),
@@ -1840,31 +1841,6 @@ notify_decorators(QName, F, A) ->
18401841
ok
18411842
end.
18421843

1843-
erpc_call(Node, M, F, A, _Timeout)
1844-
when Node =:= node() ->
1845-
%% Only timeout 'infinity' optimises the local call in OTP 23-25 avoiding a new process being spawned:
1846-
%% https://github.com/erlang/otp/blob/47f121af8ee55a0dbe2a8c9ab85031ba052bad6b/lib/kernel/src/erpc.erl#L121
1847-
try erpc:call(Node, M, F, A, infinity) of
1848-
Result ->
1849-
Result
1850-
catch
1851-
error:Err ->
1852-
{error, Err}
1853-
end;
1854-
erpc_call(Node, M, F, A, Timeout) ->
1855-
case lists:member(Node, nodes()) of
1856-
true ->
1857-
try erpc:call(Node, M, F, A, Timeout) of
1858-
Result ->
1859-
Result
1860-
catch
1861-
error:Err ->
1862-
{error, Err}
1863-
end;
1864-
false ->
1865-
{error, noconnection}
1866-
end.
1867-
18681844
is_stateful() -> true.
18691845

18701846
force_shrink_member_to_current_member(VHost, Name) ->

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@
5757
-export([eval_listeners/3,
5858
state/0]).
5959

60+
61+
-import(rabbit_queue_type_util, [
62+
erpc_call/5
63+
]).
64+
6065
-rabbit_boot_step({?MODULE,
6166
[{description, "Restart stream coordinator"},
6267
{mfa, {?MODULE, recover, []}},
@@ -396,34 +401,44 @@ process_command([Server | Servers], Cmd) ->
396401

397402
ensure_coordinator_started() ->
398403
Local = {?MODULE, node()},
399-
AllNodes = all_coord_members(),
404+
ExpectedMembers = expected_coord_members(),
400405
case whereis(?MODULE) of
401406
undefined ->
402407
global:set_lock(?STREAM_COORDINATOR_STARTUP),
403408
Nodes = case ra:restart_server(?RA_SYSTEM, Local) of
404409
{error, Reason} when Reason == not_started orelse
405410
Reason == name_not_registered ->
406-
OtherNodes = all_coord_members() -- [Local],
411+
OtherNodes = ExpectedMembers -- [Local],
412+
%% this could potentially be slow if some expected
413+
%% members are on nodes that have recently terminated
414+
%% and have left a dangling TCP connection.
415+
%% I suspect this rarely happens as the local coordinator
416+
%% server is started in recover/0
407417
case lists:filter(
408418
fun({_, N}) ->
409-
erpc:call(N, erlang, whereis, [?MODULE]) =/= undefined
419+
is_pid(erpc_call(N, erlang,
420+
whereis, [?MODULE],
421+
1000))
410422
end, OtherNodes) of
411423
[] ->
412424
start_coordinator_cluster();
413425
_ ->
414426
OtherNodes
415427
end;
416428
ok ->
417-
AllNodes;
429+
%% TODO: it may be better to do a leader call
430+
%% here as the local member will not have caught up
431+
%% yet
432+
locally_known_members();
418433
{error, {already_started, _}} ->
419-
AllNodes;
434+
locally_known_members();
420435
_ ->
421-
AllNodes
436+
locally_known_members()
422437
end,
423438
global:del_lock(?STREAM_COORDINATOR_STARTUP),
424439
Nodes;
425440
_ ->
426-
AllNodes
441+
locally_known_members()
427442
end.
428443

429444
start_coordinator_cluster() ->
@@ -440,9 +455,15 @@ start_coordinator_cluster() ->
440455
[]
441456
end.
442457

443-
all_coord_members() ->
444-
Nodes = rabbit_nodes:list_running() -- [node()],
445-
[{?MODULE, Node} || Node <- [node() | Nodes]].
458+
%% Returns the coordinator server ids ({?MODULE, Node}) for the local node
%% followed by every node reported by rabbit_presence:list_present().
%% The local node is filtered out of the reported list so that the local
%% member appears exactly once: list subtraction (`--') only removes the
%% first occurrence of an element, so a duplicated local entry would
%% survive a `Members -- [Local]' performed by a caller.
present_coord_members() ->
    LocalNode = node(),
    Others = [N || N <- rabbit_presence:list_present(), N =/= LocalNode],
    [{?MODULE, N} || N <- [LocalNode | Others]].
462+
463+
%% Returns the coordinator server ids ({?MODULE, Node}) for the local node
%% followed by every node reported by rabbit_nodes:list_members().
%% The local node is filtered out of the reported list so that the local
%% member appears exactly once: list subtraction (`--') only removes the
%% first occurrence of an element, so a duplicated local entry would
%% survive a `Members -- [Local]' performed by a caller.
expected_coord_members() ->
    LocalNode = node(),
    Others = [N || N <- rabbit_nodes:list_members(), N =/= LocalNode],
    [{?MODULE, N} || N <- [LocalNode | Others]].
446467

447468
version() -> 4.
448469

@@ -681,6 +702,15 @@ all_member_nodes(Streams) ->
681702
tick(_Ts, _State) ->
682703
[{aux, maybe_resize_coordinator_cluster}].
683704

705+
%% Returns the coordinator members as reported by ra:members/1 when queried
%% with `{local, ServerId}' for this node's coordinator server.
%% Any reply that is not a 3-tuple terminates the calling process with
%% `{error_fetching_locally_known_coordinator_members, Reply}'.
%% NOTE(review): presumably the `local' form answers from the local member's
%% own state rather than the leader's, so it may be stale - confirm.
locally_known_members() ->
    %% TODO: use ra_leaderboard and fallback if leaderboard not populated
    LocalServerId = {?MODULE, node()},
    case ra:members({local, LocalServerId}) of
        {_Tag, Members, _Leader} ->
            Members;
        Unexpected ->
            exit({error_fetching_locally_known_coordinator_members,
                  Unexpected})
    end.
713+
684714
maybe_resize_coordinator_cluster() ->
685715
spawn(fun() ->
686716
case ra:members({?MODULE, node()}) of

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -645,9 +645,9 @@ grow_coordinator_cluster(Config) ->
645645
?assertEqual({'queue.declare_ok', Q, 0, 0},
646646
declare(Config, Server0, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])),
647647

648-
ok = rabbit_control_helper:command(stop_app, Server1),
648+
% ok = rabbit_control_helper:command(stop_app, Server1),
649649
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
650-
rabbit_control_helper:command(start_app, Server1),
650+
% rabbit_control_helper:command(start_app, Server1),
651651
%% at this point there _probably_ won't be a stream coordinator member on
652652
%% Server1
653653

@@ -673,7 +673,8 @@ grow_coordinator_cluster(Config) ->
673673
false
674674
end
675675
end, 60000),
676-
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]).
676+
% rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]),
677+
ok.
677678

678679
shrink_coordinator_cluster(Config) ->
679680
[Server0, Server1, Server2] =

deps/rabbitmq_management/src/rabbit_mgmt_db.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ handle_pre_hibernate(State) ->
258258
%% rabbit_mgmt_db is hibernating the odds are rabbit_event is
259259
%% quiescing in some way too).
260260
_ = rpc:multicall(
261-
rabbit_nodes:list_running(), rabbit_mgmt_db_handler, gc, []),
261+
rabbit_presence:list_present(), rabbit_mgmt_db_handler, gc, []),
262262
{hibernate, State}.
263263

264264
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

deps/rabbitmq_management/src/rabbit_mgmt_wm_queues.erl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,9 @@ is_authorized(ReqData, {Mode, Context}) ->
7272
%% Exported functions
7373

7474
basic(ReqData) ->
75-
%% rabbit_nodes:list_running/1 is a potentially slow function that performs
76-
%% a cluster wide query with a reasonably long (10s) timeout.
77-
%% TODO: replace with faster approximate function
78-
Running = rabbit_nodes:list_running(),
75+
%% rabbit_presence:list_present/0 is an approximate, "good enough" view of
76+
%% the current active running cluster members.
77+
Running = rabbit_presence:list_present(),
7978
Ctx = #{running_nodes => Running},
8079
FmtQ = fun (Q) -> rabbit_mgmt_format:queue(Q, Ctx) end,
8180
case rabbit_mgmt_util:disable_stats(ReqData) of

0 commit comments

Comments
 (0)