Skip to content

Commit b8c88ce

Browse files
NicolappsConvex, Inc.
authored and
Convex, Inc.
committed
Streaming export: stop using the legacy cursor format for list_snapshot (#37219)
This fixes a bug where streaming export would fail with an error message (_cannot find table with id 10001_) or never complete when exporting instances with components. ### What was happening? `list_snapshot` uses a cursor that is essentially the last document that was seen. Its table number is ambiguous without knowing the component of the document. When we added component support for streaming export, we added a new cursor format that contains a tablet ID for disambiguation. However, the part of the code that sends these cursors to the connector was still using the legacy format. This would cause cursors to always be interpreted as being in the root component. ### Deployment Even though this PR technically introduces an API breaking change (because it drops support for the legacy cursor format), the worst scenario that can happen is the conductor deployment happening between several `list_snapshot` calls. In this case, the user can restart the operation and it will work. The connectors simply treat `cursor` as an arbitrary string and don’t need updates. Also, only new Fivetran/Airbyte exports (or full resyncs of them) call `list_snapshot`. ### Testing - I tested manually the new behavior by hitting the HTTP endpoints and following the cursors I received. - I also added a new `application` unit test. This test used to fail when the cursor being passed didn’t contain the tablet ID, which is the previous behavior we had when using the legacy format. The test used to fail with an invalid table number error or infinite loop depending on the order of the tablet IDs, which matches the behavior we had in production repros of this bug. - This code path is also tested by a smoke test, that I’ve fixed in #37218 GitOrigin-RevId: 0f0c0ea511d5cb07d0a9f4615305f3b07ac2312b
1 parent 4111bd3 commit b8c88ce

File tree

5 files changed

+100
-19
lines changed

5 files changed

+100
-19
lines changed

crates/application/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,6 @@ use value::{
357357
Namespace,
358358
ResolvedDocumentId,
359359
TableNamespace,
360-
TabletId,
361360
};
362361
use vector::{
363362
PublicVectorSearchQueryResult,
@@ -925,7 +924,7 @@ impl<RT: Runtime> Application<RT> {
925924
&self,
926925
identity: Identity,
927926
snapshot: Option<Timestamp>,
928-
cursor: Option<(Option<TabletId>, DeveloperDocumentId)>,
927+
cursor: Option<ResolvedDocumentId>,
929928
table_filter: Option<TableName>,
930929
component_filter: Option<ComponentPath>,
931930
) -> anyhow::Result<SnapshotPage> {

crates/application/src/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ mod scheduled_jobs;
1717
mod schema;
1818
mod source_package;
1919
mod storage;
20+
mod streaming_export;
2021

2122
const NODE_SOURCE: &str = r#"
2223
var nodeFunction = () => {};
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use common::{
2+
assert_obj,
3+
components::ComponentPath,
4+
runtime::Runtime,
5+
};
6+
use convex_macro::test_runtime;
7+
use database::StreamingExportTableFilter;
8+
use keybroker::Identity;
9+
use runtime::testing::TestRuntime;
10+
use value::{
11+
TableName,
12+
TableNamespace,
13+
};
14+
15+
use crate::{
16+
test_helpers::ApplicationTestExt,
17+
Application,
18+
BootstrapComponentsModel,
19+
UserFacingModel,
20+
};
21+
22+
pub fn table_name() -> TableName {
23+
"table1".parse().unwrap()
24+
}
25+
26+
#[test_runtime]
27+
async fn test_streaming_export_from_component(rt: TestRuntime) -> anyhow::Result<()> {
28+
let application = Application::new_for_tests(&rt).await?;
29+
30+
application.load_component_tests_modules("mounted").await?;
31+
let component_path = "component".parse()?;
32+
33+
// Insert documents in the component
34+
let mut tx = application.begin(Identity::system()).await?;
35+
insert_documents(&mut tx, component_path, &table_name()).await?;
36+
application.commit_test(tx).await?;
37+
38+
// Get all documents through list_snapshot
39+
let mut seen_documents = Vec::new();
40+
let mut snapshot = None;
41+
let mut cursor = None;
42+
43+
loop {
44+
let snapshot_page = application
45+
.database
46+
.list_snapshot(
47+
Identity::system(),
48+
snapshot,
49+
cursor,
50+
StreamingExportTableFilter::default(),
51+
5,
52+
5,
53+
)
54+
.await?;
55+
56+
snapshot = Some(snapshot_page.snapshot);
57+
cursor = snapshot_page.cursor;
58+
59+
for (_, _, _, doc) in snapshot_page.documents {
60+
seen_documents.push(doc);
61+
}
62+
63+
if !snapshot_page.has_more {
64+
break;
65+
}
66+
}
67+
68+
// Verify we got all documents from both components
69+
assert_eq!(seen_documents.len(), 10);
70+
71+
Ok(())
72+
}
73+
74+
async fn insert_documents<RT: Runtime>(
75+
tx: &mut crate::Transaction<RT>,
76+
component_path: ComponentPath,
77+
table_name: &TableName,
78+
) -> anyhow::Result<()> {
79+
let mut components_model = BootstrapComponentsModel::new(tx);
80+
let (_, component_id) = components_model.must_component_path_to_ids(&component_path)?;
81+
let table_namespace = TableNamespace::from(component_id);
82+
let mut user_facing_model = UserFacingModel::new(tx, table_namespace);
83+
84+
for i in 0..10 {
85+
user_facing_model
86+
.insert(table_name.clone(), assert_obj!("index" => i))
87+
.await?;
88+
}
89+
90+
Ok(())
91+
}

crates/database/src/database.rs

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ struct ListSnapshotTableIteratorCacheEntry {
287287
snapshot: Timestamp,
288288
tablet_id: TabletId,
289289
by_id: IndexId,
290-
resolved_cursor: Option<ResolvedDocumentId>,
290+
cursor: Option<ResolvedDocumentId>,
291291
}
292292

293293
#[derive(Clone)]
@@ -1719,7 +1719,7 @@ impl<RT: Runtime> Database<RT> {
17191719
&self,
17201720
identity: Identity,
17211721
snapshot: Option<Timestamp>,
1722-
cursor: Option<(Option<TabletId>, DeveloperDocumentId)>,
1722+
cursor: Option<ResolvedDocumentId>,
17231723
table_filter: StreamingExportTableFilter,
17241724
rows_read_limit: usize,
17251725
rows_returned_limit: usize,
@@ -1752,16 +1752,6 @@ impl<RT: Runtime> Database<RT> {
17521752
let table_mapping = self.snapshot_table_mapping(snapshot).await?;
17531753
let by_id_indexes = self.snapshot_by_id_indexes(snapshot).await?;
17541754
let component_paths = self.snapshot_component_paths(snapshot).await?;
1755-
let resolved_cursor = cursor
1756-
.map(|(tablet, developer_id)| match tablet {
1757-
Some(tablet_id) => Ok(ResolvedDocumentId::new(tablet_id, developer_id)),
1758-
None => developer_id.to_resolved(
1759-
table_mapping
1760-
.namespace(TableNamespace::by_component_TODO())
1761-
.number_to_tablet(),
1762-
),
1763-
})
1764-
.transpose()?;
17651755
let tablet_ids: BTreeSet<_> = table_mapping
17661756
.iter()
17671757
.map(|(tablet_id, ..)| tablet_id)
@@ -1771,7 +1761,7 @@ impl<RT: Runtime> Database<RT> {
17711761
*tablet_id,
17721762
&table_mapping,
17731763
&component_paths,
1774-
) && resolved_cursor
1764+
) && cursor
17751765
.as_ref()
17761766
.map(|c| *tablet_id >= c.tablet_id)
17771767
.unwrap_or(true)
@@ -1798,7 +1788,7 @@ impl<RT: Runtime> Database<RT> {
17981788
snapshot: *snapshot,
17991789
tablet_id,
18001790
by_id,
1801-
resolved_cursor,
1791+
cursor,
18021792
};
18031793
if let Some((cache_key, _ds)) = &*cached
18041794
&& *cache_key == expected_cache_key
@@ -1808,7 +1798,7 @@ impl<RT: Runtime> Database<RT> {
18081798
} else {
18091799
let table_iterator = self.table_iterator(snapshot, 100);
18101800
table_iterator
1811-
.stream_documents_in_table(tablet_id, by_id, resolved_cursor)
1801+
.stream_documents_in_table(tablet_id, by_id, cursor)
18121802
.boxed()
18131803
}
18141804
};
@@ -1854,7 +1844,7 @@ impl<RT: Runtime> Database<RT> {
18541844
snapshot: *snapshot,
18551845
tablet_id,
18561846
by_id,
1857-
resolved_cursor: Some(new_cursor),
1847+
cursor: Some(new_cursor),
18581848
};
18591849
*self.list_snapshot_table_iterator_cache.lock() =
18601850
Some((new_cache_key, document_stream));

crates/database/src/tests/streaming_export_tests.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -327,7 +327,7 @@ async fn test_snapshot_list(rt: TestRuntime) -> anyhow::Result<()> {
327327
.list_snapshot(
328328
Identity::system(),
329329
snapshot,
330-
cursor.map(|c| (Some(c.tablet_id), c.developer_id)),
330+
cursor,
331331
StreamingExportTableFilter {
332332
table_name: table_filter.clone(),
333333
..Default::default()

0 commit comments

Comments
 (0)