@@ -196,12 +196,12 @@ create_stream(Q0) ->
196
196
replica_nodes => Followers }),
197
197
Q1 = amqqueue :set_type_state (Q0 , Conf ),
198
198
case rabbit_amqqueue :internal_declare (Q1 , false ) of
199
- {created , Q } ->
200
- case rabbit_stream_coordinator :new_stream (Q , Leader ) of
199
+ {created , Q2 } ->
200
+ case rabbit_stream_coordinator :new_stream (Q2 , Leader ) of
201
201
{ok , {ok , LeaderPid }, _ } ->
202
202
% % update record with leader pid
203
- case set_leader_pid (LeaderPid , amqqueue : get_name ( Q ) ) of
204
- ok ->
203
+ case set_leader_pid (LeaderPid , QName ) of
204
+ { ok , Q } ->
205
205
rabbit_event :notify (queue_created ,
206
206
[{name , QName },
207
207
{durable , true },
@@ -218,7 +218,7 @@ create_stream(Q0) ->
218
218
[rabbit_misc :rs (QName ), node ()]}
219
219
end ;
220
220
Error ->
221
- _ = rabbit_amqqueue :internal_delete (Q , ActingUser ),
221
+ _ = rabbit_amqqueue :internal_delete (Q2 , ActingUser ),
222
222
{protocol_error , internal_error , " Cannot declare ~ts on node '~ts ': ~255p " ,
223
223
[rabbit_misc :rs (QName ), node (), Error ]}
224
224
end ;
@@ -1396,11 +1396,6 @@ resend_all(#stream_client{leader = LeaderPid,
1396
1396
end || {Seq , {_Corr , Msg }} <- Msgs ],
1397
1397
State .
1398
1398
1399
- - spec set_leader_pid (Pid , QName ) -> Ret when
1400
- Pid :: pid (),
1401
- QName :: rabbit_amqqueue :name (),
1402
- Ret :: ok | {error , timeout }.
1403
-
1404
1399
set_leader_pid (Pid , QName ) ->
1405
1400
% % TODO this should probably be a single khepri transaction for better performance.
1406
1401
Fun = fun (Q ) ->
@@ -1409,10 +1404,16 @@ set_leader_pid(Pid, QName) ->
1409
1404
case rabbit_amqqueue :update (QName , Fun ) of
1410
1405
not_found ->
1411
1406
% % This can happen during recovery
1412
- {ok , Q } = rabbit_amqqueue :lookup_durable_queue (QName ),
1413
- rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Fun (Q ));
1414
- _ ->
1415
- ok
1407
+ {ok , Q1 } = rabbit_amqqueue :lookup_durable_queue (QName ),
1408
+ Q = Fun (Q1 ),
1409
+ case rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Q ) of
1410
+ ok ->
1411
+ {ok , Q };
1412
+ Err ->
1413
+ Err
1414
+ end ;
1415
+ Q ->
1416
+ {ok , Q }
1416
1417
end .
1417
1418
1418
1419
close_log (undefined ) -> ok ;
0 commit comments