Skip to content

Commit a01d010

Browse files
Sujay JayakarConvex, Inc.
Sujay Jayakar
authored and
Convex, Inc.
committed
Resubmit export PRs (#28698)
GitOrigin-RevId: 6f5d9a5e47e4aa4552ccb4fd1a170f43ba118a09
1 parent a2eef7f commit a01d010

File tree

16 files changed

+525
-204
lines changed

16 files changed

+525
-204
lines changed

crates/application/src/api.rs

Lines changed: 140 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ use common::{
99
components::{
1010
CanonicalizedComponentFunctionPath,
1111
ComponentId,
12+
ComponentPath,
13+
ExportPath,
1214
},
1315
http::ResolvedHost,
1416
pause::PauseClient,
@@ -96,7 +98,25 @@ pub trait ApplicationApi: Send + Sync {
9698
auth_token: AuthenticationToken,
9799
) -> anyhow::Result<Identity>;
98100

101+
/// Execute a public query on the root app. This method is used by the sync
102+
/// worker and HTTP API for the majority of traffic as the main entry point
103+
/// for queries.
99104
async fn execute_public_query(
105+
&self,
106+
host: &ResolvedHost,
107+
request_id: RequestId,
108+
identity: Identity,
109+
path: ExportPath,
110+
args: Vec<JsonValue>,
111+
caller: FunctionCaller,
112+
ts: ExecuteQueryTimestamp,
113+
journal: Option<SerializedQueryJournal>,
114+
) -> anyhow::Result<RedactedQueryReturn>;
115+
116+
/// Execute an admin query for a particular component. This method is used
117+
/// by the sync worker for running queries for the dashboard and only works
118+
/// for admin or system identity.
119+
async fn execute_admin_query(
100120
&self,
101121
host: &ResolvedHost,
102122
request_id: RequestId,
@@ -108,44 +128,54 @@ pub trait ApplicationApi: Send + Sync {
108128
journal: Option<SerializedQueryJournal>,
109129
) -> anyhow::Result<RedactedQueryReturn>;
110130

131+
/// Execute a public mutation on the root app.
111132
async fn execute_public_mutation(
112133
&self,
113134
host: &ResolvedHost,
114135
request_id: RequestId,
115136
identity: Identity,
116-
path: CanonicalizedComponentFunctionPath,
137+
path: ExportPath,
117138
args: Vec<JsonValue>,
118139
caller: FunctionCaller,
119140
// Identifier used to make this mutation idempotent.
120141
mutation_identifier: Option<SessionRequestIdentifier>,
121142
) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>>;
122143

123-
async fn execute_public_action(
144+
/// Execute an admin mutation for a particular component for the dashboard.
145+
async fn execute_admin_mutation(
124146
&self,
125147
host: &ResolvedHost,
126148
request_id: RequestId,
127149
identity: Identity,
128150
path: CanonicalizedComponentFunctionPath,
129151
args: Vec<JsonValue>,
130152
caller: FunctionCaller,
131-
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;
153+
mutation_identifier: Option<SessionRequestIdentifier>,
154+
) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>>;
132155

133-
async fn execute_any_function(
156+
/// Execute a public action on the root app.
157+
async fn execute_public_action(
134158
&self,
135159
host: &ResolvedHost,
136160
request_id: RequestId,
137161
identity: Identity,
138-
path: CanonicalizedComponentFunctionPath,
162+
path: ExportPath,
139163
args: Vec<JsonValue>,
140164
caller: FunctionCaller,
141-
) -> anyhow::Result<Result<FunctionReturn, FunctionError>>;
165+
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;
142166

143-
async fn latest_timestamp(
167+
/// Execute an admin action for a particular component for the dashboard.
168+
async fn execute_admin_action(
144169
&self,
145170
host: &ResolvedHost,
146171
request_id: RequestId,
147-
) -> anyhow::Result<RepeatableTimestamp>;
172+
identity: Identity,
173+
path: CanonicalizedComponentFunctionPath,
174+
args: Vec<JsonValue>,
175+
caller: FunctionCaller,
176+
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>>;
148177

178+
/// Execute an HTTP action on the root app.
149179
async fn execute_http_action(
150180
&self,
151181
host: &ResolvedHost,
@@ -156,6 +186,25 @@ pub trait ApplicationApi: Send + Sync {
156186
response_streamer: HttpActionResponseStreamer,
157187
) -> anyhow::Result<()>;
158188

189+
/// For the dashboard (and the CLI), run any function in any component
190+
/// without knowing its type. This function requires admin identity for
191+
/// calling functions outside the root component.
192+
async fn execute_any_function(
193+
&self,
194+
host: &ResolvedHost,
195+
request_id: RequestId,
196+
identity: Identity,
197+
path: CanonicalizedComponentFunctionPath,
198+
args: Vec<JsonValue>,
199+
caller: FunctionCaller,
200+
) -> anyhow::Result<Result<FunctionReturn, FunctionError>>;
201+
202+
async fn latest_timestamp(
203+
&self,
204+
host: &ResolvedHost,
205+
request_id: RequestId,
206+
) -> anyhow::Result<RepeatableTimestamp>;
207+
159208
async fn check_store_file_authorization(
160209
&self,
161210
host: &ResolvedHost,
@@ -226,7 +275,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
226275
_host: &ResolvedHost,
227276
request_id: RequestId,
228277
identity: Identity,
229-
path: CanonicalizedComponentFunctionPath,
278+
path: ExportPath,
230279
args: Vec<JsonValue>,
231280
caller: FunctionCaller,
232281
ts: ExecuteQueryTimestamp,
@@ -236,7 +285,34 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
236285
caller.allowed_visibility() == AllowedVisibility::PublicOnly,
237286
"This method should not be used by internal callers."
238287
);
288+
let ts = match ts {
289+
ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(),
290+
ExecuteQueryTimestamp::At(ts) => ts,
291+
};
292+
// Public queries always start with the root component.
293+
let path = CanonicalizedComponentFunctionPath {
294+
component: ComponentPath::root(),
295+
udf_path: path.into(),
296+
};
297+
self.read_only_udf_at_ts(request_id, path, args, identity, ts, journal, caller)
298+
.await
299+
}
239300

301+
async fn execute_admin_query(
302+
&self,
303+
_host: &ResolvedHost,
304+
request_id: RequestId,
305+
identity: Identity,
306+
path: CanonicalizedComponentFunctionPath,
307+
args: Vec<JsonValue>,
308+
caller: FunctionCaller,
309+
ts: ExecuteQueryTimestamp,
310+
journal: Option<SerializedQueryJournal>,
311+
) -> anyhow::Result<RedactedQueryReturn> {
312+
anyhow::ensure!(
313+
path.component.is_root() || identity.is_admin() || identity.is_system(),
314+
"Only admin or system users can call functions on non-root components directly"
315+
);
240316
let ts = match ts {
241317
ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(),
242318
ExecuteQueryTimestamp::At(ts) => ts,
@@ -250,7 +326,7 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
250326
_host: &ResolvedHost,
251327
request_id: RequestId,
252328
identity: Identity,
253-
path: CanonicalizedComponentFunctionPath,
329+
path: ExportPath,
254330
args: Vec<JsonValue>,
255331
caller: FunctionCaller,
256332
// Identifier used to make this mutation idempotent.
@@ -260,7 +336,36 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
260336
caller.allowed_visibility() == AllowedVisibility::PublicOnly,
261337
"This method should not be used by internal callers."
262338
);
339+
let path = CanonicalizedComponentFunctionPath {
340+
component: ComponentPath::root(),
341+
udf_path: path.into(),
342+
};
343+
self.mutation_udf(
344+
request_id,
345+
path,
346+
args,
347+
identity,
348+
mutation_identifier,
349+
caller,
350+
PauseClient::new(),
351+
)
352+
.await
353+
}
263354

355+
async fn execute_admin_mutation(
356+
&self,
357+
_host: &ResolvedHost,
358+
request_id: RequestId,
359+
identity: Identity,
360+
path: CanonicalizedComponentFunctionPath,
361+
args: Vec<JsonValue>,
362+
caller: FunctionCaller,
363+
mutation_identifier: Option<SessionRequestIdentifier>,
364+
) -> anyhow::Result<Result<RedactedMutationReturn, RedactedMutationError>> {
365+
anyhow::ensure!(
366+
path.component.is_root() || identity.is_admin() || identity.is_system(),
367+
"Only admin or system users can call functions on non-root components directly"
368+
);
264369
self.mutation_udf(
265370
request_id,
266371
path,
@@ -278,15 +383,35 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
278383
_host: &ResolvedHost,
279384
request_id: RequestId,
280385
identity: Identity,
281-
path: CanonicalizedComponentFunctionPath,
386+
path: ExportPath,
282387
args: Vec<JsonValue>,
283388
caller: FunctionCaller,
284389
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> {
285390
anyhow::ensure!(
286391
caller.allowed_visibility() == AllowedVisibility::PublicOnly,
287392
"This method should not be used by internal callers."
288393
);
394+
let path = CanonicalizedComponentFunctionPath {
395+
component: ComponentPath::root(),
396+
udf_path: path.into(),
397+
};
398+
self.action_udf(request_id, path, args, identity, caller)
399+
.await
400+
}
289401

402+
async fn execute_admin_action(
403+
&self,
404+
_host: &ResolvedHost,
405+
request_id: RequestId,
406+
identity: Identity,
407+
path: CanonicalizedComponentFunctionPath,
408+
args: Vec<JsonValue>,
409+
caller: FunctionCaller,
410+
) -> anyhow::Result<Result<RedactedActionReturn, RedactedActionError>> {
411+
anyhow::ensure!(
412+
path.component.is_root() || identity.is_admin() || identity.is_system(),
413+
"Only admin or system users can call functions on non-root components directly"
414+
);
290415
self.action_udf(request_id, path, args, identity, caller)
291416
.await
292417
}
@@ -300,6 +425,10 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
300425
args: Vec<JsonValue>,
301426
caller: FunctionCaller,
302427
) -> anyhow::Result<Result<FunctionReturn, FunctionError>> {
428+
anyhow::ensure!(
429+
path.component.is_root() || identity.is_admin() || identity.is_system(),
430+
"Only admin or system users can call functions on non-root components directly"
431+
);
303432
self.any_udf(request_id, path, args, identity, caller).await
304433
}
305434

crates/application/src/application_function_runner/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -762,7 +762,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
762762
if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) {
763763
anyhow::bail!(unauthorized_error("mutation"));
764764
}
765-
let arguments = match parse_udf_args(&path, arguments) {
765+
let arguments = match parse_udf_args(&path.udf_path, arguments) {
766766
Ok(arguments) => arguments,
767767
Err(error) => {
768768
return Ok(Err(MutationError {
@@ -1032,7 +1032,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
10321032
if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) {
10331033
anyhow::bail!(unauthorized_error("action"));
10341034
}
1035-
let arguments = match parse_udf_args(&path, arguments) {
1035+
let arguments = match parse_udf_args(&path.udf_path, arguments) {
10361036
Ok(arguments) => arguments,
10371037
Err(error) => {
10381038
return Ok(Err(ActionError {
@@ -1673,7 +1673,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
16731673
if path.udf_path.is_system() && !(identity.is_admin() || identity.is_system()) {
16741674
anyhow::bail!(unauthorized_error("query"));
16751675
}
1676-
let args = match parse_udf_args(&path, args) {
1676+
let args = match parse_udf_args(&path.udf_path, args) {
16771677
Ok(arguments) => arguments,
16781678
Err(js_error) => {
16791679
return Ok(QueryReturn {

crates/application/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2575,7 +2575,7 @@ impl<RT: Runtime> Application<RT> {
25752575
component: ComponentPath::TODO(),
25762576
udf_path: CanonicalizedUdfPath::new(module_path, function_name),
25772577
};
2578-
let arguments = parse_udf_args(&path, args)?;
2578+
let arguments = parse_udf_args(&path.udf_path, args)?;
25792579
let (result, log_lines) = match analyzed_function.udf_type {
25802580
UdfType::Query => self
25812581
.runner

crates/application/src/tests/cron_jobs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn create_cron_job(
7878
};
7979
let cron_spec = CronSpec {
8080
udf_path: path.udf_path.clone(),
81-
udf_args: parse_udf_args(&path, vec![JsonValue::Object(map)])?,
81+
udf_args: parse_udf_args(&path.udf_path, vec![JsonValue::Object(map)])?,
8282
cron_schedule: CronSchedule::Interval { seconds: 60 },
8383
};
8484
let original_jobs = cron_model.list().await?;

crates/application/src/tests/scheduled_jobs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ async fn create_scheduled_job<'a>(
8181
let job_id = model
8282
.schedule(
8383
path.clone(),
84-
parse_udf_args(&path, vec![JsonValue::Object(map)])?,
84+
parse_udf_args(&path.udf_path, vec![JsonValue::Object(map)])?,
8585
rt.unix_timestamp(),
8686
ExecutionContext::new_for_test(),
8787
)

crates/common/src/components/function_paths.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::str::FromStr;
2+
13
use serde::{
24
Deserialize,
35
Serialize,
@@ -113,3 +115,35 @@ impl HeapSize for CanonicalizedComponentFunctionPath {
113115
self.component.heap_size() + self.udf_path.heap_size()
114116
}
115117
}
118+
119+
pub struct ExportPath {
120+
path: CanonicalizedUdfPath,
121+
}
122+
123+
impl From<CanonicalizedUdfPath> for ExportPath {
124+
fn from(path: CanonicalizedUdfPath) -> Self {
125+
Self { path }
126+
}
127+
}
128+
129+
impl From<ExportPath> for CanonicalizedUdfPath {
130+
fn from(p: ExportPath) -> Self {
131+
p.path
132+
}
133+
}
134+
135+
impl FromStr for ExportPath {
136+
type Err = anyhow::Error;
137+
138+
fn from_str(s: &str) -> Result<Self, Self::Err> {
139+
Ok(Self {
140+
path: CanonicalizedUdfPath::from_str(s)?,
141+
})
142+
}
143+
}
144+
145+
impl From<ExportPath> for String {
146+
fn from(p: ExportPath) -> Self {
147+
p.path.to_string()
148+
}
149+
}

crates/common/src/components/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub use self::{
2222
CanonicalizedComponentFunctionPath,
2323
ComponentDefinitionFunctionPath,
2424
ComponentFunctionPath,
25+
ExportPath,
2526
},
2627
module_paths::CanonicalizedComponentModulePath,
2728
reference::Reference,

crates/isolate/src/environment/helpers/validation.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub async fn validate_schedule_args<RT: Runtime>(
9393
}
9494

9595
// We do serialize the arguments, so this is likely our fault.
96-
let udf_args = parse_udf_args(&path, udf_args)?;
96+
let udf_args = parse_udf_args(&path.udf_path, udf_args)?;
9797

9898
// Even though we might use different version of modules when executing,
9999
// we do validate that the scheduled function exists at time of scheduling.
@@ -435,7 +435,7 @@ impl ValidatedPathAndArgs {
435435
))));
436436
}
437437

438-
match validate_udf_args_size(&path, &args) {
438+
match validate_udf_args_size(&path.udf_path, &args) {
439439
Ok(()) => (),
440440
Err(err) => return Ok(Err(err)),
441441
}

0 commit comments

Comments
 (0)