Skip to content

Commit 56756cc

Browse files
goffrieConvex, Inc.
authored and
Convex, Inc.
committed
Move some callers to PersistenceReader::load_documents_from_table (#36354)
GitOrigin-RevId: 35cf2f8cf142d6ab231a84d6a8ac331c58420ce0
1 parent a2932e9 commit 56756cc

File tree

3 files changed

+30
-19
lines changed

3 files changed

+30
-19
lines changed

crates/common/src/persistence.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -529,12 +529,6 @@ impl RepeatablePersistence {
529529
self.upper_bound
530530
}
531531

532-
/// Same as [`Persistence::load_all_documents`] but only including documents
533-
/// in the snapshot range.
534-
pub fn load_all_documents(&self, order: Order) -> DocumentStream<'_> {
535-
self.load_documents(TimestampRange::snapshot(*self.upper_bound), order)
536-
}
537-
538532
/// Same as [`Persistence::load_documents`] but only including documents in
539533
/// the snapshot range.
540534
pub fn load_documents(&self, range: TimestampRange, order: Order) -> DocumentStream<'_> {
@@ -547,6 +541,24 @@ impl RepeatablePersistence {
547541
Box::pin(stream.try_filter(|entry| future::ready(entry.ts <= *self.upper_bound)))
548542
}
549543

544+
/// Same as [`Persistence::load_documents_from_table`] but only including
545+
/// documents in the snapshot range.
546+
pub fn load_documents_from_table(
547+
&self,
548+
tablet_id: TabletId,
549+
range: TimestampRange,
550+
order: Order,
551+
) -> DocumentStream<'_> {
552+
let stream = self.reader.load_documents_from_table(
553+
tablet_id,
554+
range,
555+
order,
556+
*DEFAULT_DOCUMENTS_PAGE_SIZE,
557+
self.retention_validator.clone(),
558+
);
559+
Box::pin(stream.try_filter(|entry| future::ready(entry.ts <= *self.upper_bound)))
560+
}
561+
550562
/// Same as `load_documents` but doesn't use the `RetentionValidator` from
551563
/// this `RepeatablePersistence`. Instead, the caller can choose its
552564
/// own validator.

crates/database/src/index_worker.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@ use common::{
7676
},
7777
};
7878
use futures::{
79-
future,
8079
pin_mut,
8180
stream::FusedStream,
8281
Future,
@@ -92,7 +91,6 @@ use tokio::sync::mpsc;
9291
use tokio_stream::wrappers::ReceiverStream;
9392
use value::{
9493
DeveloperDocumentId,
95-
InternalDocumentId,
9694
TableNamespace,
9795
};
9896

@@ -195,11 +193,11 @@ impl IndexSelector {
195193
tables.into_iter()
196194
}
197195

198-
fn filter_id(&self, id: InternalDocumentId) -> bool {
196+
fn tablet_id(&self) -> Option<TabletId> {
199197
match self {
200-
Self::All(_) => true,
201-
Self::Index { name, .. } => *name.table() == id.table(),
202-
Self::ManyIndexes { tablet_id, .. } => *tablet_id == id.table(),
198+
Self::All(_) => None,
199+
Self::Index { name, .. } => Some(*name.table()),
200+
Self::ManyIndexes { tablet_id, .. } => Some(*tablet_id),
203201
}
204202
}
205203
}
@@ -940,9 +938,11 @@ impl<RT: Runtime> IndexWriter<RT> {
940938
order: Order,
941939
index_selector: &'a IndexSelector,
942940
) -> impl Stream<Item = anyhow::Result<RevisionPair>> + 'a {
943-
let document_stream = reader
944-
.load_documents(range, order)
945-
.try_filter(|entry| future::ready(index_selector.filter_id(entry.id)));
941+
let document_stream = if let Some(tablet_id) = index_selector.tablet_id() {
942+
reader.load_documents_from_table(tablet_id, range, order)
943+
} else {
944+
reader.load_documents(range, order)
945+
};
946946
stream_revision_pairs(document_stream, reader)
947947
}
948948

crates/database/src/retention.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1432,9 +1432,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
14321432
let Some(doc) = maybe_doc else {
14331433
return Ok(());
14341434
};
1435-
if doc.id().tablet_id != index_tablet_id {
1436-
return Ok(());
1437-
}
1435+
anyhow::ensure!(doc.id().tablet_id == index_tablet_id);
14381436
let index_id = doc.id().internal_id();
14391437
let index: ParsedDocument<IndexMetadata<TabletId>> = doc.parse()?;
14401438
let index = index.into_value();
@@ -1471,7 +1469,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
14711469
retention_validator: Arc<dyn RetentionValidator>,
14721470
) -> anyhow::Result<()> {
14731471
let reader = persistence.reader();
1474-
let mut document_stream = reader.load_documents(
1472+
let mut document_stream = reader.load_documents_from_table(
1473+
index_table_id,
14751474
TimestampRange::new(**cursor..*latest_ts)?,
14761475
Order::Asc,
14771476
*DEFAULT_DOCUMENTS_PAGE_SIZE,

0 commit comments

Comments
 (0)