Skip to content

Commit 0364e93

Browse files
ldanilek (Convex, Inc.)
authored and
Convex, Inc.
committed
parallel storage.getUrl (#24821)
allow `ctx.storage.getUrl` in queries and mutations to run in parallel with itself, as a batch. note this removes a couple of the sanity checks, because i don't think they're necessary and with batching they're harder: (1) removes the assertion that there's at most one stored file with a given storage id, (2) removes the assertion that `entry.storage_id == storage_id` when `entry` was fetched with an equality index filter on `storage_id` i wanted to make it also run in parallel with `db.get` and `db.query` but that turns out to be tricky because those need to use `DeveloperQuery` while file storage urls needs to use `ResolvedQuery`. GitOrigin-RevId: 41e686efe0b36c60d9e91a99a5cfc8bc4d3f56d3
1 parent fd86561 commit 0364e93

File tree

11 files changed

+324
-115
lines changed

11 files changed

+324
-115
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/file_storage/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ errors = { path = "../errors" }
1717
futures = { workspace = true }
1818
headers = { workspace = true }
1919
keybroker = { path = "../keybroker" }
20+
maplit = { workspace = true }
2021
metrics = { path = "../metrics" }
2122
mime = { workspace = true }
2223
model = { path = "../model" }

crates/file_storage/src/core.rs

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::{
2+
collections::BTreeMap,
23
ops::Bound,
34
sync::Arc,
45
};
@@ -32,12 +33,14 @@ use keybroker::{
3233
Identity,
3334
KeyBroker,
3435
};
36+
use maplit::btreemap;
3537
use mime::Mime;
3638
use model::file_storage::{
3739
types::{
3840
FileStorageEntry,
3941
StorageUuid,
4042
},
43+
BatchKey,
4144
FileStorageId,
4245
FileStorageModel,
4346
};
@@ -92,15 +95,30 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
9295
tx: &mut Transaction<RT>,
9396
storage_id: FileStorageId,
9497
) -> anyhow::Result<Option<String>> {
98+
self.get_url_batch(tx, btreemap! { 0 => storage_id })
99+
.await
100+
.remove(&0)
101+
.context("batch_key missing")?
102+
}
103+
104+
pub async fn get_url_batch(
105+
&self,
106+
tx: &mut Transaction<RT>,
107+
storage_ids: BTreeMap<BatchKey, FileStorageId>,
108+
) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>> {
95109
let origin = &self.convex_origin;
96-
let file: Option<FileStorageEntry> = self.get_file_entry(tx, storage_id.clone()).await?;
97-
Ok(match file {
98-
Some(entry) => {
99-
let storage_id = entry.storage_id;
100-
Some(format!("{origin}/api/storage/{storage_id}"))
101-
},
102-
None => None,
103-
})
110+
let files = self.get_file_entry_batch(tx, storage_ids).await;
111+
files
112+
.into_iter()
113+
.map(|(batch_key, result)| {
114+
(
115+
batch_key,
116+
result.map(|file| {
117+
file.map(|entry| format!("{origin}/api/storage/{}", entry.storage_id))
118+
}),
119+
)
120+
})
121+
.collect()
104122
}
105123

