Skip to content

Commit 3b667fd

Browse files
committed
refactor
1 parent 7038737 commit 3b667fd

File tree

1 file changed

+9
-9
lines changed

1 file changed

+9
-9
lines changed

source/logrepl/combined.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,17 +121,17 @@ func (c *CombinedIterator) NextN(ctx context.Context, n int) ([]opencdc.Record,
121121

122122
records, err := c.activeIterator.NextN(ctx, n)
123123
if err != nil {
124+
if !errors.Is(err, snapshot.ErrIteratorDone) {
125+
return nil, fmt.Errorf("failed to fetch records in batch: %w", err)
126+
}
127+
124128
// Snapshot iterator is done, handover to CDC iterator
125-
if errors.Is(err, snapshot.ErrIteratorDone) {
126-
if err := c.useCDCIterator(ctx); err != nil {
127-
return nil, err
128-
}
129-
sdk.Logger(ctx).Debug().Msg("Snapshot completed, switching to CDC mode")
130-
131-
// Retry with new iterator
132-
return c.NextN(ctx, n)
129+
if err := c.useCDCIterator(ctx); err != nil {
130+
return nil, err
133131
}
134-
return nil, fmt.Errorf("failed to fetch records in batch: %w", err)
132+
133+
sdk.Logger(ctx).Debug().Msg("Snapshot completed, switching to CDC mode")
134+
return c.NextN(ctx, n)
135135
}
136136
return records, nil
137137
}

0 commit comments

Comments
 (0)