Skip to content

Commit 390a0b4

Browse files
Merge pull request #10754 from rabbitmq/amazon-mq-consumers-limit
Introduce per-channel consumers limit (by @illotum, AWS)
2 parents de6cff6 + 34b1cf6 commit 390a0b4

File tree

4 files changed

+79
-3
lines changed

4 files changed

+79
-3
lines changed

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -903,6 +903,24 @@ end}.
903903
end
904904
}.
905905

906+
%% Set the max allowed number of consumers per channel.
907+
%% `infinity` means "no limit".
908+
%%
909+
%% {consumer_max_per_channel, infinity},
910+
911+
{mapping, "consumer_max_per_channel", "rabbit.consumer_max_per_channel",
912+
[{datatype, [{atom, infinity}, integer]}]}.
913+
914+
{translation, "rabbit.consumer_max_per_channel",
915+
fun(Conf) ->
916+
case cuttlefish:conf_get("consumer_max_per_channel", Conf, undefined) of
917+
undefined -> cuttlefish:unset();
918+
infinity -> infinity;
919+
Val when is_integer(Val) andalso Val > 0 -> Val;
920+
_ -> cuttlefish:invalid("should be positive integer or 'infinity'")
921+
end
922+
end
923+
}.
906924

907925
%% Set the max permissible number of client connections per node.
908926
%% `infinity` means "no limit".

deps/rabbit/src/rabbit_channel.erl

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@
105105
consumer_prefetch,
106106
consumer_timeout,
107107
authz_context,
108+
max_consumers, % taken from rabbit.consumer_max_per_channel
108109
%% defines how ofter gc will be executed
109110
writer_gc_threshold
110111
}).
@@ -507,6 +508,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
507508
ConsumerTimeout = get_consumer_timeout(),
508509
OptionalVariables = extract_variable_map_from_amqp_params(AmqpParams),
509510
{ok, GCThreshold} = application:get_env(rabbit, writer_gc_threshold),
511+
MaxConsumers = application:get_env(rabbit, consumer_max_per_channel, infinity),
510512
State = #ch{cfg = #conf{state = starting,
511513
protocol = Protocol,
512514
channel = Channel,
@@ -523,6 +525,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
523525
consumer_prefetch = Prefetch,
524526
consumer_timeout = ConsumerTimeout,
525527
authz_context = OptionalVariables,
528+
max_consumers = MaxConsumers,
526529
writer_gc_threshold = GCThreshold
527530
},
528531
limiter = Limiter,
@@ -1313,8 +1316,13 @@ handle_method(#'basic.consume'{queue = <<"amq.rabbitmq.reply-to">>,
13131316
no_ack = NoAck,
13141317
nowait = NoWait},
13151318
_, State = #ch{reply_consumer = ReplyConsumer,
1319+
cfg = #conf{max_consumers = MaxConsumers},
13161320
consumer_mapping = ConsumerMapping}) ->
1321+
CurrentConsumers = maps:size(ConsumerMapping),
13171322
case maps:find(CTag0, ConsumerMapping) of
1323+
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
1324+
rabbit_misc:protocol_error(
1325+
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
13181326
error ->
13191327
case {ReplyConsumer, NoAck} of
13201328
{none, true} ->
@@ -1363,12 +1371,17 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
13631371
nowait = NoWait,
13641372
arguments = Args},
13651373
_, State = #ch{cfg = #conf{consumer_prefetch = ConsumerPrefetch,
1374+
max_consumers = MaxConsumers,
13661375
user = User,
13671376
virtual_host = VHostPath,
13681377
authz_context = AuthzContext},
13691378
consumer_mapping = ConsumerMapping
13701379
}) ->
1380+
CurrentConsumers = maps:size(ConsumerMapping),
13711381
case maps:find(ConsumerTag, ConsumerMapping) of
1382+
error when CurrentConsumers >= MaxConsumers -> % false when MaxConsumers is 'infinity'
1383+
rabbit_misc:protocol_error(
1384+
not_allowed, "reached maximum (~ts) of consumers per channel", [MaxConsumers]);
13721385
error ->
13731386
QueueName = qbin_to_resource(QueueNameBin, VHostPath),
13741387
check_read_permitted(QueueName, User, AuthzContext),

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,14 @@ tcp_listen_options.exit_on_close = false",
404404
"channel_max_per_node = infinity",
405405
[{rabbit,[{channel_max_per_node, infinity}]}],
406406
[]},
407+
{consumer_max_per_channel,
408+
"consumer_max_per_channel = 16",
409+
[{rabbit,[{consumer_max_per_channel, 16}]}],
410+
[]},
411+
{consumer_max_per_channel,
412+
"consumer_max_per_channel = infinity",
413+
[{rabbit,[{consumer_max_per_channel, infinity}]}],
414+
[]},
407415
{max_message_size,
408416
"max_message_size = 131072",
409417
[{rabbit, [{max_message_size, 131072}]}],

deps/rabbit/test/per_node_limit_SUITE.erl

Lines changed: 40 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ groups() ->
2323
{limit_tests, [], [
2424
node_connection_limit,
2525
vhost_limit,
26+
channel_consumers_limit,
2627
node_channel_limit
2728
]}
2829
].
@@ -62,13 +63,15 @@ init_per_testcase(Testcase, Config) ->
6263

