Skip to content

Commit 5be8e0e

Browse files
emmaling27Convex, Inc.
authored and
Convex, Inc.
committed
Add ComponentPath to FunctionUsageStats (#29839)
This PR supports breaking usage down by component path. This makes it possible to see within a transaction how much bandwidth was used in each component, for example. There are still some TODOs mostly around snapshot import, export and file storage. Notable changes: - adding `ComponentRegistry` to `FinalTransaction` - some new helpers to get `ComponentPath` given document id - pass around `BTreeMap<ComponentId, ComponentPath>` from `Snapshot` to avoid needing to create a transaction just to convert `ComponentId` to `ComponentPath` for snapshot export GitOrigin-RevId: 86e4f205081331ae1ddae5a6192df558bb7c4691
1 parent daff7a8 commit 5be8e0e

File tree

17 files changed

+243
-66
lines changed

17 files changed

+243
-66
lines changed

crates/application/src/export_worker.rs

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use common::{
2727
components::{
2828
ComponentId,
2929
ComponentName,
30+
ComponentPath,
3031
},
3132
document::{
3233
ParsedDocument,
@@ -289,7 +290,7 @@ impl<RT: Runtime> ExportWorker<RT> {
289290
) -> anyhow::Result<(Timestamp, ObjectKey, FunctionUsageTracker)> {
290291
tracing::info!("Beginning snapshot export...");
291292
let storage = &self.storage;
292-
let (ts, tables, by_id_indexes, system_tables, component_tree) = {
293+
let (ts, tables, component_ids_to_paths, by_id_indexes, system_tables, component_tree) = {
293294
let mut tx = self.database.begin(Identity::system()).await?;
294295
let by_id_indexes = IndexModel::new(&mut tx).by_id_indexes().await?;
295296
let component_tree = ComponentTree::new(&mut tx, component).await?;
@@ -309,6 +310,7 @@ impl<RT: Runtime> ExportWorker<RT> {
309310
)
310311
})
311312
.collect();
313+
let component_ids_to_paths = snapshot.component_ids_to_paths();
312314
let system_tables = snapshot
313315
.table_registry
314316
.iter_active_system_tables()
@@ -317,6 +319,7 @@ impl<RT: Runtime> ExportWorker<RT> {
317319
(
318320
tx.begin_timestamp(),
319321
tables,
322+
component_ids_to_paths,
320323
by_id_indexes,
321324
system_tables,
322325
component_tree,
@@ -335,6 +338,7 @@ impl<RT: Runtime> ExportWorker<RT> {
335338
writer,
336339
component_tree,
337340
tables.clone(),
341+
&component_ids_to_paths,
338342
ts,
339343
by_id_indexes,
340344
system_tables,
@@ -355,13 +359,18 @@ impl<RT: Runtime> ExportWorker<RT> {
355359
component_tree: &'a ComponentTree,
356360
zip_snapshot_upload: &'a mut ZipSnapshotUpload<'b>,
357361
tables: &'a mut BTreeMap<TabletId, (TableNamespace, TableNumber, TableName, TableSummary)>,
362+
component_ids_to_paths: &BTreeMap<ComponentId, ComponentPath>,
358363
snapshot_ts: RepeatableTimestamp,
359364
by_id_indexes: &BTreeMap<TabletId, IndexId>,
360365
system_tables: &BTreeMap<(TableNamespace, TableName), TabletId>,
361366
include_storage: bool,
362367
usage: FunctionUsageTracker,
363368
) -> anyhow::Result<()> {
364369
let namespace: TableNamespace = component_tree.id.into();
370+
let component_path = component_ids_to_paths
371+
.get(&component_tree.id)
372+
.cloned()
373+
.unwrap_or_default();
365374
let tablet_ids: BTreeSet<_> = tables
366375
.iter()
367376
.filter(|(_, (ns, ..))| *ns == namespace)
@@ -468,6 +477,7 @@ impl<RT: Runtime> ExportWorker<RT> {
468477
.transpose()?;
469478
usage
470479
.track_storage_call(
480+
component_path.clone(),
471481
"snapshot_export",
472482
file_storage_entry.storage_id.clone(),
473483
content_type,
@@ -507,7 +517,12 @@ impl<RT: Runtime> ExportWorker<RT> {
507517

508518
// Write documents from stream to table uploads
509519
while let Some((doc, _ts)) = stream.try_next().await? {
510-
usage.track_database_egress_size(table_name.to_string(), doc.size() as u64, false);
520+
usage.track_database_egress_size(
521+
component_path.clone(),
522+
table_name.to_string(),
523+
doc.size() as u64,
524+
false,
525+
);
511526
table_upload.write(doc).await?;
512527
}
513528
table_upload.complete().await?;
@@ -525,6 +540,7 @@ impl<RT: Runtime> ExportWorker<RT> {
525540
child,
526541
zip_snapshot_upload,
527542
tables,
543+
component_ids_to_paths,
528544
snapshot_ts,
529545
by_id_indexes,
530546
system_tables,
@@ -542,6 +558,7 @@ impl<RT: Runtime> ExportWorker<RT> {
542558
mut writer: ChannelWriter,
543559
component_tree: ComponentTree,
544560
mut tables: BTreeMap<TabletId, (TableNamespace, TableNumber, TableName, TableSummary)>,
561+
component_ids_to_paths: &BTreeMap<ComponentId, ComponentPath>,
545562
snapshot_ts: RepeatableTimestamp,
546563
by_id_indexes: BTreeMap<TabletId, IndexId>,
547564
system_tables: BTreeMap<(TableNamespace, TableName), TabletId>,
@@ -555,6 +572,7 @@ impl<RT: Runtime> ExportWorker<RT> {
555572
&component_tree,
556573
&mut zip_snapshot_upload,
557574
&mut tables,
575+
component_ids_to_paths,
558576
snapshot_ts,
559577
&by_id_indexes,
560578
&system_tables,
@@ -760,7 +778,10 @@ mod tests {
760778
use anyhow::Context;
761779
use bytes::Bytes;
762780
use common::{
763-
components::ComponentId,
781+
components::{
782+
ComponentId,
783+
ComponentPath,
784+
},
764785
document::ParsedDocument,
765786
types::{
766787
ConvexOrigin,
@@ -929,9 +950,10 @@ mod tests {
929950
assert_eq!(zip_entries, expected_export_entries);
930951

931952
let usage = usage.gather_user_stats();
932-
assert!(usage.database_egress_size["table_0"] > 0);
933-
assert!(usage.database_egress_size["table_1"] > 0);
934-
assert!(usage.database_egress_size["table_2"] > 0);
953+
let component_path = ComponentPath::test_user();
954+
assert!(usage.database_egress_size[&(component_path.clone(), "table_0".to_string())] > 0);
955+
assert!(usage.database_egress_size[&(component_path.clone(), "table_1".to_string())] > 0);
956+
assert!(usage.database_egress_size[&(component_path, "table_2".to_string())] > 0);
935957

936958
Ok(())
937959
}
@@ -1000,8 +1022,9 @@ mod tests {
10001022
expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string());
10011023

10021024
let mut tx = db.begin(Identity::system()).await?;
1003-
let (_, child_component) = BootstrapComponentsModel::new(&mut tx)
1004-
.must_component_path_to_ids(&"component".parse()?)?;
1025+
let component_path = "component".parse()?;
1026+
let (_, child_component) =
1027+
BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?;
10051028

10061029
for (path_prefix, component) in [
10071030
("", ComponentId::Root),
@@ -1041,7 +1064,7 @@ mod tests {
10411064
assert_eq!(zip_entries, expected_export_entries);
10421065

10431066
let usage = usage.gather_user_stats();
1044-
assert!(usage.database_egress_size["messages"] > 0);
1067+
assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0);
10451068
Ok(())
10461069
}
10471070

@@ -1062,8 +1085,9 @@ mod tests {
10621085
expected_export_entries.insert("README.md".to_string(), README_MD_CONTENTS.to_string());
10631086

10641087
let mut tx = db.begin(Identity::system()).await?;
1065-
let (_, child_component) = BootstrapComponentsModel::new(&mut tx)
1066-
.must_component_path_to_ids(&"component".parse()?)?;
1088+
let component_path = "component".parse()?;
1089+
let (_, child_component) =
1090+
BootstrapComponentsModel::new(&mut tx).must_component_path_to_ids(&component_path)?;
10671091

10681092
// Data in root component doesn't matter.
10691093
write_test_data_in_component(&db, ComponentId::Root, "", &mut BTreeMap::new()).await?;
@@ -1100,7 +1124,7 @@ mod tests {
11001124
assert_eq!(zip_entries, expected_export_entries);
11011125

11021126
let usage = usage.gather_user_stats();
1103-
assert!(usage.database_egress_size["messages"] > 0);
1127+
assert!(usage.database_egress_size[&(component_path, "messages".to_string())] > 0);
11041128
Ok(())
11051129
}
11061130

crates/application/src/snapshot_import.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2090,6 +2090,7 @@ async fn import_storage_table<RT: Runtime>(
20902090
.transpose()?;
20912091
usage
20922092
.track_storage_call(
2093+
component_path.clone(),
20932094
"snapshot_import",
20942095
entry.storage_id,
20952096
content_type,
@@ -3516,7 +3517,7 @@ a
35163517
.await?;
35173518

35183519
let stats = usage.gather_user_stats();
3519-
assert!(stats.database_ingress_size[&table_name.to_string()] > 0);
3520+
assert!(stats.database_ingress_size[&(component_path, table_name.to_string())] > 0);
35203521
assert_eq!(stats.storage_ingress_size, 9);
35213522

35223523
Ok(())

crates/common/src/components/component_path.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl HeapSize for ComponentName {
7272
// path can potentially change when the component tree changes during a push, so
7373
// we should resolve this path to a `ComponentId` within a transaction
7474
// as soon as possible.
75-
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
75+
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Default)]
7676
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
7777
pub struct ComponentPath {
7878
path: WithHeapSize<Vec<ComponentName>>,

crates/database/src/bootstrap_model/user_facing.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,11 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
134134
.get(self.namespace, id, version)
135135
.await;
136136
if let Ok(Some((document, _))) = &result {
137+
let component_path = self
138+
.tx
139+
.must_component_path(ComponentId::from(self.namespace))?;
137140
self.tx.reads.record_read_document(
141+
component_path,
138142
table_name,
139143
document.size(),
140144
&self.tx.usage_tracker,
@@ -337,7 +341,11 @@ impl<'a, RT: Runtime> UserFacingModel<'a, RT> {
337341
.tx
338342
.virtual_system_mapping()
339343
.is_virtual_table(table_name);
344+
let component_path = self
345+
.tx
346+
.must_component_path(ComponentId::from(self.namespace))?;
340347
self.tx.reads.record_read_document(
348+
component_path,
341349
table_name.clone(),
342350
document.size(),
343351
&self.tx.usage_tracker,

crates/database/src/committer.rs

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ use common::{
1515
TableState,
1616
TABLES_TABLE,
1717
},
18+
components::{
19+
ComponentId,
20+
ComponentPath,
21+
},
1822
document::{
1923
DocumentUpdate,
2024
ParsedDocument,
@@ -118,7 +122,9 @@ use crate::{
118122
WriteSource,
119123
},
120124
writes::DocumentWrite,
125+
ComponentRegistry,
121126
Transaction,
127+
TransactionReadSet,
122128
};
123129

124130
enum PersistenceWrite {
@@ -697,6 +703,7 @@ impl<RT: Runtime> Committer<RT> {
697703
metrics::log_write_tx(&transaction);
698704

699705
let table_mapping = transaction.table_mapping.clone();
706+
let component_registry = transaction.component_registry.clone();
700707
let usage_tracking = transaction.usage_tracker.clone();
701708
let ValidatedCommit {
702709
index_writes,
@@ -720,6 +727,7 @@ impl<RT: Runtime> Committer<RT> {
720727
&index_writes,
721728
&document_writes,
722729
&table_mapping,
730+
&component_registry,
723731
);
724732
Self::write_to_persistence(persistence, index_writes, document_writes).await?;
725733
Ok(PersistenceWrite::Commit {
@@ -743,13 +751,24 @@ impl<RT: Runtime> Committer<RT> {
743751
index_writes: &BTreeSet<(Timestamp, DatabaseIndexUpdate)>,
744752
document_writes: &Vec<ValidatedDocumentWrite>,
745753
table_mapping: &TableMapping,
754+
component_registry: &ComponentRegistry,
746755
) {
747756
for (_, index_write) in index_writes {
748757
if let DatabaseIndexValue::NonClustered(doc) = index_write.value {
749-
if let Ok(table_name) = table_mapping.tablet_name(doc.tablet_id) {
758+
let tablet_id = doc.tablet_id;
759+
let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else {
760+
continue;
761+
};
762+
let component_id = ComponentId::from(table_namespace);
763+
let component_path = component_registry
764+
.get_component_path(component_id, &mut TransactionReadSet::new())
765+
// It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root.
766+
.unwrap_or(ComponentPath::root());
767+
if let Ok(table_name) = table_mapping.tablet_name(tablet_id) {
750768
// Index metadata is never a vector
751769
// Database bandwidth for index writes
752770
usage_tracker.track_database_ingress_size(
771+
component_path,
753772
table_name.to_string(),
754773
index_write.key.size() as u64,
755774
// Exclude indexes on system tables or reserved system indexes on user
@@ -768,16 +787,27 @@ impl<RT: Runtime> Committer<RT> {
768787
} = validated_write;
769788
if let Some(document) = document {
770789
let document_write_size = document_id.size() + document.size();
771-
if let Ok(table_name) = table_mapping.tablet_name(document.id().tablet_id) {
790+
let tablet_id = document.id().tablet_id;
791+
let Ok(table_namespace) = table_mapping.tablet_namespace(tablet_id) else {
792+
continue;
793+
};
794+
let component_id = ComponentId::from(table_namespace);
795+
let component_path = component_registry
796+
.get_component_path(component_id, &mut TransactionReadSet::new())
797+
// It's possible that the component gets deleted in this transaction. In that case, miscount the usage as root.
798+
.unwrap_or(ComponentPath::root());
799+
if let Ok(table_name) = table_mapping.tablet_name(tablet_id) {
772800
// Database bandwidth for document writes
773801
if *doc_in_vector_index == DocInVectorIndex::Absent {
774802
usage_tracker.track_database_ingress_size(
803+
component_path,
775804
table_name.to_string(),
776805
document_write_size as u64,
777806
table_name.is_system(),
778807
);
779808
} else {
780809
usage_tracker.track_vector_ingress_size(
810+
component_path,
781811
table_name.to_string(),
782812
document_write_size as u64,
783813
table_name.is_system(),

crates/database/src/component_registry.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::collections::BTreeMap;
22

3+
use anyhow::Context;
34
use common::{
45
bootstrap_model::{
56
components::{
@@ -136,6 +137,27 @@ impl ComponentRegistry {
136137
Some(ComponentPath::from(path))
137138
}
138139

140+
pub fn must_component_path(
141+
&self,
142+
component_id: ComponentId,
143+
reads: &mut TransactionReadSet,
144+
) -> anyhow::Result<ComponentPath> {
145+
self.get_component_path(component_id, reads)
146+
.with_context(|| format!("Component {component_id:?} not found"))
147+
}
148+
149+
pub fn component_path_from_document_id(
150+
&self,
151+
table_mapping: &TableMapping,
152+
id: ResolvedDocumentId,
153+
reads: &mut TransactionReadSet,
154+
) -> anyhow::Result<Option<ComponentPath>> {
155+
let tablet_id = id.tablet_id;
156+
let table_namespace = table_mapping.tablet_namespace(tablet_id)?;
157+
let component_id = ComponentId::from(table_namespace);
158+
Ok(self.get_component_path(component_id, reads))
159+
}
160+
139161
pub fn all_component_paths(
140162
&self,
141163
reads: &mut TransactionReadSet,

crates/database/src/database.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2062,9 +2062,10 @@ impl<RT: Runtime> Database<RT> {
20622062
let timer = metrics::vector::vector_search_timer();
20632063
let usage = FunctionUsageTracker::new();
20642064
let snapshot = self.snapshot(ts)?;
2065+
let component_id = query.component_id;
20652066
let table_mapping = snapshot
20662067
.table_mapping()
2067-
.namespace(TableNamespace::from(query.component_id));
2068+
.namespace(TableNamespace::from(component_id));
20682069
if !table_mapping.name_exists(query.index_name.table()) {
20692070
return Ok((vec![], usage.gather_user_stats()));
20702071
}
@@ -2091,7 +2092,11 @@ impl<RT: Runtime> Database<RT> {
20912092
.map(|r| r.to_public(table_number))
20922093
.collect();
20932094
let size: u64 = results.iter().map(|row| row.size() as u64).sum();
2095+
let component_path = snapshot
2096+
.component_registry
2097+
.must_component_path(component_id, &mut TransactionReadSet::new())?;
20942098
usage.track_vector_egress_size(
2099+
component_path,
20952100
table_mapping.tablet_name(*index_name.table())?.to_string(),
20962101
size,
20972102
// We don't have system owned vector indexes.

crates/database/src/query/index_range.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
use async_trait::async_trait;
77
use common::{
88
bootstrap_model::index::database_index::IndexedFields,
9+
components::ComponentId,
910
document::DeveloperDocument,
1011
index::IndexKeyBytes,
1112
interval::Interval,
@@ -212,7 +213,9 @@ impl IndexRange {
212213
.record_read_document(&v, self.printable_index_name.table())?;
213214

214215
// Database bandwidth for index reads
216+
let component_path = tx.must_component_path(ComponentId::from(self.namespace))?;
215217
tx.usage_tracker.track_database_egress_size(
218+
component_path,
216219
self.printable_index_name.table().to_string(),
217220
index_bytes as u64,
218221
self.printable_index_name.is_system_owned(),

0 commit comments

Comments
 (0)