Skip to content

Commit fb9b537

Browse files
authored
Merge pull request #9291 from rabbitmq/mqtt-stream-test
Add integration test MQTT 5.0 -> Stream
2 parents 3783388 + 62710f5 commit fb9b537

File tree

2 files changed

+135
-2
lines changed

2 files changed

+135
-2
lines changed

deps/rabbitmq_mqtt/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,7 @@ rabbitmq_integration_suite(
281281
runtime_deps = [
282282
"//deps/amqp10_client:erlang_app",
283283
"//deps/rabbitmq_stomp:erlang_app",
284+
"//deps/rabbitmq_stream_common:erlang_app",
284285
"@emqtt//:erlang_app",
285286
],
286287
)

deps/rabbitmq_mqtt/test/protocol_interop_SUITE.erl

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -326,8 +326,119 @@ stomp(Config) ->
326326

327327
ok = emqtt:disconnect(C).
328328

329-
stream(_Config) ->
330-
{skip, "TODO write test"}.
329+
%% The stream test case is one-way because an MQTT client can publish to a stream,
330+
%% but not consume (directly) from a stream.
331+
stream(Config) ->
332+
Q = ClientId = atom_to_binary(?FUNCTION_NAME),
333+
Ch = rabbit_ct_client_helpers:open_channel(Config),
334+
335+
%% Bind a stream to the MQTT topic exchange.
336+
#'queue.declare_ok'{} = amqp_channel:call(
337+
Ch, #'queue.declare'{queue = Q,
338+
durable = true,
339+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]}),
340+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = Q,
341+
exchange = <<"amq.topic">>,
342+
routing_key = <<"my.topic">>}),
343+
344+
%% MQTT 5.0 to Stream
345+
C = connect(ClientId, Config),
346+
ContentType = <<"text/plain">>,
347+
Correlation = <<"some correlation ID">>,
348+
Payload = <<"my payload">>,
349+
UserProperty = [{<<"rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>},
350+
%% We expect that this message annotation will be dropped
351+
%% since AMQP 1.0 annoations must be symbols, i.e encoded as ASCII.
352+
{<<"x-rabbit🐇"/utf8>>, <<"carrot🥕"/utf8>>},
353+
{<<"key">>, <<"val">>},
354+
%% We expect that this application property will be dropped
355+
%% since AMQP 1.0 application properties are maps and maps disallow duplicate keys.
356+
{<<"key">>, <<"val">>},
357+
{<<"x-key">>, <<"val">>},
358+
%% We expect that this message annotation will be dropped
359+
%% since AMQP 1.0 annoations are maps and maps disallow duplicate keys.
360+
{<<"x-key">>, <<"val">>}],
361+
{ok, _} = emqtt:publish(C, <<"my/topic">>,
362+
#{'Content-Type' => ContentType,
363+
'Correlation-Data' => Correlation,
364+
'Response-Topic' => <<"response/topic">>,
365+
'User-Property' => UserProperty,
366+
'Payload-Format-Indicator' => 1},
367+
Payload, [{qos, 1}]),
368+
ok = emqtt:disconnect(C),
369+
370+
%% There is no open source Erlang RabbitMQ Stream client.
371+
%% Therefore, we have to build the commands for the Stream protocol handshake manually.
372+
StreamPort = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream),
373+
{ok, S} = gen_tcp:connect("localhost", StreamPort, [{active, false}, {mode, binary}]),
374+
375+
C0 = rabbit_stream_core:init(0),
376+
PeerPropertiesFrame = rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
377+
ok = gen_tcp:send(S, PeerPropertiesFrame),
378+
{{response, 1, {peer_properties, _, _}}, C1} = receive_stream_commands(S, C0),
379+
380+
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 1, sasl_handshake})),
381+
{{response, _, {sasl_handshake, _, _}}, C2} = receive_stream_commands(S, C1),
382+
Username = <<"guest">>,
383+
Password = <<"guest">>,
384+
Null = 0,
385+
PlainSasl = <<Null:8, Username/binary, Null:8, Password/binary>>,
386+
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 2, {sasl_authenticate, <<"PLAIN">>, PlainSasl}})),
387+
{{response, 2, {sasl_authenticate, _}}, C3} = receive_stream_commands(S, C2),
388+
{{tune, DefaultFrameMax, _}, C4} = receive_stream_commands(S, C3),
389+
390+
ok = gen_tcp:send(S, rabbit_stream_core:frame({response, 0, {tune, DefaultFrameMax, 0}})),
391+
ok = gen_tcp:send(S, rabbit_stream_core:frame({request, 3, {open, <<"/">>}})),
392+
{{response, 3, {open, _, _ConnectionProperties}}, C5} = receive_stream_commands(S, C4),
393+
394+
SubscriptionId = 99,
395+
SubCmd = {request, 1, {subscribe, SubscriptionId, Q, 0, 10, #{}}},
396+
SubscribeFrame = rabbit_stream_core:frame(SubCmd),
397+
ok = gen_tcp:send(S, SubscribeFrame),
398+
{{response, 1, {subscribe, _}}, C6} = receive_stream_commands(S, C5),
399+
400+
{{deliver, SubscriptionId, Chunk}, _C7} = receive_stream_commands(S, C6),
401+
<<5:4/unsigned,
402+
0:4/unsigned,
403+
0:8,
404+
1:16,
405+
1:32,
406+
_Timestamp:64,
407+
_Epoch:64,
408+
_COffset:64,
409+
_Crc:32,
410+
_DataLength:32,
411+
_TrailerLength:32,
412+
_ReservedBytes:32,
413+
0:1,
414+
BodySize:31/unsigned,
415+
Sections0:BodySize/binary>> = Chunk,
416+
Sections = amqp10_framing:decode_bin(Sections0),
417+
418+
ct:pal("Stream client received AMQP 1.0 sections:~n~p", [Sections]),
419+
420+
U = undefined,
421+
FakeTransfer = {'v1_0.transfer', U, U, U, U, U, U, U, U, U, U, U},
422+
Msg = amqp10_msg:from_amqp_records([FakeTransfer | Sections]),
423+
424+
?assert(amqp10_msg:header(durable, Msg)),
425+
?assertEqual(#{<<"x-exchange">> => <<"amq.topic">>,
426+
<<"x-routing-key">> => <<"my.topic">>,
427+
<<"x-key">> => <<"val">>},
428+
amqp10_msg:message_annotations(Msg)),
429+
?assertEqual(#{correlation_id => Correlation,
430+
content_type => ContentType,
431+
%% We expect that reply_to contains a valid address,
432+
%% and that the topic format got translated from MQTT to AMQP 0.9.1.
433+
reply_to => <<"/topic/response.topic">>},
434+
amqp10_msg:properties(Msg)),
435+
?assertEqual(#{<<"rabbit🐇"/utf8>> => <<"carrot🥕"/utf8>>,
436+
<<"key">> => <<"val">>},
437+
amqp10_msg:application_properties(Msg)),
438+
%% We excpet the body to be a single AMQP 1.0 value section where the value is a string
439+
%% because we set the MQTT 5.0 Payload-Format-Indicator.
440+
?assertEqual({'v1_0.amqp_value', {utf8, Payload}},
441+
amqp10_msg:body(Msg)).
331442

332443
%% -------------------------------------------------------------------
333444
%% Helpers
@@ -336,6 +447,27 @@ stream(_Config) ->
336447
delete_queues() ->
337448
[{ok, 0} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) || Q <- rabbit_amqqueue:list()].
338449

450+
receive_stream_commands(Sock, C0) ->
451+
case rabbit_stream_core:next_command(C0) of
452+
empty ->
453+
case gen_tcp:recv(Sock, 0, 5000) of
454+
{ok, Data} ->
455+
C1 = rabbit_stream_core:incoming_data(Data, C0),
456+
case rabbit_stream_core:next_command(C1) of
457+
empty ->
458+
{ok, Data2} = gen_tcp:recv(Sock, 0, 5000),
459+
rabbit_stream_core:next_command(
460+
rabbit_stream_core:incoming_data(Data2, C1));
461+
Res ->
462+
Res
463+
end;
464+
{error, Err} ->
465+
ct:fail("error receiving stream data ~w", [Err])
466+
end;
467+
Res ->
468+
Res
469+
end.
470+
339471
%% -------------------------------------------------------------------
340472
%% STOMP client BEGIN
341473
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)