Skip to content

Commit 1c7285c

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
fix TryChunksStreamError usage (#24256)
using `try_chunks` directly loses all context on errors. i tried to create a new trait that has a better `try_chunks` implementation, but the types were annoying so i decided to put `map_err` everywhere (for now, to fix the bleeding). added a regression test. GitOrigin-RevId: e1723a714d34b7687b594e476bd47af545d54948
1 parent 3e3302b commit 1c7285c

File tree

3 files changed

+43
-6
lines changed

3 files changed

+43
-6
lines changed

crates/common/src/persistence_helpers.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ pub async fn stream_revision_pairs<'a>(
4949
documents: impl Stream<Item = RevisionStreamEntry> + 'a,
5050
reader: &'a RepeatablePersistence,
5151
) {
52-
let documents = documents.try_chunks(*DOCUMENTS_IN_MEMORY);
52+
let documents = documents.try_chunks(*DOCUMENTS_IN_MEMORY).map_err(|e| e.1);
5353
futures::pin_mut!(documents);
5454

5555
while let Some(read_chunk) = documents.try_next().await? {

crates/database/src/retention.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,6 +520,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
520520
let mut index_entry_chunks = reader
521521
.load_documents(TimestampRange::new(cursor..min_snapshot_ts)?, Order::Asc)
522522
.try_chunks(*RETENTION_READ_CHUNK)
523+
.map_err(|e| e.1)
523524
.map(move |chunk| async move {
524525
let chunk = chunk?.to_vec();
525526
let mut entries_to_delete = vec![];
@@ -638,7 +639,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
638639
all_indexes,
639640
persistence_version,
640641
)
641-
.try_chunks(*RETENTION_DELETE_CHUNK);
642+
.try_chunks(*RETENTION_DELETE_CHUNK)
643+
.map_err(|e| e.1);
642644
pin_mut!(expired_chunks);
643645
while let Some(delete_chunk) = expired_chunks.try_next().await? {
644646
tracing::trace!(
@@ -694,6 +696,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
694696
Arc::new(NoopRetentionValidator),
695697
)
696698
.try_chunks(*RETENTION_READ_CHUNK)
699+
.map_err(|e| e.1)
697700
.map(move |chunk| async move {
698701
let chunk = chunk?.to_vec();
699702
let mut entries_to_delete: Vec<(Timestamp, InternalDocumentId)> = vec![];
@@ -795,7 +798,8 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
795798

796799
tracing::trace!("delete_documents: about to grab chunks");
797800
let expired_chunks = Self::expired_documents(rt, reader, cursor, min_snapshot_ts)
798-
.try_chunks(*RETENTION_DELETE_CHUNK);
801+
.try_chunks(*RETENTION_DELETE_CHUNK)
802+
.map_err(|e| e.1);
799803
pin_mut!(expired_chunks);
800804
while let Some(delete_chunk) = expired_chunks.try_next().await? {
801805
tracing::trace!(
@@ -1521,13 +1525,45 @@ mod tests {
15211525
TableName,
15221526
},
15231527
};
1524-
use futures::TryStreamExt;
1528+
use errors::ErrorMetadataAnyhowExt;
1529+
use futures::{
1530+
pin_mut,
1531+
stream,
1532+
TryStreamExt,
1533+
};
15251534
use maplit::{
15261535
btreemap,
15271536
btreeset,
15281537
};
15291538

15301539
use super::LeaderRetentionManager;
1540+
use crate::retention::{
1541+
snapshot_invalid_error,
1542+
RetentionType,
1543+
};
1544+
1545+
#[convex_macro::test_runtime]
1546+
async fn test_chunks_is_out_of_retention(_rt: TestRuntime) -> anyhow::Result<()> {
1547+
let throws = || -> anyhow::Result<()> {
1548+
anyhow::bail!(snapshot_invalid_error(
1549+
Timestamp::must(1),
1550+
Timestamp::must(30),
1551+
RetentionType::Document
1552+
));
1553+
};
1554+
let stream_throws = stream::once(async move { throws() });
1555+
// IMPORTANT: the map_err is required here and whenever we use try_chunks.
1556+
// Otherwise the error gets re-wrapped and loses context.
1557+
let chunks = stream_throws.try_chunks(1).map_err(|e| e.1);
1558+
let chunk_throws = async move || -> anyhow::Result<()> {
1559+
pin_mut!(chunks);
1560+
chunks.try_next().await?;
1561+
anyhow::Ok(())
1562+
};
1563+
let err = chunk_throws().await.unwrap_err();
1564+
assert!(err.is_out_of_retention());
1565+
Ok(())
1566+
}
15311567

15321568
#[convex_macro::test_runtime]
15331569
async fn test_expired_index_entries(_rt: TestRuntime) -> anyhow::Result<()> {

crates/database/src/table_iteration.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,8 @@ impl<RT: Runtime> TableIterator<RT> {
318318
RepeatablePersistence::new(reader, end_ts, self.retention_validator.clone());
319319
let documents = repeatable_persistence
320320
.load_documents(TimestampRange::new(start_ts.succ()?..=*end_ts)?, Order::Asc)
321-
.try_chunks(self.page_size);
321+
.try_chunks(self.page_size)
322+
.map_err(|e| e.1);
322323
pin_mut!(documents);
323324
while let Some(chunk) = documents.try_next().await? {
324325
while let Err(not_until) = rate_limiter.check() {
@@ -408,7 +409,7 @@ impl<RT: Runtime> TableIterator<RT> {
408409

409410
// Note even though `previous_revisions` can paginate internally, we don't want
410411
// to hold the entire result set in memory, because documents can be large.
411-
let id_chunks = ids.try_chunks(self.page_size);
412+
let id_chunks = ids.try_chunks(self.page_size).map_err(|e| e.1);
412413
pin_mut!(id_chunks);
413414

414415
while let Some(chunk) = id_chunks.try_next().await? {

0 commit comments

Comments
 (0)