Skip to content

Commit cb0eda2

Browse files
ansddeadtrickster
authored andcommitted
Convert MQTT 5.0 User Property
* to AMQP 0.9.1 headers * to AMQP 1.0 application properties and message annotations * from AMQP 0.9.1 headers TODO: * from AMQP 1.0 application properties and message annotations
1 parent bbf51e7 commit cb0eda2

File tree

1 file changed

+176
-42
lines changed

1 file changed

+176
-42
lines changed

deps/rabbitmq_mqtt/src/mc_mqtt.erl

Lines changed: 176 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,8 @@ init(Msg = #mqtt_msg{qos = Qos,
4848
end,
4949
{Msg, Anns}.
5050

51-
init_amqp(Sections)
52-
when is_list(Sections) ->
53-
{Header, MsgAnns, AmqpProps, PayloadRev} =
51+
convert_from(mc_amqp, Sections) ->
52+
{Header, _MsgAnns, AmqpProps, PayloadRev} =
5453
lists:foldl(
5554
fun(#'v1_0.header'{} = S, Acc) ->
5655
setelement(1, Acc, S);
@@ -78,37 +77,114 @@ init_amqp(Sections)
7877
_ ->
7978
?QOS_0
8079
end,
81-
Props0 = case MsgAnns of
82-
#'v1_0.message_annotations'{
83-
content = #{{symbol, <<"x-opt-reply-to-topic">>} := {utf8, Topic}}} ->
84-
#{'Response-Topic' => rabbit_mqtt_util:amqp_to_mqtt(Topic)};
85-
_ ->
86-
#{}
87-
end,
88-
Props1 = case AmqpProps of
80+
%% TODO convert #'v1_0.properties'{reply_to} to Response-Topic
81+
Props0 = case AmqpProps of
8982
#'v1_0.properties'{correlation_id = {_Type, _Val} = Corr} ->
90-
Props0#{'Correlation-Data' => correlation_id(Corr)};
83+
#{'Correlation-Data' => correlation_id(Corr)};
9184
_ ->
92-
Props0
85+
#{}
9386
end,
9487
Props = case AmqpProps of
9588
#'v1_0.properties'{content_type = {symbol, ContentType}} ->
96-
Props1#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType)};
89+
Props0#{'Content-Type' => rabbit_data_coercion:to_binary(ContentType)};
9790
_ ->
98-
Props1
91+
Props0
9992
end,
10093
#mqtt_msg{retain = false,
10194
qos = Qos,
10295
dup = false,
10396
props = Props,
104-
payload = lists:reverse(PayloadRev)}.
97+
payload = Payload};
98+
convert_from(mc_amqpl, #content{properties = PBasic,
99+
payload_fragments_rev = Payload}) ->
100+
#'P_basic'{expiration = Expiration,
101+
delivery_mode = DelMode,
102+
headers = H0,
103+
correlation_id = CorrId,
104+
content_type = ContentType} = PBasic,
105+
Qos = case DelMode of
106+
2 -> ?QOS_1;
107+
_ -> ?QOS_0
108+
end,
109+
P0 = case is_binary(ContentType) of
110+
true -> #{'Content-Type' => ContentType};
111+
false -> #{}
112+
end,
113+
H1 = case H0 of
114+
undefined -> [];
115+
_ -> H0
116+
end,
117+
{P1, H3} = case lists:keytake(<<"x-reply-to-topic">>, 1, H1) of
118+
{value, {_, longstr, Topic}, H2} ->
119+
{P0#{'Response-Topic' => rabbit_mqtt_util:amqp_to_mqtt(Topic)}, H2};
120+
false ->
121+
{P0, H1}
122+
end,
123+
{P2, H} = case is_binary(CorrId) of
124+
true ->
125+
{P1#{'Correlation-Data' => CorrId}, H3};
126+
false ->
127+
case lists:keytake(<<"x-correlation-id">>, 1, H3) of
128+
{value, {_, longstr, Corr}, H4} ->
129+
{P1#{'Correlation-Data' => Corr}, H4};
130+
false ->
131+
{P1, H3}
132+
end
133+
end,
134+
P3 = case amqpl_header_to_user_property(H) of
135+
[] ->
136+
P2;
137+
UserProperty ->
138+
P2#{'User-Property' => UserProperty}
139+
end,
140+
P = case is_binary(Expiration) of
141+
true ->
142+
Millis = binary_to_integer(Expiration),
143+
P3#{'Message-Expiry-Interval' => Millis div 1000};
144+
false ->
145+
P3
146+
end,
147+
#mqtt_msg{retain = false,
148+
qos = Qos,
149+
dup = false,
150+
payload = lists:reverse(Payload),
151+
props = P};
152+
convert_from(_SourceProto, _) ->
153+
not_implemented.
105154

106155
convert(?MODULE, Msg) ->
107156
Msg;
108-
convert(mc_amqp, #mqtt_msg{qos = Qos,
109-
props = Props,
110-
payload = Payload}) ->
111-
Header = #'v1_0.header'{durable = Qos > 0},
157+
convert_to(mc_amqp, #mqtt_msg{qos = Qos,
158+
props = Props,
159+
payload = Payload}) ->
160+
S0 = [#'v1_0.data'{content = Payload}],
161+
162+
%% x- prefixed MQTT User Properties go into Message Annotations.
163+
%% All other MQTT User Properties go into Application Properties.
164+
%% MQTT User Property allows duplicate keys, while AMQP maps don't.
165+
%% Order is semantically important in both MQTT User Property and AMQP maps.
166+
%% Therefore, we must dedup the keys and must maintain order.
167+
{MsgAnns, AppProps} =
168+
case Props of
169+
#{'User-Property' := UserProps} ->
170+
{MsgAnnsRev, AppPropsRev, _} =
171+
lists:foldl(fun({Name, _}, Acc = {_, _, M})
172+
when is_map_key(Name, M) ->
173+
Acc;
174+
({<<"x-", _/binary>> = Name, Val}, {MAnns, AProps, M}) ->
175+
{[{{utf8, Name}, {utf8, Val}} | MAnns], AProps, M#{Name => true}};
176+
({Name, Val}, {MAnns, AProps, M}) ->
177+
{MAnns, [{{utf8, Name}, {utf8, Val}} | AProps], M#{Name => true}}
178+
end, {[], [], #{}}, UserProps),
179+
{lists:reverse(MsgAnnsRev), lists:reverse(AppPropsRev)};
180+
_ ->
181+
{[], []}
182+
end,
183+
S1 = case AppProps of
184+
[] -> S0;
185+
_ -> [#'v1_0.application_properties'{content = AppProps} | S0]
186+
end,
187+
112188
ContentType = case Props of
113189
#{'Content-Type' := ContType} ->
114190
%%TODO MQTT Content Type is UTF-8 whereas
@@ -128,22 +204,29 @@ convert(mc_amqp, #mqtt_msg{qos = Qos,
128204
_ ->
129205
undefined
130206
end,
131-
AmqpProps = #'v1_0.properties'{content_type = ContentType,
132-
correlation_id = CorrId},
133-
AppData = #'v1_0.data'{content = [Payload]},
134-
Sections = case Props of
135-
#{'Response-Topic' := Topic} ->
136-
MsgAnns = #'v1_0.message_annotations'{
137-
content = [{{symbol, <<"x-opt-reply-to-topic">>},
138-
{utf8, rabbit_mqtt_util:mqtt_to_amqp(Topic)}}]},
139-
[Header, MsgAnns, AmqpProps, AppData];
140-
_ ->
141-
[Header, AmqpProps, AppData]
142-
end,
143-
mc_amqp:init_amqp(Sections);
144-
convert(mc_amqpl, #mqtt_msg{qos = Qos,
145-
props = Props,
146-
payload = Payload}) ->
207+
%% TODO Translate MQTT Response-Topic to AMQP topic.
208+
%% If operator did not mofidy mqtt.exchange, set reply-to address to "/topic/" RK.
209+
%% If operator modified mqtt.exchange, set reply-to address to "/exchange/" X "/" RK.
210+
% case Props of
211+
% #{'Response-Topic' := Topic} ->
212+
% rabbit_mqtt_util:mqtt_to_amqp(Topic)
213+
S2 = case {ContentType, CorrId} of
214+
{undefined, undefined} ->
215+
S1;
216+
_ ->
217+
[#'v1_0.properties'{content_type = ContentType,
218+
correlation_id = CorrId} | S1]
219+
end,
220+
221+
S3 = case MsgAnns of
222+
[] -> S2;
223+
_ -> [#'v1_0.message_annotations'{content = MsgAnns} | S2]
224+
end,
225+
S = [#'v1_0.header'{durable = Qos > 0} | S3],
226+
mc_amqp:convert_from(mc_amqp, S);
227+
convert_to(mc_amqpl, #mqtt_msg{qos = Qos,
228+
props = Props,
229+
payload = Payload}) ->
147230
DelMode = case Qos of
148231
?QOS_0 -> 1;
149232
?QOS_1 -> 2
@@ -153,28 +236,42 @@ convert(mc_amqpl, #mqtt_msg{qos = Qos,
153236
_ -> undefined
154237
end,
155238
Hs0 = case Props of
156-
#{'Response-Topic' := Topic} ->
157-
[{<<"x-opt-reply-to-topic">>, longstr, rabbit_mqtt_util:mqtt_to_amqp(Topic)}];
239+
#{'User-Property' := UserProperty} ->
240+
lists:map(fun({Name, Value}) ->
241+
{Name, longstr, Value}
242+
end, UserProperty);
158243
_ ->
159244
[]
160245
end,
161-
{CorrId, Hs} = case Props of
246+
Hs1 = case Props of
247+
#{'Response-Topic' := Topic} ->
248+
[{<<"x-reply-to-topic">>, longstr, rabbit_mqtt_util:mqtt_to_amqp(Topic)} | Hs0];
249+
_ ->
250+
Hs0
251+
end,
252+
{CorrId, Hs2} = case Props of
162253
#{'Correlation-Data' := Corr} ->
163254
case mc_util:is_valid_shortstr(Corr) of
164255
true ->
165-
{Corr, Hs0};
256+
{Corr, Hs1};
166257
false ->
167-
{undefined, [{<<"x-correlation-id">>, longstr, Corr} | Hs0]}
258+
{undefined, [{<<"x-correlation-id">>, longstr, Corr} | Hs1]}
168259
end;
169260
_ ->
170-
{undefined, Hs0}
261+
{undefined, Hs1}
171262
end,
172263
Expiration = case Props of
173264
#{'Message-Expiry-Interval' := Seconds} ->
174265
integer_to_binary(timer:seconds(Seconds));
175266
_ ->
176267
undefined
177268
end,
269+
%% "Duplicate fields are illegal." [4.2.5.5 Field Tables]
270+
%% RabbitMQ sorts field tables by keys.
271+
Hs = lists:usort(fun({Key1, _Type1, _Val1},
272+
{Key2, _Type2, _Val2}) ->
273+
Key1 =< Key2
274+
end, Hs2),
178275
BP = #'P_basic'{content_type = ContentType,
179276
headers = Hs,
180277
delivery_mode = DelMode,
@@ -241,3 +338,40 @@ correlation_id({uuid, UUID}) ->
241338
mc_util:uuid_to_string(UUID);
242339
correlation_id({_T, Corr}) ->
243340
rabbit_data_coercion:to_binary(Corr).
341+
342+
%% Translates AMQP 0.9.1 headers to MQTT 5.0 User Properties if
343+
%% the value is convertible to a UTF-8 String.
344+
-spec amqpl_header_to_user_property(rabbit_framing:amqp_table()) ->
345+
user_property().
346+
amqpl_header_to_user_property(Table) ->
347+
lists:filtermap(fun amqpl_field_to_string_pair/1, Table).
348+
349+
amqpl_field_to_string_pair({K, longstr, V}) ->
350+
case mc_util:is_utf8_no_null(V) of
351+
true -> {true, {K, V}};
352+
false -> false
353+
end;
354+
amqpl_field_to_string_pair({K, T, V})
355+
when T =:= byte;
356+
T =:= unsignedbyte;
357+
T =:= short;
358+
T =:= unsignedshort;
359+
T =:= signedint;
360+
T =:= unsignedint;
361+
T =:= long;
362+
T =:= timestamp ->
363+
{true, {K, integer_to_binary(V)}};
364+
amqpl_field_to_string_pair({K, T, V})
365+
when T =:= float;
366+
T =:= double ->
367+
{true, {K, float_to_binary(V)}};
368+
amqpl_field_to_string_pair({K, void, _V}) ->
369+
{true, {K, <<>>}};
370+
amqpl_field_to_string_pair({K, bool, V}) ->
371+
{true, {K, atom_to_binary(V)}};
372+
amqpl_field_to_string_pair({_K, T, _V})
373+
when T =:= array;
374+
T =:= table;
375+
%% Raw binary data is not UTF-8 encoded.
376+
T =:= binary ->
377+
false.

0 commit comments

Comments
 (0)