Skip to content

Commit f61964c

Browse files
committed
backup
1 parent 7a6341c commit f61964c

File tree

2 files changed

+40
-39
lines changed

2 files changed

+40
-39
lines changed

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 26 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -340,46 +340,49 @@ process_connect(Implicit, Frame,
340340
{?HEADER_HEART_BEAT,
341341
io_lib:format("~B,~B", [SendTimeout, ReceiveTimeout])},
342342
{?HEADER_VERSION, Version}],
343-
ok('CONNECTED',
344-
case application:get_env(rabbitmq_stomp, hide_server_info, false) of
345-
true -> Headers;
346-
false -> [{?HEADER_SERVER, server_header()} | Headers]
347-
end,
348-
"",
349-
StateN1#state{cfg = #cfg{
343+
344+
Res = ok("CONNECTED",
345+
case application:get_env(rabbitmq_stomp, hide_server_info, false) of
346+
true -> Headers;
347+
false -> [{?HEADER_SERVER, server_header()} | Headers]
348+
end,
349+
"",
350+
StateN1#state{cfg = #cfg{
350351
session_id = SessionId,
351-
version = Version
352-
},
353-
user = User,
354-
authz_ctx = AuthzCtx})
352+
version = Version
353+
},
354+
user = User,
355+
authz_ctx = AuthzCtx}),
356+
self() ! connection_created,
357+
Res
355358
else
356359
{error, no_common_version} ->
357360
error("Version mismatch",
358361
"Supported versions are ~ts~n",
359362
[string:join(?SUPPORTED_VERSIONS, ",")],
360363
StateN);
361-
{error, not_allowed} ->
364+
{error, not_allowed, EUsername, EVHost} ->
362365
rabbit_log:warning("STOMP login failed for user '~ts': "
363-
"virtual host access not allowed", [Username]),
366+
"virtual host access not allowed", [EUsername]),
364367
error("Bad CONNECT", "Virtual host '" ++
365-
binary_to_list(VHost) ++
368+
binary_to_list(EVHost) ++
366369
"' access denied", State);
367370
{refused, Username1, _Msg, _Args} ->
368371
rabbit_log:warning("STOMP login failed for user '~ts': authentication failed", [Username1]),
369372
error("Bad CONNECT", "Access refused for user '" ++
370373
binary_to_list(Username1) ++ "'", [], State);
371-
{error, not_loopback} ->
374+
{error, not_loopback, EUsername} ->
372375
rabbit_log:warning("STOMP login failed for user '~ts': "
373-
"this user's access is restricted to localhost", [Username]),
376+
"this user's access is restricted to localhost", [EUsername]),
374377
error("Bad CONNECT", "non-loopback access denied", State)
375-
end,
378+
end
376379
case {Res, Implicit} of
377380
{{ok, _, StateN2}, implicit} ->
378381
self() ! connection_created, ok(StateN2);
379382
_ ->
380383
self() ! connection_created, Res
381-
end
382-
end,
384+
385+
end,
383386
State).
384387

385388
creds(_, _, #cfg{default_login = DefLogin,
@@ -912,16 +915,6 @@ do_send(Destination, _DestHdr,
912915

913916
io:format("Message: ~p~n", [Message]),
914917

915-
%% {ok, BasicMessage} = rabbit_basic:message(ExchangeName, RoutingKey, Content),
916-
917-
%% Delivery = #delivery{
918-
%% mandatory = false,
919-
%% confirm = DoConfirm,
920-
%% sender = self(),
921-
%% message = BasicMessage,
922-
%% msg_seq_no = MsgSeqNo,
923-
%% flow = Flow
924-
%% },
925918
QNames = rabbit_exchange:route(Exchange, Message, #{return_binding_keys => true}),
926919
io:format("QNames ~p~n", [QNames]),
927920

@@ -1318,8 +1311,11 @@ ensure_reply_queue(TempQueueId, State = #state{reply_queues = RQS,
13181311
#resource{name = QNameBin} = QName = amqqueue:get_name(Queue),
13191312

13201313
ConsumerTag = rabbit_stomp_util:consumer_tag_reply_to(TempQueueId),
1314+
1315+
1316+
{ok, {_Global, DefaultPrefetch}} = application:get_env(rabbit, default_consumer_prefetch),
13211317
Spec = #{no_ack => true,
1322-
prefetch_count => application:get_env(rabbit, default_consumer_prefetch),
1318+
prefetch_count => DefaultPrefetch,
13231319
consumer_tag => ConsumerTag,
13241320
exclusive_consume => false,
13251321
args => []},
@@ -1718,7 +1714,6 @@ check_resource_access(User, Resource, Perm, Context) ->
17181714

17191715
handle_down({{'DOWN', QName}, _MRef, process, QPid, Reason},
17201716
State0 = #state{queue_states = QStates0} = State) ->
1721-
credit_flow:peer_down(QPid),
17221717
case rabbit_queue_type:handle_down(QPid, QName, Reason, QStates0) of
17231718
{ok, QStates1, Actions} ->
17241719
State1 = State0#state{queue_states = QStates1},
@@ -1782,13 +1777,6 @@ handle_queue_actions(Actions, #state{} = State0) ->
17821777
record_rejects(Rej, S);
17831778
({queue_down, QRef}, S0) ->
17841779
handle_consuming_queue_down_or_eol(QRef, S0);
1785-
%% TODO: I have no idea about the scope of credit_flow
1786-
({block, QName}, S0) ->
1787-
credit_flow:block(QName),
1788-
S0;
1789-
({unblock, QName}, S0) ->
1790-
credit_flow:unblock(QName),
1791-
S0;
17921780
%% TODO: in rabbit_channel there code for handling
17931781
%% send_drained and send_credit_reply
17941782
%% I'm doing catch all here to not crash?

deps/rabbitmq_stomp/src/rabbit_stomp_reader.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
-include("rabbit_stomp.hrl").
2020
-include("rabbit_stomp_frame.hrl").
21-
-include_lib("amqp_client/include/amqp_client.hrl").
2221

2322
-record(reader_state, {
2423
socket,
@@ -404,6 +403,20 @@ processor_args(Configuration, Sock) ->
404403
ssl_login_name(RealSocket, Configuration), PeerAddr}.
405404

406405
adapter_info(Sock) ->
406+
case rabbit_net:socket_ends(Socket, inbound) of
407+
{ok, {PeerIp, PeerPort, Ip, Port}} ->
408+
#amqp_adapter_info{protocol = {'STOMP', 0},
409+
name = Name,
410+
host = Host,
411+
port = Port,
412+
peer_host = PeerHost,
413+
peer_port = PeerPort,
414+
additional_info = maybe_ssl_info(Sock)}
415+
process_connect(ConnectPacket, Socket, ConnName, SendFun, SocketEnds);
416+
{error, Reason} ->
417+
{error, {socket_ends, Reason}}
418+
end.
419+
407420
amqp_connection:socket_adapter_info(Sock, {'STOMP', 0}).
408421

409422
ssl_login_name(_Sock, #stomp_configuration{ssl_cert_login = false}) ->

0 commit comments

Comments
 (0)