Skip to content

Commit 26665f2

Browse files
ldanilek (Convex, Inc.)
authored and
Convex, Inc.
committed
remove generated_ids from transaction writes (#28033)
`generated_ids` are redundant because they're also represented as the keys in `updates` where `update.old_document.is_none()`. Delete the field. Reconstruct it when we need to check generated IDs at commit time, and when constructing the proto for funrun, to avoid version-skew issues. GitOrigin-RevId: 510e856a94ca14a106351a001d29d1b0e5555c95
1 parent e6465a3 commit 26665f2

File tree

8 files changed

+44
-101
lines changed

8 files changed

+44
-101
lines changed

crates/application/src/application_function_runner/mod.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -381,18 +381,14 @@ impl<RT: Runtime> FunctionRouter<RT> {
381381
user_tx_size,
382382
system_tx_size,
383383
} = function_tx.reads;
384-
let FunctionWrites {
385-
updates,
386-
generated_ids,
387-
} = function_tx.writes;
384+
let FunctionWrites { updates } = function_tx.writes;
388385
tx.apply_function_runner_tx(
389386
function_tx.begin_timestamp,
390387
reads,
391388
num_intervals,
392389
user_tx_size,
393390
system_tx_size,
394391
updates,
395-
generated_ids,
396392
function_tx.rows_read_by_tablet,
397393
)?;
398394
Some(tx)

crates/database/src/committer.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -950,26 +950,23 @@ impl<RT: Runtime> CommitterClient<RT> {
950950
// checked for conflict against all writes after begin_timestamp.
951951
let ts = transaction.begin_timestamp().succ()?;
952952
let timer = metrics::commit_id_reuse_timer();
953-
if !transaction.writes.generated_ids.is_empty() {
953+
let generated_ids = transaction.writes.generated_ids();
954+
if !generated_ids.is_empty() {
954955
let repeatable_persistence = RepeatablePersistence::new(
955956
self.persistence_reader.clone(),
956957
transaction.begin_timestamp(),
957958
self.retention_validator.clone(),
958959
);
959-
let generated_ids: BTreeSet<_> = transaction
960-
.writes
961-
.generated_ids
960+
let generated_ids_with_ts: BTreeSet<_> = generated_ids
962961
.iter()
963962
.map(|id| (GenericDocumentId::<TabletId>::from(*id), ts))
964963
.collect();
965964
let mut previous_revisions_of_ids = repeatable_persistence
966-
.previous_revisions(generated_ids)
965+
.previous_revisions(generated_ids_with_ts)
967966
.await?;
968967
if let Some(((document_id, _), (_, maybe_doc))) = previous_revisions_of_ids.pop_first()
969968
{
970-
let display_id = transaction
971-
.writes
972-
.generated_ids
969+
let display_id = generated_ids
973970
.iter()
974971
.find(|id| GenericDocumentId::<TabletId>::from(**id) == document_id)
975972
.map(|id| DeveloperDocumentId::from(*id).encode())

crates/database/src/tests/apply_function_runner_tx.rs

Lines changed: 7 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,14 @@ async fn test_apply_function_runner_tx_new_table(rt: TestRuntime) -> anyhow::Res
8080
.iter()
8181
.map(|(table, stats)| (*table, stats.rows_read))
8282
.collect();
83-
let (updates, generated_ids) = function_runner_tx
84-
.writes
85-
.clone()
86-
.into_updates_and_generated_ids();
83+
let updates = function_runner_tx.writes.clone().into_updates();
8784
backend_tx.apply_function_runner_tx(
8885
*begin_timestamp,
8986
reads,
9087
num_intervals,
9188
user_tx_size,
9289
system_tx_size,
9390
updates,
94-
generated_ids,
9591
rows_read_by_tablet,
9692
)?;
9793
assert_eq!(
@@ -137,18 +133,14 @@ async fn test_apply_function_runner_tx_read_only(rt: TestRuntime) -> anyhow::Res
137133
.iter()
138134
.map(|(table, stats)| (*table, stats.rows_read))
139135
.collect();
140-
let (updates, generated_ids) = function_runner_tx
141-
.writes
142-
.clone()
143-
.into_updates_and_generated_ids();
136+
let updates = function_runner_tx.writes.clone().into_updates();
144137
backend_tx.apply_function_runner_tx(
145138
*begin_timestamp,
146139
reads,
147140
num_intervals,
148141
user_tx_size,
149142
system_tx_size,
150143
updates,
151-
generated_ids,
152144
rows_read_by_tablet,
153145
)?;
154146

@@ -191,18 +183,14 @@ async fn test_apply_function_runner_tx_replace(rt: TestRuntime) -> anyhow::Resul
191183
.iter()
192184
.map(|(table, stats)| (*table, stats.rows_read))
193185
.collect();
194-
let (updates, generated_ids) = function_runner_tx
195-
.writes
196-
.clone()
197-
.into_updates_and_generated_ids();
186+
let updates = function_runner_tx.writes.clone().into_updates();
198187
backend_tx.apply_function_runner_tx(
199188
*begin_timestamp,
200189
reads,
201190
num_intervals,
202191
user_tx_size,
203192
system_tx_size,
204193
updates,
205-
generated_ids,
206194
rows_read_by_tablet,
207195
)?;
208196

@@ -230,8 +218,8 @@ async fn test_apply_function_runner_tx_merge_existing_writes(
230218
FunctionUsageTracker::new(),
231219
)
232220
.await?;
233-
let (updates, generated_ids) = backend_tx.writes().clone().into_updates_and_generated_ids();
234-
function_runner_tx.merge_writes(updates, generated_ids)?;
221+
let updates = backend_tx.writes().clone().into_updates();
222+
function_runner_tx.merge_writes(updates)?;
235223

236224
// Perform writes as if in funrun
237225
UserFacingModel::new_root_for_test(&mut function_runner_tx)
@@ -248,18 +236,14 @@ async fn test_apply_function_runner_tx_merge_existing_writes(
248236
.iter()
249237
.map(|(table, stats)| (*table, stats.rows_read))
250238
.collect();
251-
let (updates, generated_ids) = function_runner_tx
252-
.writes
253-
.clone()
254-
.into_updates_and_generated_ids();
239+
let updates = function_runner_tx.writes.clone().into_updates();
255240
backend_tx.apply_function_runner_tx(
256241
*begin_timestamp,
257242
reads,
258243
num_intervals,
259244
user_tx_size,
260245
system_tx_size,
261246
updates,
262-
generated_ids,
263247
rows_read_by_tablet,
264248
)?;
265249

@@ -304,10 +288,7 @@ async fn test_apply_function_runner_tx_merge_existing_writes_bad(
304288
.iter()
305289
.map(|(table, stats)| (*table, stats.rows_read))
306290
.collect();
307-
let (updates, generated_ids) = function_runner_tx
308-
.writes
309-
.clone()
310-
.into_updates_and_generated_ids();
291+
let updates = function_runner_tx.writes.clone().into_updates();
311292
assert!(backend_tx
312293
.apply_function_runner_tx(
313294
*begin_timestamp,
@@ -316,7 +297,6 @@ async fn test_apply_function_runner_tx_merge_existing_writes_bad(
316297
user_tx_size,
317298
system_tx_size,
318299
updates,
319-
generated_ids,
320300
rows_read_by_tablet,
321301
)
322302
.is_err());

crates/database/src/tests/mod.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -500,15 +500,20 @@ async fn test_id_reuse_within_a_transactions(rt: TestRuntime) -> anyhow::Result<
500500
let document_id = TestFacingModel::new(&mut tx)
501501
.insert(&"table".parse()?, ConvexObject::empty())
502502
.await?;
503+
let table_mapping = tx.table_mapping().clone();
504+
let table_id = table_mapping
505+
.namespace(TableNamespace::test_user())
506+
.name_to_id()("table".parse()?)?;
503507

504-
// Pretend this transaction does another insert using the same DocumentId. We
505-
// can't do this through the normal Transaction interface so reach into
506-
// the Writes.
507-
let err = tx
508-
.writes
509-
.register_new_id(&mut tx.reads, document_id)
508+
// Do another insert using the same DocumentId.
509+
let value = assert_obj!(
510+
"_id" => DeveloperDocumentId::from(document_id).encode(),
511+
);
512+
let err = ImportFacingModel::new(&mut tx)
513+
.insert(table_id, &"table".parse()?, value, &table_mapping)
514+
.await
510515
.unwrap_err();
511-
assert!(format!("{err}").contains("Transaction allocated the same DocumentId twice"));
516+
assert!(format!("{err}").contains("Duplicate insert"), "{err}");
512517
Ok(())
513518
}
514519

crates/database/src/transaction.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,6 @@ impl<RT: Runtime> Transaction<RT> {
365365
user_tx_size: crate::reads::TransactionReadSize,
366366
system_tx_size: crate::reads::TransactionReadSize,
367367
updates: BTreeMap<ResolvedDocumentId, DocumentUpdate>,
368-
generated_ids: BTreeSet<ResolvedDocumentId>,
369368
rows_read_by_tablet: BTreeMap<TabletId, u64>,
370369
) -> anyhow::Result<()> {
371370
anyhow::ensure!(
@@ -376,7 +375,7 @@ impl<RT: Runtime> Transaction<RT> {
376375
self.reads
377376
.merge(reads, num_intervals, user_tx_size, system_tx_size);
378377

379-
self.merge_writes(updates, generated_ids)?;
378+
self.merge_writes(updates)?;
380379

381380
for (tablet_id, rows_read) in rows_read_by_tablet {
382381
self.stats.entry(tablet_id).or_default().rows_read += rows_read;
@@ -393,21 +392,8 @@ impl<RT: Runtime> Transaction<RT> {
393392
pub fn merge_writes(
394393
&mut self,
395394
updates: BTreeMap<ResolvedDocumentId, DocumentUpdate>,
396-
// TODO: Delete generated_ids, they are included as (None, None)
397-
// update in updates.
398-
generated_ids: BTreeSet<ResolvedDocumentId>,
399395
) -> anyhow::Result<()> {
400-
let (existing_updates, existing_generated_ids) =
401-
self.writes().clone().into_updates_and_generated_ids();
402-
403-
// TODO: Delete generated_ids, they are included as (None, None)
404-
// update in updates. This check is redundant.
405-
for id in generated_ids.iter() {
406-
anyhow::ensure!(
407-
existing_updates.contains_key(id) || !existing_generated_ids.contains(id),
408-
"Conflicting generated ID {id}"
409-
);
410-
}
396+
let existing_updates = self.writes().clone().into_updates();
411397

412398
let mut updates = updates.into_iter().collect::<Vec<_>>();
413399
updates.sort_by_key(|(id, update)| {

crates/database/src/writes.rs

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ pub struct DocumentWrite {
5656
pub struct Writes {
5757
updates: BTreeMap<ResolvedDocumentId, DocumentUpdate>,
5858

59-
// All of the new DocumentIds that were generated in this transaction.
60-
// TODO: Remove, `generated_ids` are included in `updates` as (None, None)
61-
pub generated_ids: BTreeSet<ResolvedDocumentId>,
62-
6359
// Fields below can be recomputed from `updates`.
6460

6561
// Size of writes to user tables
@@ -82,7 +78,6 @@ impl Writes {
8278
pub fn new() -> Self {
8379
Self {
8480
updates: BTreeMap::new(),
85-
generated_ids: BTreeSet::new(),
8681
user_tx_size: TransactionWriteSize::default(),
8782
system_tx_size: TransactionWriteSize::default(),
8883
}
@@ -245,12 +240,6 @@ impl Writes {
245240
reads: &mut TransactionReadSet,
246241
document_id: ResolvedDocumentId,
247242
) -> anyhow::Result<()> {
248-
anyhow::ensure!(
249-
!self.generated_ids.contains(&document_id),
250-
"Transaction allocated the same DocumentId twice: {document_id}"
251-
);
252-
self.generated_ids.insert(document_id);
253-
254243
// New ID creation requires the ID to have never existed before.
255244
// We check in CommitterClient that it never existed before the transaction's
256245
// begin timestamp, and here we take a dependency on the ID to make sure
@@ -285,13 +274,16 @@ impl Writes {
285274
self.updates.into_iter()
286275
}
287276

288-
pub fn into_updates_and_generated_ids(
289-
self,
290-
) -> (
291-
BTreeMap<ResolvedDocumentId, DocumentUpdate>,
292-
BTreeSet<ResolvedDocumentId>,
293-
) {
294-
(self.updates, self.generated_ids)
277+
pub fn into_updates(self) -> BTreeMap<ResolvedDocumentId, DocumentUpdate> {
278+
self.updates
279+
}
280+
281+
pub fn generated_ids(&self) -> BTreeSet<ResolvedDocumentId> {
282+
self.updates
283+
.iter()
284+
.filter(|(_, update)| update.old_document.is_none())
285+
.map(|(id, _)| *id)
286+
.collect()
295287
}
296288
}
297289

@@ -477,7 +469,7 @@ mod tests {
477469
new_document: Some(document),
478470
},
479471
)?;
480-
assert_eq!(writes.generated_ids, btreeset! {id});
472+
assert_eq!(writes.generated_ids(), btreeset! {id});
481473
Ok(())
482474
}
483475

@@ -534,7 +526,7 @@ mod tests {
534526
}
535527
)
536528
);
537-
assert_eq!(writes.generated_ids, btreeset! {});
529+
assert_eq!(writes.generated_ids(), btreeset! {});
538530
Ok(())
539531
}
540532
}

crates/function_runner/src/in_memory_indexes.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,7 @@ fn make_transaction<RT: Runtime>(
120120
retention_validator,
121121
virtual_system_mapping,
122122
);
123-
let updates = existing_writes.updates;
124-
tx.merge_writes(updates, existing_writes.generated_ids)?;
123+
tx.merge_writes(existing_writes.updates)?;
125124
Ok(tx)
126125
}
127126

crates/function_runner/src/lib.rs

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
#![feature(try_blocks)]
55
#![feature(lint_reasons)]
66
use std::{
7-
collections::{
8-
BTreeMap,
9-
BTreeSet,
10-
},
7+
collections::BTreeMap,
118
sync::Arc,
129
};
1310

@@ -145,9 +142,6 @@ impl From<TransactionReadSet> for FunctionReads {
145142
#[derive(Clone, Default)]
146143
pub struct FunctionWrites {
147144
pub updates: BTreeMap<ResolvedDocumentId, DocumentUpdate>,
148-
149-
// All of the new DocumentIds that were generated in this transaction.
150-
pub generated_ids: BTreeSet<ResolvedDocumentId>,
151145
}
152146

153147
#[cfg(any(test, feature = "testing"))]
@@ -157,24 +151,18 @@ impl proptest::arbitrary::Arbitrary for FunctionWrites {
157151
type Strategy = impl Strategy<Value = FunctionWrites>;
158152

159153
fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
160-
(
161-
proptest::collection::vec(proptest::prelude::any::<DocumentUpdate>(), 0..4),
162-
proptest::collection::btree_set(proptest::prelude::any::<ResolvedDocumentId>(), 0..4),
163-
)
164-
.prop_map(|(updates, generated_ids)| Self {
154+
proptest::collection::vec(proptest::prelude::any::<DocumentUpdate>(), 0..4)
155+
.prop_map(|updates| Self {
165156
updates: updates.into_iter().map(|u| (u.id, u)).collect(),
166-
generated_ids,
167157
})
168158
.boxed()
169159
}
170160
}
171161

172162
impl From<Writes> for FunctionWrites {
173163
fn from(writes: Writes) -> Self {
174-
let (updates, generated_ids) = writes.into_updates_and_generated_ids();
175164
Self {
176-
updates,
177-
generated_ids,
165+
updates: writes.into_updates(),
178166
}
179167
}
180168
}

0 commit comments

Comments (0)