6364
end_per_testcase(vhost_limit = Testcase, Config) ->
6465
set_node_limit(Config, vhost_max, infinity),
65-
set_node_limit(Config, channel_max_per_node, 0),
66+
set_node_limit(Config, channel_max_per_node, infinity),
67+
set_node_limit(Config, consumer_max_per_channel, infinity),
6668
set_node_limit(Config, connection_max, infinity),
6769
[rabbit_ct_broker_helpers:delete_vhost(Config, integer_to_binary(I)) || I <- lists:seq(1,4)],
6870
rabbit_ct_helpers:testcase_finished(Config, Testcase);
6971
end_per_testcase(Testcase, Config) ->
7072
set_node_limit(Config, vhost_max, infinity),
71-
set_node_limit(Config, channel_max_per_node, 0),
73+
set_node_limit(Config, channel_max_per_node, infinity),
74+
set_node_limit(Config, consumer_max_per_channel, infinity),
7275
set_node_limit(Config, connection_max, infinity),
7376
rabbit_ct_helpers:testcase_finished(Config, Testcase).
7477

@@ -111,12 +114,13 @@ vhost_limit(Config) ->
111114
node_channel_limit(Config) ->
112115
set_node_limit(Config, channel_max_per_node, 5),
113116

114-
VHost = <<"foobar">>,
117+
VHost = <<"node_channel_limit">>,
115118
User = <<"guest">>,
116119
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
117120
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
118121
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
119122
Conn2 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
123+
0 = count_channels_per_node(Config),
120124

121125
lists:foreach(fun(N) when (N band 1) == 1 -> {ok, _} = open_channel(Conn1);
122126
(_) -> {ok,_ } = open_channel(Conn2)
@@ -137,6 +141,30 @@ node_channel_limit(Config) ->
137141

138142
%% Now all connections are closed, so there should be 0 open connections
139143
0 = count_channels_per_node(Config),
144+
close_all_connections([Conn1, Conn2]),
145+
146+
rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
147+
148+
ok.
149+
150+
channel_consumers_limit(Config) ->
151+
set_node_limit(Config, consumer_max_per_channel, 2),
152+
153+
VHost = <<"channel_consumers_limit">>,
154+
User = <<"guest">>,
155+
ok = rabbit_ct_broker_helpers:add_vhost(Config, VHost),
156+
ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
157+
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0, VHost),
158+
{ok, Ch} = open_channel(Conn1),
159+
Q = <<"Q">>, Tag = <<"Tag">>,
160+
161+
{ok, _} = consume(Ch, Q, <<"Tag1">>),
162+
{ok, _} = consume(Ch, Q, <<"Tag2">>),
163+
{error, not_allowed_crash} = consume(Ch, Q, <<"Tag3">>), % Third consumer should fail
164+
165+
close_all_connections([Conn1]),
166+
rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
167+
140168
ok.
141169

142170
%% -------------------------------------------------------------------
@@ -157,6 +185,15 @@ set_node_limit(Config, Type, Limit) ->
157185
application,
158186
set_env, [rabbit, Type, Limit]).
159187

188+
consume(Ch, Q, Tag) ->
189+
#'queue.declare_ok'{queue = Q} = amqp_channel:call(Ch, #'queue.declare'{queue = Q}),
190+
try amqp_channel:call(Ch, #'basic.consume'{queue = Q, consumer_tag = Tag}) of
191+
#'basic.consume_ok'{} = OK -> {ok, OK};
192+
NotOk -> {error, NotOk}
193+
catch
194+
_:_Error -> {error, not_allowed_crash}
195+
end.
196+
160197
open_channel(Conn) when is_pid(Conn) ->
161198
try amqp_connection:open_channel(Conn) of
162199
{ok, Ch} -> {ok, Ch};

0 commit comments

Comments
 (0)