Skip to content

Commit 4c52b74

Browse files
Sujay JayakarConvex, Inc.
Sujay Jayakar
authored and
Convex, Inc.
committed
Wire up queries to isolate2 (#24784)
GitOrigin-RevId: 10d1b67cdd8ff9ba38822d8563ae17beb0af1382
1 parent bc76986 commit 4c52b74

File tree

10 files changed

+776
-541
lines changed

10 files changed

+776
-541
lines changed

crates/isolate/src/environment/udf/async_syscall.rs

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use common::{
2323
},
2424
types::PersistenceVersion,
2525
value::ConvexValue,
26+
version::Version,
2627
};
2728
use database::{
2829
query::{
@@ -76,6 +77,7 @@ use crate::{
7677
ArgName,
7778
},
7879
helpers::UdfArgsJson,
80+
isolate2::client::QueryId,
7981
metrics::async_syscall_timer,
8082
};
8183

@@ -214,6 +216,14 @@ impl<RT: Runtime> QueryManager<RT> {
214216
}
215217
}
216218

219+
pub enum ManagedQuery<RT: Runtime> {
220+
Pending {
221+
query: Query,
222+
version: Option<Version>,
223+
},
224+
Active(DeveloperQuery<RT>),
225+
}
226+
217227
// Trait for allowing code reuse between `DatabaseUdfEnvironment` and isolate2.
218228
#[allow(async_fn_in_trait)]
219229
pub trait AsyncSyscallProvider<RT: Runtime> {
@@ -229,7 +239,9 @@ pub trait AsyncSyscallProvider<RT: Runtime> {
229239

230240
fn log_async_syscall(&mut self, name: String, duration: Duration, is_success: bool);
231241

232-
fn query_manager(&mut self) -> &mut QueryManager<RT>;
242+
fn take_query(&mut self, query_id: QueryId) -> Option<ManagedQuery<RT>>;
243+
fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery<RT>);
244+
fn cleanup_query(&mut self, query_id: QueryId) -> bool;
233245

234246
fn prev_journal(&mut self) -> &mut QueryJournal;
235247
fn next_journal(&mut self) -> &mut QueryJournal;
@@ -291,8 +303,18 @@ impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
291303
.log_async_syscall(name, duration, is_success);
292304
}
293305

