Skip to content

Commit 8d0609e

Browse files
Merge branch 'main' into issue-12324
2 parents aeda3ca + 82c81c8 commit 8d0609e

29 files changed

+1720
-202
lines changed

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -698,23 +698,39 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
698698

699699
make_source(#{role := {sender, _}}) ->
700700
#'v1_0.source'{};
701-
make_source(#{role := {receiver, #{address := Address} = Source, _Pid}, filter := Filter}) ->
701+
make_source(#{role := {receiver, Source, _Pid},
702+
filter := Filter}) ->
702703
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
704+
Dynamic = maps:get(dynamic, Source, false),
703705
TranslatedFilter = translate_filters(Filter),
704-
#'v1_0.source'{address = {utf8, Address},
706+
#'v1_0.source'{address = make_address(Source),
705707
durable = {uint, Durable},
706-
filter = TranslatedFilter}.
708+
dynamic = Dynamic,
709+
filter = TranslatedFilter,
710+
capabilities = make_capabilities(Source)}.
707711

708712
make_target(#{role := {receiver, _Source, _Pid}}) ->
709713
#'v1_0.target'{};
710-
make_target(#{role := {sender, #{address := Address} = Target}}) ->
714+
make_target(#{role := {sender, Target}}) ->
711715
Durable = translate_terminus_durability(maps:get(durable, Target, none)),
712-
TargetAddr = case is_binary(Address) of
713-
true -> {utf8, Address};
714-
false -> Address
715-
end,
716-
#'v1_0.target'{address = TargetAddr,
717-
durable = {uint, Durable}}.
716+
Dynamic = maps:get(dynamic, Target, false),
717+
#'v1_0.target'{address = make_address(Target),
718+
durable = {uint, Durable},
719+
dynamic = Dynamic,
720+
capabilities = make_capabilities(Target)}.
721+
722+
make_address(#{address := Addr}) ->
723+
if is_binary(Addr) ->
724+
{utf8, Addr};
725+
is_atom(Addr) ->
726+
Addr
727+
end.
728+
729+
make_capabilities(#{capabilities := Caps0}) ->
730+
Caps = [{symbol, C} || C <- Caps0],
731+
{array, symbol, Caps};
732+
make_capabilities(_) ->
733+
undefined.
718734

719735
max_message_size(#{max_message_size := Size})
720736
when is_integer(Size) andalso

deps/rabbit/include/rabbit_amqp_reader.hrl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
-define(CLOSING_TIMEOUT, 30_000).
44
-define(SILENT_CLOSE_DELAY, 3_000).
55

6+
-define(SHUTDOWN_SESSIONS_TIMEOUT, 10_000).
7+
68
%% Allow for potentially large sets of tokens during the SASL exchange.
79
%% https://docs.oasis-open.org/amqp/amqp-cbs/v1.0/csd01/amqp-cbs-v1.0-csd01.html#_Toc67999915
810
-define(INITIAL_MAX_FRAME_SIZE, 8192).

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,10 +220,17 @@ terminate(_, _) ->
220220
%%--------------------------------------------------------------------------
221221
%% error handling / termination
222222

223-
close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
223+
close(Error, State0 = #v1{connection = #v1_connection{timeout = Timeout}}) ->
224224
%% Client properties will be emitted in the connection_closed event by rabbit_reader.
225-
ClientProperties = i(client_properties, State),
225+
ClientProperties = i(client_properties, State0),
226226
put(client_properties, ClientProperties),
227+
228+
%% "It is illegal to send any more frames (or bytes of any other kind)
229+
%% after sending a close frame." [2.7.9]
230+
%% Sessions might send frames via the writer proc.
231+
%% Therefore, let's first try to orderly shutdown our sessions.
232+
State = shutdown_sessions(State0),
233+
227234
Time = case Timeout > 0 andalso
228235
Timeout < ?CLOSING_TIMEOUT of
229236
true -> Timeout;
@@ -233,6 +240,31 @@ close(Error, State = #v1{connection = #v1_connection{timeout = Timeout}}) ->
233240
ok = send_on_channel0(State, #'v1_0.close'{error = Error}, amqp10_framing),
234241
State#v1{connection_state = closed}.
235242

243+
shutdown_sessions(#v1{tracked_channels = Channels} = State) ->
244+
maps:foreach(fun(_ChannelNum, Pid) ->
245+
gen_server:cast(Pid, shutdown)
246+
end, Channels),
247+
TimerRef = erlang:send_after(?SHUTDOWN_SESSIONS_TIMEOUT,
248+
self(),
249+
shutdown_sessions_timeout),
250+
wait_for_shutdown_sessions(TimerRef, State).
251+
252+
wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State)
253+
when map_size(Channels) =:= 0 ->
254+
ok = erlang:cancel_timer(TimerRef, [{async, false},
255+
{info, false}]),
256+
State;
257+
wait_for_shutdown_sessions(TimerRef, #v1{tracked_channels = Channels} = State0) ->
258+
receive
259+
{{'DOWN', ChannelNum}, _MRef, process, SessionPid, _Reason} ->
260+
State = untrack_channel(ChannelNum, SessionPid, State0),
261+
wait_for_shutdown_sessions(TimerRef, State);
262+
shutdown_sessions_timeout ->
263+
?LOG_INFO("sessions running ~b ms after requested to be shut down: ~p",
264+
[?SHUTDOWN_SESSIONS_TIMEOUT, maps:values(Channels)]),
265+
State0
266+
end.
267+
236268
handle_session_exit(ChannelNum, SessionPid, Reason, State0) ->
237269
State = untrack_channel(ChannelNum, SessionPid, State0),
238270
S = case terminated_normally(Reason) of
@@ -760,6 +792,7 @@ send_to_new_session(
760792
connection = #v1_connection{outgoing_max_frame_size = MaxFrame,
761793
vhost = Vhost,
762794
user = User,
795+
container_id = ContainerId,
763796
name = ConnName},
764797
writer = WriterPid} = State) ->
765798
%% Subtract fixed frame header size.
@@ -772,6 +805,7 @@ send_to_new_session(
772805
OutgoingMaxFrameSize,
773806
User,
774807
Vhost,
808+
ContainerId,
775809
ConnName,
776810
BeginFrame],
777811
case rabbit_amqp_session_sup:start_session(SessionSup, ChildArgs) of

0 commit comments

Comments
 (0)