Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP 1.0 client: make it easier to pass in a virtual host #13661

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions deps/amqp10_client/src/amqp10_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
link_handle/1,
get_msg/1,
get_msg/2,
parse_uri/1
parse_uri/1,
%% for tests
binary_without_leading_slash/1
]).

-type snd_settle_mode() :: amqp10_client_session:snd_settle_mode().
Expand Down Expand Up @@ -412,14 +414,24 @@ parse_uri(Uri) ->
end.

parse_result(Map) ->
_ = case maps:get(path, Map, "/") of
"/" -> ok;
"" -> ok;
_ -> throw(path_segment_not_supported)
end,
Scheme = maps:get(scheme, Map, "amqp"),
UserInfo = maps:get(userinfo, Map, undefined),
Host = maps:get(host, Map),

%% AMQP 1.0 may not have the concept of virtual hosts but
%% Shovels and Erlang/BEAM-based apps connecting to RabbitMQ
%% need to be able to pass it, so treat any "non-default" path as a virtual host name
PathSegment = case maps:get(path, Map, "/") of
"/" -> undefined;
"" -> undefined;
Value0 -> binary_without_leading_slash(Value0)
end,
%% Note: this is not the same thing as a hostname at the TCP/IP level, that is, not 'address'.
DefaultHostname = case PathSegment of
undefined -> to_binary(Host);
Value1 -> list_to_binary(io_lib:format("vhost:~ts", [Value1]))
end,

DefaultPort = case Scheme of
"amqp" -> 5672;
"amqps" -> 5671
Expand All @@ -444,13 +456,15 @@ parse_result(Map) ->
Acc#{max_frame_size => list_to_integer(V)};
("hostname", V, Acc) ->
Acc#{hostname => list_to_binary(V)};
("vhost", V, Acc) ->
Acc#{hostname => list_to_binary(io_lib:format("vhost:~ts", [V]))};
("container_id", V, Acc) ->
Acc#{container_id => list_to_binary(V)};
("transfer_limit_margin", V, Acc) ->
Acc#{transfer_limit_margin => list_to_integer(V)};
(_, _, Acc) -> Acc
end, #{address => Host,
hostname => to_binary(Host),
hostname => DefaultHostname,
port => Port,
sasl => Sasl}, Query),
case Scheme of
Expand All @@ -460,6 +474,15 @@ parse_result(Map) ->
Ret0#{tls_opts => {secure_port, TlsOpts}}
end.

-spec binary_without_leading_slash(binary() | string()) -> binary().
binary_without_leading_slash(Bin) when is_binary(Bin) ->
case Bin of
<<"/", Rest/binary>> -> Rest;
Other -> Other
end;
binary_without_leading_slash(Bin) when is_list(Bin) ->
?FUNCTION_NAME(list_to_binary(Bin)).

parse_usertoken(U) ->
[User, Pass] = string:tokens(U, ":"),
{plain,
Expand Down Expand Up @@ -558,6 +581,12 @@ parse_uri_test_() ->
hostname => <<"my_proxy">>,
sasl => {plain, <<"fred">>, <<"passw">>}}},
parse_uri("amqp://fred:passw@my_proxy:9876")),
%% treat URI path as a virtual host name
?_assertEqual({ok, #{port => 5672,
address => "my_host",
sasl => anon,
hostname => <<"vhost:my_path_segment:9876">>}},
parse_uri("amqp://my_host/my_path_segment:9876")),
?_assertEqual(
{ok, #{address => "my_proxy", port => 9876,
hostname => <<"my_proxy">>,
Expand Down Expand Up @@ -597,9 +626,7 @@ parse_uri_test_() ->
"cacertfile=/etc/cacertfile.pem&certfile=/etc/certfile.pem&" ++
"keyfile=/etc/keyfile.key&fail_if_no_peer_cert=banana")),
?_assertEqual({error, plain_sasl_missing_userinfo},
parse_uri("amqp://my_host:9876?sasl=plain")),
?_assertEqual({error, path_segment_not_supported},
parse_uri("amqp://my_host/my_path_segment:9876"))
parse_uri("amqp://my_host:9876?sasl=plain"))
].

