Skip to content

Commit 83e9b69

Browse files
jordanhunt22Convex, Inc.
authored and
Convex, Inc.
committed
Add worker that deletes expired entries from the documents log (#23527)
Added a `delete_documents` worker that retrieves entries before the valid retention window and deletes them. The logic looks pretty similar to the index deletion worker. I added rate limiting to prevent from overloading the database. I estimate that This is off by default. I can test this out by turning it on for a single instance and looking at the metrics. For any instance with > 10000 deleted entries, I calculated that it would make ~300 calls to the database: - delete batch size is 10000 - each call to the db in made in chunks of ~100 (128) operations - 100 queries to get `prev_revs` - 100 queries to get documents to delete - 100 calls to delete documents Not quite sure the limitations of the the database, so please let me know what's reasonable. I initially set rate limiting to only deleting 1 batch per minute per instance. ### Safeguards There are 2 knobs associated with this change: - `RETENTION_DOCUMENT_DELETES_ENABLED` allows the deletion logic to run - `DOCUMENT_RETENTION_DRY_RUN` controls whether or not the deletion queries are made to the db *this could be a bit confusing, so let me know* There is an `anyhow::ensure()` that all documents that we delete are out of the retention window. GitOrigin-RevId: 562b152ff7ed3ae9a68c28c140e7c214754bd6ec
1 parent b780c28 commit 83e9b69

File tree

5 files changed

+578
-35
lines changed

5 files changed

+578
-35
lines changed

crates/common/src/knobs.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,10 @@ pub static RETENTION_DELETE_BATCH: LazyLock<usize> =
362362
pub static RETENTION_DELETES_ENABLED: LazyLock<bool> =
363363
LazyLock::new(|| env_config("RETENTION_DELETES_ENABLED", true));
364364

365+
/// Whether retention document deletes are enabled.
366+
pub static RETENTION_DOCUMENT_DELETES_ENABLED: LazyLock<bool> =
367+
LazyLock::new(|| env_config("RETENTION_DOCUMENT_DELETES_ENABLED", false));
368+
365369
/// Enable or disable failing insert/update/deletes when retention is behind.
366370
pub static RETENTION_FAIL_ENABLED: LazyLock<bool> =
367371
LazyLock::new(|| env_config("RETENTION_FAIL_ENABLED", false));
@@ -376,6 +380,18 @@ pub static RETENTION_FAIL_START_MULTIPLIER: LazyLock<usize> =
376380
pub static RETENTION_FAIL_ALL_MULTIPLIER: LazyLock<usize> =
377381
LazyLock::new(|| env_config("RETENTION_FAIL_ALL_MULTIPLIER", 40));
378382

383+
/// Maximum number of batches of documents that can be deleted in a minute
384+
pub static DOCUMENT_RETENTION_BATCHES_PER_MINUTE: LazyLock<NonZeroU32> = LazyLock::new(|| {
385+
env_config(
386+
"DOCUMENT_RETENTION_BATCHES_PER_MINUTE",
387+
NonZeroU32::new(1).unwrap(),
388+
)
389+
});
390+
391+
/// Whether or not we run document retention in dry run mode
392+
pub static DOCUMENT_RETENTION_DRY_RUN: LazyLock<bool> =
393+
LazyLock::new(|| env_config("DOCUMENT_RETENTION_DRY_RUN", true));
394+
379395
/// Size at which a search index will be queued for snapshotting.
380396
pub static SEARCH_INDEX_SIZE_SOFT_LIMIT: LazyLock<usize> =
381397
LazyLock::new(|| env_config("SEARCH_INDEX_SIZE_SOFT_LIMIT", 10 * (1 << 20))); // 10 MiB

crates/common/src/persistence.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,23 @@ impl RepeatablePersistence {
559559
.await
560560
}
561561

562+
/// Allows a retention validator to be explicitly passed in
563+
pub async fn previous_revisions_with_validator(
564+
&self,
565+
ids: BTreeSet<(InternalDocumentId, Timestamp)>,
566+
retention_validator: Arc<dyn RetentionValidator>,
567+
) -> anyhow::Result<
568+
BTreeMap<(InternalDocumentId, Timestamp), (Timestamp, Option<ResolvedDocument>)>,
569+
> {
570+
for (_, ts) in &ids {
571+
// Reading documents <ts, so ts-1 needs to be repeatable.
572+
anyhow::ensure!(*ts <= self.upper_bound.succ()?);
573+
}
574+
self.reader
575+
.previous_revisions(ids, retention_validator.clone())
576+
.await
577+
}
578+
562579
pub fn read_snapshot(&self, at: RepeatableTimestamp) -> anyhow::Result<PersistenceSnapshot> {
563580
anyhow::ensure!(at <= self.upper_bound);
564581
self.retention_validator.optimistic_validate_snapshot(*at)?;

crates/common/src/testing/persistence_test_suite.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1340,7 +1340,7 @@ pub async fn persistence_global<P: Persistence>(p: Arc<P>) -> anyhow::Result<()>
13401340
Ok(())
13411341
}
13421342

1343-
fn doc(
1343+
pub fn doc(
13441344
id: ResolvedDocumentId,
13451345
ts: i32,
13461346
val: Option<i64>,

crates/database/src/metrics.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,14 @@ pub fn retention_delete_timer() -> Timer<VMHistogram> {
373373
Timer::new(&RETENTION_DELETE_SECONDS)
374374
}
375375

376+
register_convex_histogram!(
377+
RETENTION_DELETE_DOCUMENTS_SECONDS,
378+
"Time for retention to complete deletions"
379+
);
380+
pub fn retention_delete_documents_timer() -> Timer<VMHistogram> {
381+
Timer::new(&RETENTION_DELETE_DOCUMENTS_SECONDS)
382+
}
383+
376384
register_convex_histogram!(
377385
RETENTION_DELETE_CHUNK_SECONDS,
378386
"Time for retention to delete one chunk"
@@ -381,11 +389,27 @@ pub fn retention_delete_chunk_timer() -> Timer<VMHistogram> {
381389
Timer::new(&RETENTION_DELETE_CHUNK_SECONDS)
382390
}
383391

392+
register_convex_histogram!(
393+
RETENTION_DELETE_DOCUMENT_CHUNK_SECONDS,
394+
"Time for document retention to delete one chunk"
395+
);
396+
pub fn retention_delete_document_chunk_timer() -> Timer<VMHistogram> {
397+
Timer::new(&RETENTION_DELETE_DOCUMENT_CHUNK_SECONDS)
398+
}
399+
384400
register_convex_gauge!(RETENTION_CURSOR_AGE_SECONDS, "Age of the retention cursor");
385401
pub fn log_retention_cursor_age(age_secs: f64) {
386402
log_gauge(&RETENTION_CURSOR_AGE_SECONDS, age_secs)
387403
}
388404

405+
register_convex_gauge!(
406+
DOCUMENT_RETENTION_CURSOR_AGE_SECONDS,
407+
"Age of the document retention cursor"
408+
);
409+
pub fn log_document_retention_cursor_age(age_secs: f64) {
410+
log_gauge(&DOCUMENT_RETENTION_CURSOR_AGE_SECONDS, age_secs)
411+
}
412+
389413
register_convex_counter!(
390414
RETENTION_SCANNED_DOCUMENT_TOTAL,
391415
"Count of documents scanned by retention",
@@ -416,6 +440,36 @@ pub fn log_retention_scanned_document(is_tombstone: bool, has_prev_rev: bool) {
416440
)
417441
}
418442

443+
register_convex_counter!(
444+
DOCUMENT_RETENTION_SCANNED_DOCUMENT_TOTAL,
445+
"Count of documents scanned by retention",
446+
&["tombstone", "prev_rev"]
447+
);
448+
pub fn log_document_retention_scanned_document(is_tombstone: bool, has_prev_rev: bool) {
449+
log_counter_with_tags(
450+
&RETENTION_SCANNED_DOCUMENT_TOTAL,
451+
1,
452+
vec![
453+
metric_tag_const_value(
454+
"tombstone",
455+
if is_tombstone {
456+
"is_tombstone"
457+
} else {
458+
"is_document"
459+
},
460+
),
461+
metric_tag_const_value(
462+
"prev_rev",
463+
if has_prev_rev {
464+
"has_prev_rev"
465+
} else {
466+
"no_prev_rev"
467+
},
468+
),
469+
],
470+
)
471+
}
472+
419473
register_convex_counter!(
420474
RETENTION_EXPIRED_INDEX_ENTRY_TOTAL,
421475
"Number of index entries expired by retention",
@@ -448,6 +502,15 @@ pub fn log_retention_index_entries_deleted(deleted_rows: usize) {
448502
log_counter(&RETENTION_INDEX_ENTRIES_DELETED_TOTAL, deleted_rows as u64)
449503
}
450504

505+
register_convex_counter!(
506+
RETENTION_DOCUMENTS_DELETED_TOTAL,
507+
"The total number of documents persistence returns as having been actually deleted by \
508+
retention."
509+
);
510+
pub fn log_retention_documents_deleted(deleted_rows: usize) {
511+
log_counter(&RETENTION_DOCUMENTS_DELETED_TOTAL, deleted_rows as u64)
512+
}
513+
451514
register_convex_counter!(
452515
OUTSIDE_RETENTION_TOTAL,
453516
"Number of snapshots out of retention min_snapshot_ts",

0 commit comments

Comments
 (0)