Skip to content
148 changes: 94 additions & 54 deletions src/hb_http_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ start_link(Opts) ->
req(Args, Opts) -> req(Args, false, Opts).
req(Args, ReestablishedConnection, Opts) ->
case hb_opts:get(http_client, gun, Opts) of
gun -> gun_req(Args, ReestablishedConnection, Opts);
gun ->
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reference for #481

MaxRedirects = hb_maps:get(gun_max_redirects, Opts, 5),
GunArgs = Args#{redirects_left => MaxRedirects},
gun_req(GunArgs, ReestablishedConnection, Opts);
httpc -> httpc_req(Args, ReestablishedConnection, Opts)
end.

Expand All @@ -35,11 +38,13 @@ httpc_req(Args, _, Opts) ->
body := Body
} = Args,
?event({httpc_req, Args}),
{Host, Port} = parse_peer(Peer, Opts),
Scheme = case Port of
443 -> "https";
_ -> "http"
ParsedPeer = uri_string:parse(iolist_to_binary(Peer)),
#{ scheme := Scheme, host := Host } = ParsedPeer,
DefaultPort = case Scheme of
<<"https">> -> 443;
<<"http">> -> 80
end,
Port = maps:get(port, ParsedPeer, DefaultPort),
?event(http_client, {httpc_req, {explicit, Args}}),
URL = binary_to_list(iolist_to_binary([Scheme, "://", Host, ":", integer_to_binary(Port), Path])),
FilteredHeaders = hb_maps:without([<<"content-type">>, <<"cookie">>], Headers, Opts),
Expand Down Expand Up @@ -78,9 +83,11 @@ httpc_req(Args, _, Opts) ->
}
end,
?event({http_client_outbound, Method, URL, Request}),
FollowRedirects = hb_maps:get(http_follow_redirects, Opts, true),
ReqOpts = [{autoredirect, FollowRedirects}],
HTTPCOpts = [{full_result, true}, {body_format, binary}],
StartTime = os:system_time(millisecond),
case httpc:request(Method, Request, [], HTTPCOpts) of
case httpc:request(Method, Request, ReqOpts, HTTPCOpts) of
{ok, {{_, Status, _}, RawRespHeaders, RespBody}} ->
EndTime = os:system_time(millisecond),
RespHeaders =
Expand All @@ -105,46 +112,57 @@ httpc_req(Args, _, Opts) ->
end.

gun_req(Args, ReestablishedConnection, Opts) ->
StartTime = os:system_time(millisecond),
#{ peer := Peer, path := Path, method := Method } = Args,
Response =
StartTime = os:system_time(millisecond),
#{ peer := Peer, path := Path, method := Method, redirects_left := RedirectsLeft } = Args,
Response =
case catch gen_server:call(?MODULE, {get_connection, Args, Opts}, infinity) of
{ok, PID} ->
ar_rate_limiter:throttle(Peer, Path, Opts),
case request(PID, Args, Opts) of
{error, Error} when Error == {shutdown, normal};
Error == noproc ->
{error, Error} when Error == {shutdown, normal}; Error == noproc ->
case ReestablishedConnection of
true ->
{error, client_error};
false ->
req(Args, true, Opts)
end;
Reply ->
Reply
end;
Reply = {_Ok, StatusCode, RedirectRes, _} ->
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reference for #481

FollowRedirects = hb_maps:get(http_follow_redirects, Opts, true),
case lists:member(StatusCode, [301, 302, 307, 308]) of
true when FollowRedirects, RedirectsLeft > 0 ->
RedirectArgs = Args#{ redirects_left := RedirectsLeft - 1 },
handle_redirect(
RedirectArgs,
ReestablishedConnection,
Opts,
RedirectRes,
Reply
);
_ -> Reply
end
end;
{'EXIT', _} ->
{error, client_error};
Error ->
Error
end,
EndTime = os:system_time(millisecond),
%% Only log the metric for the top-level call to req/2 - not the recursive call
%% that happens when the connection is reestablished.
case ReestablishedConnection of
true ->
ok;
false ->
record_duration(#{
<<"request-method">> => method_to_bin(Method),
<<"request-path">> => hb_util:bin(Path),
<<"status-class">> => get_status_class(Response),
<<"duration">> => EndTime - StartTime
},
Opts
)
end,
Response.
end,
EndTime = os:system_time(millisecond),
%% Only log the metric for the top-level call to req/2 - not the recursive call
%% that happens when the connection is reestablished.
case ReestablishedConnection of
true ->
ok;
false ->
record_duration(#{
<<"request-method">> => method_to_bin(Method),
<<"request-path">> => hb_util:bin(Path),
<<"status-class">> => get_status_class(Response),
<<"duration">> => EndTime - StartTime
},
Opts
)
end,
Response.