-endif.
121 changes: 121 additions & 0 deletions deps/amqp10_client/test/unit_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(unit_SUITE).

-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile([export_all, nowarn_export_all]).

suite() ->
[{timetrap, {minutes, 1}}].

all() ->
[
{group, uri_parsing}
].

groups() ->
[
{uri_parsing, [parallel], [
without_leading_slash,
parse_uri_case1,
parse_uri_case2,
parse_uri_case3,
parse_uri_case4,
parse_uri_case5,
parse_uri_case6,
parse_uri_case7,
parse_uri_case8
]}
].

%%
%% Test cases
%%

without_leading_slash(_) ->
?assertEqual(<<>>, amqp10_client:binary_without_leading_slash(<<>>)),
?assertEqual(<<>>, amqp10_client:binary_without_leading_slash(<<"/">>)),
?assertEqual(<<"abc">>, amqp10_client:binary_without_leading_slash(<<"/abc">>)),

?assertEqual(<<>>, amqp10_client:binary_without_leading_slash("")),
?assertEqual(<<>>, amqp10_client:binary_without_leading_slash("/")),
?assertEqual(<<"abc">>, amqp10_client:binary_without_leading_slash("/abc")).

parse_uri_case1(_) ->
URI = "amqp://target.hostname:5672",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5672, maps:get(port, Result), 5672),
?assertEqual(anon, maps:get(sasl, Result), anon),
?assertEqual(undefined, maps:get(tls_opts, Result, undefined), undefined).

parse_uri_case2(_) ->
URI = "amqps://target.hostname:5671",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5671, maps:get(port, Result)),
?assertMatch({secure_port, _}, maps:get(tls_opts, Result)).

parse_uri_case3(_) ->
URI = "amqp://target.hostname",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5672, maps:get(port, Result)).

parse_uri_case4(_) ->
URI = "amqp://username:[email protected]",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5672, maps:get(port, Result)),
?assertEqual({plain, <<"username">>, <<"secre7">>}, maps:get(sasl, Result)).

parse_uri_case5(_) ->
URI = "amqp://username:[email protected]?container_id=container9&hostname=vhost:abc",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5672, maps:get(port, Result)),
?assertEqual({plain, <<"username">>, <<"secre7">>}, maps:get(sasl, Result)),
?assertEqual(<<"container9">>, maps:get(container_id, Result)),
?assertEqual(<<"vhost:abc">>, maps:get(hostname, Result)).

parse_uri_case6(_) ->
URI = "amqp://username:[email protected]?container_id=container7&vhost=abc",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5672, maps:get(port, Result)),
?assertEqual({plain, <<"username">>, <<"secre7">>}, maps:get(sasl, Result)),
?assertEqual(<<"container7">>, maps:get(container_id, Result)),
?assertEqual(<<"vhost:abc">>, maps:get(hostname, Result)).

parse_uri_case7(_) ->
URI = "amqp://username:[email protected]/abc?container_id=container5",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5672, maps:get(port, Result)),
?assertEqual({plain, <<"username">>, <<"secre7">>}, maps:get(sasl, Result)),
?assertEqual(<<"container5">>, maps:get(container_id, Result)),
?assertEqual(<<"vhost:abc">>, maps:get(hostname, Result)).

parse_uri_case8(_) ->
URI = "amqp://username:[email protected]/abc?container_id=container10&hostname=vhost:def&vhost=ghi",
{ok, Result} = amqp10_client:parse_uri(URI),

?assertEqual("target.hostname", maps:get(address, Result)),
?assertEqual(5672, maps:get(port, Result)),
?assertEqual({plain, <<"username">>, <<"secre7">>}, maps:get(sasl, Result)),
?assertEqual(<<"container10">>, maps:get(container_id, Result)),
?assertEqual(<<"vhost:ghi">>, maps:get(hostname, Result)).