@@ -407,8 +407,8 @@ impl StreamingSyncIteration {
407
407
SyncStateMachineTransition :: Empty
408
408
} else {
409
409
// Periodically check whether any subscriptions that are part of this stream
410
- // have become expired. We currently do this by re-creating the request and
411
- // aborting the iteration if it has changed.
410
+ // are expired. We currently do this by re-creating the request and aborting the
411
+ // iteration if it has changed.
412
412
let updated_request = self
413
413
. adapter
414
414
. collect_subscription_requests ( self . options . include_defaults ) ?;
@@ -600,6 +600,15 @@ impl StreamingSyncIteration {
600
600
Ok ( progress)
601
601
}
602
602
603
+ /// Reconciles local stream subscriptions with service-side state received in a checkpoint.
604
+ ///
605
+ /// This involves:
606
+ ///
607
+ /// 1. Marking local streams that don't exist in the checkpoint as inactive or deleting them.
608
+ /// 2. Creating new subscriptions for auto-subscribed streams we weren't tracking before.
609
+ /// 3. Associating buckets in the checkpoint with the stream subscriptions that created them.
610
+ /// 4. Reporting errors for stream subscriptions that are marked as errorenous in the
611
+ /// checkpoint.
603
612
fn resolve_subscription_state (
604
613
& self ,
605
614
tracked : & TrackedCheckpoint ,
@@ -628,7 +637,6 @@ impl StreamingSyncIteration {
628
637
} ) ;
629
638
} ) ?;
630
639
631
- // If they don't exist already, create default subscriptions included in checkpoint
632
640
for ( server_index, subscription) in tracked. streams . iter ( ) . enumerate ( ) {
633
641
let matching_local_subscriptions = tracked_subscriptions
634
642
. iter_mut ( )
@@ -640,57 +648,56 @@ impl StreamingSyncIteration {
640
648
local. local . active = true ;
641
649
local. local . is_default = subscription. is_default ;
642
650
has_local = true ;
651
+ }
643
652
644
- // Warn if this local subscription has errors. This search is quadratic because we
645
- // only get the index of the faulty subscription with an error, which we then map
646
- // to the local id via `tracked.requested_subscription_ids` and finally compare with
647
- // one of the matching subscription here. That's fine though because we don't expect
648
- // most of the streams to error in practice.
649
- for error in & * subscription. errors {
650
- if let StreamSubscriptionErrorCause :: ExplicitSubscription ( index) =
651
- error. subscription
652
- {
653
+ for error in & * subscription. errors {
654
+ match error. subscription {
655
+ StreamSubscriptionErrorCause :: Default => {
656
+ event. instructions . push ( Instruction :: LogLine {
657
+ severity : LogSeverity :: WARNING ,
658
+ line : Cow :: Owned ( format ! (
659
+ "Default subscription {} has errors: {}" ,
660
+ subscription. name, error. message
661
+ ) ) ,
662
+ } ) ;
663
+ }
664
+ StreamSubscriptionErrorCause :: ExplicitSubscription ( index) => {
653
665
let Some ( local_id_for_error) =
654
666
tracked. requested_subscriptions . subscription_ids . get ( index)
655
667
else {
656
668
continue ;
657
669
} ;
658
670
659
- if * local_id_for_error == local. local . id {
660
- let mut desc = String :: new ( ) ;
661
- let _ = write ! (
662
- & mut desc,
663
- "Subscription to stream {} " ,
664
- local. local. stream_name
665
- ) ;
666
- if let Some ( params) = & local. local . local_params {
667
- let _ = write ! ( & mut desc, "(with parameters {params})" ) ;
668
- } else {
669
- desc. push_str ( "(without parameters)" ) ;
671
+ // Find the matching explicit subscription to contextualize this error
672
+ // message with the name of the stream and parameters used for the
673
+ // subscription.
674
+ for local in & tracked_subscriptions {
675
+ if * local_id_for_error == local. local . id {
676
+ let mut desc = String :: new ( ) ;
677
+ let _ = write ! (
678
+ & mut desc,
679
+ "Subscription to stream {} " ,
680
+ local. local. stream_name
681
+ ) ;
682
+ if let Some ( params) = & local. local . local_params {
683
+ let _ = write ! ( & mut desc, "(with parameters {params})" ) ;
684
+ } else {
685
+ desc. push_str ( "(without parameters)" ) ;
686
+ }
687
+
688
+ let _ =
689
+ write ! ( & mut desc, " could not be resolved: {}" , error. message) ;
690
+ event. instructions . push ( Instruction :: LogLine {
691
+ severity : LogSeverity :: WARNING ,
692
+ line : Cow :: Owned ( desc) ,
693
+ } ) ;
670
694
}
671
-
672
- let _ = write ! ( & mut desc, " could not be resolved: {}" , error. message) ;
673
- event. instructions . push ( Instruction :: LogLine {
674
- severity : LogSeverity :: WARNING ,
675
- line : Cow :: Owned ( desc) ,
676
- } ) ;
677
695
}
678
696
}
679
- }
680
- }
681
-
682
- for error in & * subscription. errors {
683
- if let StreamSubscriptionErrorCause :: Default = error. subscription {
684
- event. instructions . push ( Instruction :: LogLine {
685
- severity : LogSeverity :: WARNING ,
686
- line : Cow :: Owned ( format ! (
687
- "Default subscription {} has errors: {}" ,
688
- subscription. name, error. message
689
- ) ) ,
690
- } ) ;
691
- }
697
+ } ;
692
698
}
693
699
700
+ // If they don't exist already, create default subscriptions included in checkpoint
694
701
if !has_local && subscription. is_default {
695
702
let local = self . adapter . create_default_subscription ( subscription) ?;
696
703
tracked_subscriptions. push ( LocalAndServerSubscription {
0 commit comments