@@ -174,12 +174,12 @@ create_stream(Q0) ->
174
174
replica_nodes => Followers }),
175
175
Q1 = amqqueue :set_type_state (Q0 , Conf ),
176
176
case rabbit_amqqueue :internal_declare (Q1 , false ) of
177
- {created , Q } ->
178
- case rabbit_stream_coordinator :new_stream (Q , Leader ) of
177
+ {created , Q2 } ->
178
+ case rabbit_stream_coordinator :new_stream (Q2 , Leader ) of
179
179
{ok , {ok , LeaderPid }, _ } ->
180
180
% % update record with leader pid
181
- case set_leader_pid (LeaderPid , amqqueue : get_name ( Q ) ) of
182
- ok ->
181
+ case set_leader_pid (LeaderPid , QName ) of
182
+ { ok , Q } ->
183
183
rabbit_event :notify (queue_created ,
184
184
[{name , QName },
185
185
{durable , true },
@@ -196,7 +196,7 @@ create_stream(Q0) ->
196
196
[rabbit_misc :rs (QName ), node ()]}
197
197
end ;
198
198
Error ->
199
- _ = rabbit_amqqueue :internal_delete (Q , ActingUser ),
199
+ _ = rabbit_amqqueue :internal_delete (Q2 , ActingUser ),
200
200
{protocol_error , internal_error , " Cannot declare ~ts on node '~ts ': ~255p " ,
201
201
[rabbit_misc :rs (QName ), node (), Error ]}
202
202
end ;
@@ -1346,11 +1346,6 @@ resend_all(#stream_client{leader = LeaderPid,
1346
1346
end || {Seq , {_Corr , Msg }} <- Msgs ],
1347
1347
State .
1348
1348
1349
- -spec set_leader_pid (Pid , QName ) -> Ret when
1350
- Pid :: pid (),
1351
- QName :: rabbit_amqqueue :name (),
1352
- Ret :: ok | {error , timeout }.
1353
-
1354
1349
set_leader_pid (Pid , QName ) ->
1355
1350
% % TODO this should probably be a single khepri transaction for better performance.
1356
1351
Fun = fun (Q ) ->
@@ -1359,10 +1354,16 @@ set_leader_pid(Pid, QName) ->
1359
1354
case rabbit_amqqueue :update (QName , Fun ) of
1360
1355
not_found ->
1361
1356
% % This can happen during recovery
1362
- {ok , Q } = rabbit_amqqueue :lookup_durable_queue (QName ),
1363
- rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Fun (Q ));
1364
- _ ->
1365
- ok
1357
+ {ok , Q1 } = rabbit_amqqueue :lookup_durable_queue (QName ),
1358
+ Q = Fun (Q1 ),
1359
+ case rabbit_amqqueue :ensure_rabbit_queue_record_is_initialized (Q ) of
1360
+ ok ->
1361
+ {ok , Q };
1362
+ Err ->
1363
+ Err
1364
+ end ;
1365
+ Q ->
1366
+ {ok , Q }
1366
1367
end .
1367
1368
1368
1369
close_log (undefined ) -> ok ;
0 commit comments