294-
fn query_manager(&mut self) -> &mut QueryManager<RT> {
295-
&mut self.query_manager
306+
fn take_query(&mut self, query_id: QueryId) -> Option<ManagedQuery<RT>> {
307+
self.query_manager
308+
.take_developer(query_id)
309+
.map(ManagedQuery::Active)
310+
}
311+
312+
fn insert_query(&mut self, query_id: QueryId, query: DeveloperQuery<RT>) {
313+
self.query_manager.insert_developer(query_id, query);
314+
}
315+
316+
fn cleanup_query(&mut self, query_id: QueryId) -> bool {
317+
self.query_manager.cleanup_developer(query_id)
296318
}
297319

298320
fn prev_journal(&mut self) -> &mut QueryJournal {
@@ -707,13 +729,24 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
707729
let args: QueryStreamNextArgs = serde_json::from_value(args)?;
708730
Ok(args.query_id)
709731
})?;
710-
let local_query = provider
711-
.query_manager()
712-
.take_developer(query_id)
713-
.context(ErrorMetadata::not_found(
714-
"QueryNotFound",
715-
"in-progress query not found",
716-
))?;
732+
let managed_query =
733+
provider
734+
.take_query(query_id)
735+
.context(ErrorMetadata::not_found(
736+
"QueryNotFound",
737+
"in-progress query not found",
738+
))?;
739+
let local_query = match managed_query {
740+
ManagedQuery::Pending { query, version } => {
741+
DeveloperQuery::new_with_version(
742+
provider.tx()?,
743+
query,
744+
version,
745+
table_filter,
746+
)?
747+
},
748+
ManagedQuery::Active(local_query) => local_query,
749+
};
717750
Some((Some(query_id), local_query))
718751
},
719752
AsyncRead::Get(args) => {
@@ -790,9 +823,7 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
790823
for (batch_key, (query_id, local_query)) in queries_to_fetch {
791824
let result: anyhow::Result<_> = try {
792825
if let Some(query_id) = query_id {
793-
provider
794-
.query_manager()
795-
.insert_developer(query_id, local_query);
826+
provider.insert_query(query_id, local_query);
796827
}
797828
let maybe_next = fetch_results
798829
.remove(&batch_key)
@@ -806,7 +837,7 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
806837

807838
if let Some(query_id) = query_id {
808839
if done {
809-
provider.query_manager().cleanup_developer(query_id);
840+
provider.cleanup_query(query_id);
810841
}
811842
serde_json::to_value(QueryStreamNextResult {
812843
value: value.into(),

crates/isolate/src/environment/udf/syscall.rs

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use common::{
55
query::Query,
66
runtime::Runtime,
77
static_span,
8+
version::Version,
89
};
910
use database::{
1011
query::TableFilter,
1112
DeveloperQuery,
12-
Transaction,
1313
};
1414
use errors::ErrorMetadata;
1515
use serde::{
@@ -23,13 +23,12 @@ use serde_json::{
2323
use value::{
2424
id_v6::DocumentIdV6,
2525
InternalId,
26+
TableIdAndTableNumber,
2627
TableName,
28+
TableNumber,
2729
};
2830

29-
use super::{
30-
async_syscall::QueryManager,
31-
DatabaseUdfEnvironment,
32-
};
31+
use super::DatabaseUdfEnvironment;
3332
use crate::environment::helpers::{
3433
parse_version,
3534
with_argument_error,
@@ -38,8 +37,12 @@ use crate::environment::helpers::{
3837

3938
pub trait SyscallProvider<RT: Runtime> {
4039
fn table_filter(&self) -> TableFilter;
41-
fn query_manager(&mut self) -> &mut QueryManager<RT>;
42-
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata>;
40+
41+
fn lookup_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableIdAndTableNumber>>;
42+
fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableNumber>>;
43+
44+
fn start_query(&mut self, query: Query, version: Option<Version>) -> anyhow::Result<u32>;
45+
fn cleanup_query(&mut self, query_id: u32) -> bool;
4346
}
4447

4548
impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
@@ -51,12 +54,29 @@ impl<RT: Runtime> SyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
5154
}
5255
}
5356

54-
fn query_manager(&mut self) -> &mut QueryManager<RT> {
55-
&mut self.query_manager
57+
fn lookup_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableIdAndTableNumber>> {
58+
let table_mapping = self.phase.tx()?.table_mapping();
59+
Ok(table_mapping.id_and_number_if_exists(name))
60+
}
61+
62+
fn lookup_virtual_table(&mut self, name: &TableName) -> anyhow::Result<Option<TableNumber>> {
63+
let virtual_table_mapping = self.phase.tx()?.virtual_table_mapping();
64+
Ok(virtual_table_mapping.number_if_exists(name))
65+
}
66+
67+
fn start_query(&mut self, query: Query, version: Option<Version>) -> anyhow::Result<u32> {
68+
let table_filter = self.table_filter();
69+
let tx = self.phase.tx()?;
70+
// TODO: Are all invalid query pipelines developer errors? These could be bugs
71+
// in convex/server.
72+
let compiled_query =
73+
{ DeveloperQuery::new_with_version(tx, query, version, table_filter)? };
74+
let query_id = self.query_manager.put_developer(compiled_query);
75+
Ok(query_id)
5676
}
5777

58-
fn tx(&mut self) -> Result<&mut Transaction<RT>, ErrorMetadata> {
59-
self.phase.tx()
78+
fn cleanup_query(&mut self, query_id: u32) -> bool {
79+
self.query_manager.cleanup_developer(query_id)
6080
}
6181
}
6282

@@ -111,18 +131,11 @@ fn syscall_normalize_id<RT: Runtime, P: SyscallProvider<RT>>(
111131
let table_name: TableName = args.table.parse().context(ArgName("table"))?;
112132
Ok((table_name, args.id_string))
113133
})?;
114-
let virtual_table_number = provider
115-
.tx()?
116-
.virtual_table_mapping()
117-
.number_if_exists(&table_name);
134+
let virtual_table_number = provider.lookup_virtual_table(&table_name)?;
118135
let table_number = match virtual_table_number {
119136
Some(table_number) => Some(table_number),
120137
None => {
121-
let physical_table_number = provider
122-
.tx()?
123-
.table_mapping()
124-
.id_and_number_if_exists(&table_name)
125-
.map(|t| t.table_number);
138+
let physical_table_number = provider.lookup_table(&table_name)?.map(|t| t.table_number);
126139
match provider.table_filter() {
127140
TableFilter::IncludePrivateSystemTables => physical_table_number,
128141
TableFilter::ExcludePrivateSystemTables if table_name.is_system() => None,
@@ -155,9 +168,7 @@ fn syscall_query_stream<RT: Runtime, P: SyscallProvider<RT>>(
155168
provider: &mut P,
156169
args: JsonValue,
157170
) -> anyhow::Result<JsonValue> {
158-
let _s: common::tracing::NoopSpan = static_span!();
159-
let table_filter = provider.table_filter();
160-
let tx = provider.tx()?;
171+
let _s = static_span!();
161172

162173
#[derive(Deserialize)]
163174
struct QueryStreamArgs {
@@ -170,11 +181,7 @@ fn syscall_query_stream<RT: Runtime, P: SyscallProvider<RT>>(
170181
let version = parse_version(args.version)?;
171182
Ok((parsed_query, version))
172183
})?;
173-
// TODO: Are all invalid query pipelines developer errors? These could be bugs
174-
// in convex/server.
175-
let compiled_query =
176-
{ DeveloperQuery::new_with_version(tx, parsed_query, version, table_filter)? };
177-
let query_id = provider.query_manager().put_developer(compiled_query);
184+
let query_id = provider.start_query(parsed_query, version)?;
178185

179186
#[derive(Serialize)]
180187
#[serde(rename_all = "camelCase")]
@@ -197,6 +204,6 @@ fn syscall_query_cleanup<RT: Runtime, P: SyscallProvider<RT>>(
197204
}
198205
let args: QueryCleanupArgs =
199206
with_argument_error("queryCleanup", || Ok(serde_json::from_value(args)?))?;
200-
let cleaned_up = provider.query_manager().cleanup_developer(args.query_id);
207+
let cleaned_up = provider.cleanup_query(args.query_id);
201208
Ok(serde_json::to_value(cleaned_up)?)
202209
}

0 commit comments

Comments
 (0)