106124
pub async fn delete(
@@ -123,7 +141,23 @@ impl<RT: Runtime> TransactionalFileStorage<RT> {
123141
tx: &mut Transaction<RT>,
124142
storage_id: FileStorageId,
125143
) -> anyhow::Result<Option<FileStorageEntry>> {
126-
FileStorageModel::new(tx).get_file(storage_id).await
144+
self.get_file_entry_batch(tx, btreemap! { 0 => storage_id })
145+
.await
146+
.remove(&0)
147+
.context("batch_key missing")?
148+
}
149+
150+
pub async fn get_file_entry_batch(
151+
&self,
152+
tx: &mut Transaction<RT>,
153+
storage_ids: BTreeMap<BatchKey, FileStorageId>,
154+
) -> BTreeMap<BatchKey, anyhow::Result<Option<FileStorageEntry>>> {
155+
FileStorageModel::new(tx)
156+
.get_file_batch(storage_ids)
157+
.await
158+
.into_iter()
159+
.map(|(batch_key, result)| (batch_key, result.map(|r| r.map(|r| r.into_value()))))
160+
.collect()
127161
}
128162

129163
pub async fn get_file_stream(

crates/isolate/src/environment/udf/async_syscall.rs

Lines changed: 61 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use keybroker::KeyBroker;
4848
use model::{
4949
file_storage::{
5050
types::FileStorageEntry,
51+
BatchKey,
5152
FileStorageId,
5253
},
5354
scheduled_jobs::VirtualSchedulerModel,
@@ -122,6 +123,7 @@ pub fn system_table_guard(name: &TableName, expect_system_table: bool) -> anyhow
122123
#[derive(Debug)]
123124
pub enum AsyncSyscallBatch {
124125
Reads(Vec<AsyncRead>),
126+
StorageGetUrls(Vec<JsonValue>),
125127
Unbatched { name: String, args: JsonValue },
126128
}
127129

@@ -136,6 +138,7 @@ impl AsyncSyscallBatch {
136138
match &*name {
137139
"1.0/get" => Self::Reads(vec![AsyncRead::Get(args)]),
138140
"1.0/queryStreamNext" => Self::Reads(vec![AsyncRead::QueryStreamNext(args)]),
141+
"1.0/storageGetUrl" => Self::StorageGetUrls(vec![args]),
139142
_ => Self::Unbatched { name, args },
140143
}
141144
}
@@ -148,6 +151,8 @@ impl AsyncSyscallBatch {
148151
(Self::Reads(_), "1.0/get") => true,
149152
(Self::Reads(_), "1.0/queryStreamNext") => true,
150153
(Self::Reads(_), _) => false,
154+
(Self::StorageGetUrls(_), "1.0/storageGetUrl") => true,
155+
(Self::StorageGetUrls(_), _) => false,
151156
(Self::Unbatched { .. }, _) => false,
152157
}
153158
}
@@ -158,6 +163,9 @@ impl AsyncSyscallBatch {
158163
(Self::Reads(batch_args), "1.0/queryStreamNext") => {
159164
batch_args.push(AsyncRead::QueryStreamNext(args))
160165
},
166+
(Self::StorageGetUrls(batch_args), "1.0/storageGetUrl") => {
167+
batch_args.push(args);
168+
},
161169
_ => anyhow::bail!("cannot push {name} onto {self:?}"),
162170
}
163171
Ok(())
@@ -167,13 +175,15 @@ impl AsyncSyscallBatch {
167175
match self {
168176
// 1.0/get is grouped in with 1.0/queryStreamNext.
169177
Self::Reads(_) => "1.0/queryStreamNext",
178+
Self::StorageGetUrls(_) => "1.0/storageGetUrl",
170179
Self::Unbatched { name, .. } => name,
171180
}
172181
}
173182

174183
pub fn len(&self) -> usize {
175184
match self {
176185
Self::Reads(args) => args.len(),
186+
Self::StorageGetUrls(args) => args.len(),
177187
Self::Unbatched { .. } => 1,
178188
}
179189
}
@@ -254,10 +264,10 @@ pub trait AsyncSyscallProvider<RT: Runtime> {
254264
) -> anyhow::Result<(UdfPath, ConvexArray)>;
255265

256266
fn file_storage_generate_upload_url(&self) -> anyhow::Result<String>;
257-
async fn file_storage_get_url(
267+
async fn file_storage_get_url_batch(
258268
&mut self,
259-
storage_id: FileStorageId,
260-
) -> anyhow::Result<Option<String>>;
269+
storage_ids: BTreeMap<BatchKey, FileStorageId>,
270+
) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>>;
261271
async fn file_storage_delete(&mut self, storage_id: FileStorageId) -> anyhow::Result<()>;
262272
async fn file_storage_get_entry(
263273
&mut self,
@@ -350,13 +360,20 @@ impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
350360
Ok(post_url)
351361
}
352362

353-
async fn file_storage_get_url(
363+
async fn file_storage_get_url_batch(
354364
&mut self,
355-
storage_id: FileStorageId,
356-
) -> anyhow::Result<Option<String>> {
357-
self.file_storage
358-
.get_url(self.phase.tx()?, storage_id)
359-
.await
365+
storage_ids: BTreeMap<BatchKey, FileStorageId>,
366+
) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>> {
367+
let tx = match self.phase.tx() {
368+
Ok(tx) => tx,
369+
Err(e) => {
370+
return storage_ids
371+
.into_keys()
372+
.map(|batch_key| (batch_key, Err(e.clone().into())))
373+
.collect();
374+
},
375+
};
376+
self.file_storage.get_url_batch(tx, storage_ids).await
360377
}
361378

362379
async fn file_storage_delete(&mut self, storage_id: FileStorageId) -> anyhow::Result<()> {
@@ -389,7 +406,7 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
389406
pub async fn run_async_syscall_batch(
390407
provider: &mut P,
391408
batch: AsyncSyscallBatch,
392-
) -> anyhow::Result<Vec<anyhow::Result<String>>> {
409+
) -> Vec<anyhow::Result<String>> {
393410
let start = provider.rt().monotonic_now();
394411
let batch_name = batch.name().to_string();
395412
let timer = async_syscall_timer(&batch_name);
@@ -398,6 +415,9 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
398415
// errors.
399416
let results = match batch {
400417
AsyncSyscallBatch::Reads(batch_args) => Self::query_batch(provider, batch_args).await,
418+
AsyncSyscallBatch::StorageGetUrls(batch_args) => {
419+
Self::storage_get_url_batch(provider, batch_args).await
420+
},
401421
AsyncSyscallBatch::Unbatched { name, args } => {
402422
let result = match &name[..] {
403423
// Database
@@ -415,7 +435,6 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
415435
"1.0/storageGenerateUploadUrl" => {
416436
Self::storage_generate_upload_url(provider, args).await
417437
},
418-
"1.0/storageGetUrl" => Self::storage_get_url(provider, args).await,
419438
// Scheduling
420439
"1.0/schedule" => Self::schedule(provider, args).await,
421440
"1.0/cancel_job" => Self::cancel_job(provider, args).await,
@@ -444,10 +463,10 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
444463
results.iter().all(|result| result.is_ok()),
445464
);
446465
timer.finish();
447-
Ok(results
466+
results
448467
.into_iter()
449468
.map(|result| anyhow::Ok(serde_json::to_string(&result?)?))
450-
.collect())
469+
.collect()
451470
}
452471

453472
#[convex_macro::instrument_future]
@@ -493,18 +512,40 @@ impl<RT: Runtime, P: AsyncSyscallProvider<RT>> DatabaseSyscallsV1<RT, P> {
493512
}
494513

495514
#[convex_macro::instrument_future]
496-
async fn storage_get_url(provider: &mut P, args: JsonValue) -> anyhow::Result<JsonValue> {
515+
async fn storage_get_url_batch(
516+
provider: &mut P,
517+
batch_args: Vec<JsonValue>,
518+
) -> Vec<anyhow::Result<JsonValue>> {
497519
#[derive(Deserialize)]
498520
#[serde(rename_all = "camelCase")]
499521
struct GetUrlArgs {
500522
storage_id: String,
501523
}
502-
let storage_id: FileStorageId = with_argument_error("storage.getUrl", || {
503-
let GetUrlArgs { storage_id } = serde_json::from_value(args)?;
504-
storage_id.parse().context(ArgName("storageId"))
505-
})?;
506-
let url = provider.file_storage_get_url(storage_id).await?;
507-
Ok(url.into())
524+
let batch_size = batch_args.len();
525+
let mut results = BTreeMap::new();
526+
let mut storage_ids = BTreeMap::new();
527+
for (idx, args) in batch_args.into_iter().enumerate() {
528+
let storage_id_result = with_argument_error("storage.getUrl", || {
529+
let GetUrlArgs { storage_id } = serde_json::from_value(args)?;
530+
storage_id.parse().context(ArgName("storageId"))
531+
});
532+
match storage_id_result {
533+
Ok(storage_id) => {
534+
storage_ids.insert(idx, storage_id);
535+
},
536+
Err(e) => {
537+
assert!(results.insert(idx, Err(e)).is_none());
538+
},
539+
}
540+
}
541+
let urls = provider.file_storage_get_url_batch(storage_ids).await;
542+
for (batch_key, url) in urls {
543+
assert!(results
544+
.insert(batch_key, url.map(JsonValue::from))
545+
.is_none());
546+
}
547+
assert_eq!(results.len(), batch_size);
548+
results.into_values().collect()
508549
}
509550

510551
#[convex_macro::instrument_future]

crates/isolate/src/environment/udf/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,9 @@ impl<RT: Runtime> DatabaseUdfEnvironment<RT> {
718718
results = with_release_permit(
719719
&mut state.timeout,
720720
&mut state.permit,
721-
DatabaseSyscallsV1::run_async_syscall_batch(&mut state.environment, batch),
721+
DatabaseSyscallsV1::run_async_syscall_batch(
722+
&mut state.environment, batch,
723+
).map(Ok),
722724
).fuse() => results?,
723725
};
724726
(resolvers, results)

crates/isolate/src/isolate2/runner.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use model::{
4848
},
4949
file_storage::{
5050
types::FileStorageEntry,
51+
BatchKey,
5152
FileStorageId,
5253
},
5354
udf_config::UdfConfigModel,
@@ -581,7 +582,7 @@ async fn run_request<RT: Runtime>(
581582

582583
if let Some(batch) = syscall_batch.take() {
583584
let results =
584-
DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await?;
585+
DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await;
585586
assert_eq!(results.len(), batch_promise_ids.len());
586587

587588
for (promise_id, result) in batch_promise_ids.drain(..).zip(results) {
@@ -597,7 +598,7 @@ async fn run_request<RT: Runtime>(
597598
}
598599
if let Some(batch) = syscall_batch {
599600
let results =
600-
DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await?;
601+
DatabaseSyscallsV1::run_async_syscall_batch(&mut provider, batch).await;
601602
assert_eq!(results.len(), batch_promise_ids.len());
602603

603604
for (promise_id, result) in batch_promise_ids.into_iter().zip(results) {
@@ -882,10 +883,10 @@ impl<'a, RT: Runtime> AsyncSyscallProvider<RT> for Isolate2SyscallProvider<'a, R
882883
todo!()
883884
}
884885

885-
async fn file_storage_get_url(
886+
async fn file_storage_get_url_batch(
886887
&mut self,
887-
_storage_id: FileStorageId,
888-
) -> anyhow::Result<Option<String>> {
888+
_storage_ids: BTreeMap<BatchKey, FileStorageId>,
889+
) -> BTreeMap<BatchKey, anyhow::Result<Option<String>>> {
889890
todo!()
890891
}
891892

crates/isolate/src/tests/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ mod search;
2828
mod shapes;
2929
mod size_errors;
3030
mod source_maps;
31+
mod storage;
3132
mod system_udfs;
3233
mod unicode;
3334
mod user_error;

0 commit comments

Comments
 (0)