Skip to content

Commit 477390e

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
forbid bare TryStreamExt::try_chunks (#24261)
avoid issues with losing error context by forbidding bare `try_chunks` and requiring `TryChunksExt::try_chunks2` instead. better name? i don't like using `try_chunks` because then you have to qualify it with `TryChunksExt::try_chunks(stream, cap)` whenever it's used in a file that also imports `TryStreamExt`. GitOrigin-RevId: f1e758e21301c6affdb0a27868254cabe1de7a37
1 parent 2400425 commit 477390e

File tree

5 files changed

+31
-15
lines changed

5 files changed

+31
-15
lines changed

crates/common/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ pub mod types;
7070
pub mod utils;
7171
pub use value;
7272
pub mod bounded_thread_pool;
73+
pub mod try_chunks;
7374
pub mod version;
7475
pub mod ws;
7576

crates/common/src/persistence_helpers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::{
1010
document::ResolvedDocument,
1111
knobs::DOCUMENTS_IN_MEMORY,
1212
persistence::RepeatablePersistence,
13+
try_chunks::TryChunksExt,
1314
types::Timestamp,
1415
};
1516

@@ -49,7 +50,7 @@ pub async fn stream_revision_pairs<'a>(
4950
documents: impl Stream<Item = RevisionStreamEntry> + 'a,
5051
reader: &'a RepeatablePersistence,
5152
) {
52-
let documents = documents.try_chunks(*DOCUMENTS_IN_MEMORY).map_err(|e| e.1);
53+
let documents = documents.try_chunks2(*DOCUMENTS_IN_MEMORY);
5354
futures::pin_mut!(documents);
5455

5556
while let Some(read_chunk) = documents.try_next().await? {

crates/common/src/try_chunks.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use futures::{
2+
Stream,
3+
TryStreamExt,
4+
};
5+
6+
/// Use this instead of `try_chunks` to pass through errors directly.
7+
/// TryStreamExt::try_chunks wraps errors which can lose stacktrace and context.
8+
pub trait TryChunksExt<T, E> {
9+
fn try_chunks2(self, cap: usize) -> impl Stream<Item = Result<Vec<T>, E>>;
10+
}
11+
12+
impl<T, E, S: Stream<Item = Result<T, E>>> TryChunksExt<T, E> for S {
13+
fn try_chunks2(self, cap: usize) -> impl Stream<Item = Result<Vec<T>, E>> {
14+
#[allow(clippy::disallowed_methods)]
15+
TryStreamExt::try_chunks(self, cap).map_err(|e| e.1)
16+
}
17+
}

crates/database/src/retention.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ use common::{
8282
Reader,
8383
Writer,
8484
},
85+
try_chunks::TryChunksExt,
8586
types::{
8687
GenericIndexName,
8788
IndexId,
@@ -533,8 +534,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
533534
let reader_ = &reader;
534535
let mut index_entry_chunks = reader
535536
.load_documents(TimestampRange::new(cursor..min_snapshot_ts)?, Order::Asc)
536-
.try_chunks(*RETENTION_READ_CHUNK)
537-
.map_err(|e| e.1)
537+
.try_chunks2(*RETENTION_READ_CHUNK)
538538
.map(move |chunk| async move {
539539
let chunk = chunk?.to_vec();
540540
let mut entries_to_delete = vec![];
@@ -653,8 +653,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
653653
all_indexes,
654654
persistence_version,
655655
)
656-
.try_chunks(*RETENTION_DELETE_CHUNK)
657-
.map_err(|e| e.1);
656+
.try_chunks2(*RETENTION_DELETE_CHUNK);
658657
pin_mut!(expired_chunks);
659658
while let Some(delete_chunk) = expired_chunks.try_next().await? {
660659
tracing::trace!(
@@ -709,8 +708,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
709708
Order::Asc,
710709
Arc::new(NoopRetentionValidator),
711710
)
712-
.try_chunks(*RETENTION_READ_CHUNK)
713-
.map_err(|e| e.1)
711+
.try_chunks2(*RETENTION_READ_CHUNK)
714712
.map(move |chunk| async move {
715713
let chunk = chunk?.to_vec();
716714
let mut entries_to_delete: Vec<(Timestamp, InternalDocumentId)> = vec![];
@@ -812,8 +810,7 @@ impl<RT: Runtime> LeaderRetentionManager<RT> {
812810

813811
tracing::trace!("delete_documents: about to grab chunks");
814812
let expired_chunks = Self::expired_documents(rt, reader, cursor, min_snapshot_ts)
815-
.try_chunks(*RETENTION_DELETE_CHUNK)
816-
.map_err(|e| e.1);
813+
.try_chunks2(*RETENTION_DELETE_CHUNK);
817814
pin_mut!(expired_chunks);
818815
while let Some(delete_chunk) = expired_chunks.try_next().await? {
819816
tracing::trace!(
@@ -1530,6 +1527,7 @@ mod tests {
15301527
TestIdGenerator,
15311528
TestPersistence,
15321529
},
1530+
try_chunks::TryChunksExt,
15331531
types::{
15341532
unchecked_repeatable_ts,
15351533
DatabaseIndexUpdate,
@@ -1570,9 +1568,8 @@ mod tests {
15701568
));
15711569
};
15721570
let stream_throws = stream::once(async move { throws() });
1573-
// IMPORTANT: the map_err is required here and whenever we use try_chunks.
1574-
// Otherwise the error gets re-wrapped and loses context.
1575-
let chunks = stream_throws.try_chunks(1).map_err(|e| e.1);
1571+
// IMPORTANT: try_chunks fails here. try_chunks2 is necessary.
1572+
let chunks = stream_throws.try_chunks2(1);
15761573
let chunk_throws = async move || -> anyhow::Result<()> {
15771574
pin_mut!(chunks);
15781575
chunks.try_next().await?;

crates/database/src/table_iteration.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use common::{
3535
Runtime,
3636
RuntimeInstant,
3737
},
38+
try_chunks::TryChunksExt,
3839
types::{
3940
IndexId,
4041
RepeatableTimestamp,
@@ -318,8 +319,7 @@ impl<RT: Runtime> TableIterator<RT> {
318319
RepeatablePersistence::new(reader, end_ts, self.retention_validator.clone());
319320
let documents = repeatable_persistence
320321
.load_documents(TimestampRange::new(start_ts.succ()?..=*end_ts)?, Order::Asc)
321-
.try_chunks(self.page_size)
322-
.map_err(|e| e.1);
322+
.try_chunks2(self.page_size);
323323
pin_mut!(documents);
324324
while let Some(chunk) = documents.try_next().await? {
325325
while let Err(not_until) = rate_limiter.check() {
@@ -409,7 +409,7 @@ impl<RT: Runtime> TableIterator<RT> {
409409

410410
// Note even though `previous_revisions` can paginate internally, we don't want
411411
// to hold the entire result set in memory, because documents can be large.
412-
let id_chunks = ids.try_chunks(self.page_size).map_err(|e| e.1);
412+
let id_chunks = ids.try_chunks2(self.page_size);
413413
pin_mut!(id_chunks);
414414

415415
while let Some(chunk) = id_chunks.try_next().await? {

0 commit comments

Comments
 (0)