%% @doc Record the duration of the request in an async process. We write the
%% data to prometheus if the application is enabled, as well as invoking the
Expand Down Expand Up @@ -455,6 +473,37 @@ terminate(Reason, #state{ status_by_pid = StatusByPID }) ->
%%% Private functions.
%%% ==================================================================

handle_redirect(Args, ReestablishedConnection, Opts, Res, Reply) ->
case lists:keyfind(<<"location">>, 1, Res) of
false ->
% There's no Location header, so we can't follow the redirect.
Reply;
{_LocationHeaderName, Location} ->
case uri_string:parse(Location) of
{error, _Reason, _Detail} ->
% Server returned a Location header but the URI was malformed.
Reply;
Parsed ->
#{ scheme := NewScheme, host := NewHost, path := NewPath } = Parsed,
Port = maps:get(port, Parsed, undefined),
FormattedPort = case Port of
undefined -> "";
_ -> lists:flatten(io_lib:format(":~i", [Port]))
end,
NewPeer = lists:flatten(
io_lib:format(
"~s://~s~s~s",
[NewScheme, NewHost, FormattedPort, NewPath]
)
),
NewArgs = Args#{
peer := NewPeer,
path := NewPath
},
gun_req(NewArgs, ReestablishedConnection, Opts)
end
end.

%% @doc Safe wrapper for prometheus_gauge:inc/2.
inc_prometheus_gauge(Name) ->
case application:get_application(prometheus) of
Expand All @@ -481,7 +530,13 @@ inc_prometheus_counter(Name, Labels, Value) ->
end.

open_connection(#{ peer := Peer }, Opts) ->
{Host, Port} = parse_peer(Peer, Opts),
ParsedPeer = uri_string:parse(iolist_to_binary(Peer)),
#{ scheme := Scheme, host := Host } = ParsedPeer,
DefaultPort = case Scheme of
<<"https">> -> 443;
<<"http">> -> 80
end,
Port = maps:get(port, ParsedPeer, DefaultPort),
?event(http_outbound, {parsed_peer, {peer, Peer}, {host, Host}, {port, Port}}),
BaseGunOpts =
#{
Expand All @@ -503,9 +558,9 @@ open_connection(#{ peer := Peer }, Opts) ->
)
},
Transport =
case Port of
443 -> tls;
_ -> tcp
case Scheme of
<<"https">> -> tls;
<<"http">> -> tcp
end,
DefaultProto =
case hb_features:http3() of
Expand All @@ -516,7 +571,7 @@ open_connection(#{ peer := Peer }, Opts) ->
GunOpts =
case Proto = hb_opts:get(protocol, DefaultProto, Opts) of
http3 -> BaseGunOpts#{protocols => [http3], transport => quic};
_ -> BaseGunOpts
_ -> BaseGunOpts#{transport => Transport}
end,
?event(http_outbound,
{gun_open,
Expand All @@ -526,22 +581,7 @@ open_connection(#{ peer := Peer }, Opts) ->
{transport, Transport}
}
),
gun:open(Host, Port, GunOpts).

parse_peer(Peer, Opts) ->
Parsed = uri_string:parse(Peer),
case Parsed of
#{ host := Host, port := Port } ->
{hb_util:list(Host), Port};
URI = #{ host := Host } ->
{
hb_util:list(Host),
case hb_maps:get(scheme, URI, undefined, Opts) of
<<"https">> -> 443;
_ -> hb_opts:get(port, 8734, Opts)
end
}
end.
gun:open(hb_util:list(Host), Port, GunOpts).

reply_error([], _Reason) ->
ok;
Expand Down Expand Up @@ -755,4 +795,4 @@ get_status_class(Data) when is_binary(Data) ->
get_status_class(Data) when is_atom(Data) ->
atom_to_binary(Data);
get_status_class(_) ->
<<"unknown">>.
<<"unknown">>.
8 changes: 7 additions & 1 deletion src/hb_opts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,12 @@ default_message() ->
%% What HTTP client should the node use?
%% Options: gun, httpc
http_client => gun,
%% Should the HTTP client automatically follow 3xx redirects?
http_follow_redirects => true,
%% For the gun HTTP client, to mitigate resource exhaustion attacks, what's
%% the maximum number of automatic 3xx redirects we'll allow when
%% http_follow_redirects = true?
gun_max_redirects => 5,
%% Scheduling mode: Determines when the SU should inform the recipient
%% that an assignment has been scheduled for a message.
%% Options: aggressive(!), local_confirmation, remote_confirmation,
Expand Down Expand Up @@ -920,4 +926,4 @@ ensure_node_history_test() ->
]
},
?assertEqual({error, invalid_values}, ensure_node_history(InvalidItems, RequiredOpts)).
-endif.
-endif.