Skip to content

Commit 01eb43a

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
running functions use ComponentId, not path (#28872)
When a function is ready to run, it should run with a ComponentId, not a ComponentPath. ComponentPath is a way of referring to a mounted component, and sometimes we want to run functions in unmounted components. In particular there's now a way for system UDFs to request running such a function: `PublicFunctionPath::ResolvedComponent`. in addition to existing tests, I manually tested the dashboard and made sure system UDFs components still work. GitOrigin-RevId: 48074ae3ea3b71dd9be74a8c7f92843735ce1da6
1 parent f7cc6b1 commit 01eb43a

File tree

28 files changed

+353
-247
lines changed

28 files changed

+353
-247
lines changed

crates/application/src/application_function_runner/mod.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use common::{
2323
},
2424
components::{
2525
CanonicalizedComponentFunctionPath,
26+
CanonicalizedComponentModulePath,
2627
ComponentDefinitionPath,
2728
ComponentId,
2829
ComponentName,
@@ -76,7 +77,6 @@ use common::{
7677
};
7778
use database::{
7879
unauthorized_error,
79-
BootstrapComponentsModel,
8080
Database,
8181
Token,
8282
Transaction,
@@ -1023,9 +1023,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
10231023
FunctionOutcome::Mutation(o) => o,
10241024
_ => anyhow::bail!("Received non-mutation outcome for mutation"),
10251025
};
1026-
let (_, component) = BootstrapComponentsModel::new(&mut tx)
1027-
.component_path_to_ids(path.component.clone())
1028-
.await?;
1026+
let component = path.component;
10291027

10301028
let table_mapping = tx.table_mapping().namespace(component.into());
10311029

@@ -1185,9 +1183,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
11851183
},
11861184
};
11871185

1188-
let (_, component) = BootstrapComponentsModel::new(&mut tx)
1189-
.component_path_to_ids(path_and_args.path().component.clone())
1190-
.await?;
1186+
let component = path_and_args.path().component;
11911187

11921188
// We should use table mappings from the same transaction as the output
11931189
// validator was retrieved.
@@ -1198,7 +1194,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
11981194
// which requires the module to exist.
11991195
let path = path_and_args.path().clone();
12001196
let module = ModuleModel::new(&mut tx)
1201-
.get_metadata_for_function(path.clone())
1197+
.get_metadata_for_function_by_id(&path)
12021198
.await?
12031199
.context("Missing a valid module")?;
12041200
let (log_line_sender, log_line_receiver) = mpsc::unbounded();
@@ -1218,7 +1214,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
12181214
log_line_receiver,
12191215
|log_line| {
12201216
self.function_log.log_action_progress(
1221-
path.clone(),
1217+
path.clone().for_logging(),
12221218
unix_timestamp,
12231219
context.clone(),
12241220
vec![log_line].into(),
@@ -1251,9 +1247,10 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
12511247
ModuleEnvironment::Node => {
12521248
// We should not be missing the module given we validated the path above
12531249
// which requires the module to exist.
1254-
let module_path = BootstrapComponentsModel::new(&mut tx)
1255-
.function_path_to_module(path.clone())
1256-
.await?;
1250+
let module_path = CanonicalizedComponentModulePath {
1251+
component: path.component,
1252+
module_path: path.udf_path.module().clone(),
1253+
};
12571254
let module_version = self
12581255
.module_cache
12591256
.get_module(&mut tx, module_path.clone())
@@ -1336,7 +1333,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
13361333
log_line_receiver,
13371334
|log_line| {
13381335
self.function_log.log_action_progress(
1339-
path.clone(),
1336+
path.clone().for_logging(),
13401337
unix_timestamp,
13411338
context.clone(),
13421339
vec![log_line].into(),
@@ -1362,7 +1359,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
13621359

13631360
node_outcome_result.map(|node_outcome| {
13641361
let outcome = ActionOutcome {
1365-
path: path.clone(),
1362+
path: path.clone().for_logging(),
13661363
arguments: arguments.clone(),
13671364
identity: tx.inert_identity(),
13681365
unix_timestamp,
@@ -1393,7 +1390,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
13931390
Err(e) if e.is_deterministic_user_error() => {
13941391
let outcome = ValidatedActionOutcome::from_error(
13951392
JsError::from_error(e),
1396-
path,
1393+
path.for_logging(),
13971394
arguments,
13981395
inert_identity,
13991396
self.runtime.clone(),

crates/application/src/cache/mod.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ use common::{
4141
RequestId,
4242
};
4343
use database::{
44-
BootstrapComponentsModel,
4544
Database,
4645
Token,
4746
};
@@ -469,9 +468,7 @@ impl<RT: Runtime> CacheManager<RT> {
469468
};
470469
if let Ok(ref json_packed_value) = &query_outcome.result {
471470
let output: ConvexValue = json_packed_value.unpack();
472-
let (_, component) = BootstrapComponentsModel::new(&mut tx)
473-
.component_path_to_ids(path_and_args.path().component.clone())
474-
.await?;
471+
let component = path_and_args.path().component;
475472
let table_mapping = tx.table_mapping().namespace(component.into());
476473
let virtual_system_mapping = tx.virtual_system_mapping();
477474
let returns_validation_error = returns_validator.check_output(

crates/application/src/function_log.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -718,7 +718,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
718718
},
719719
TrackUsage::SystemError => AggregatedFunctionUsageStats::default(),
720720
};
721-
if outcome.path.is_system() {
721+
if outcome.path.udf_path.is_system() {
722722
return;
723723
}
724724
let execution = FunctionExecution {
@@ -806,7 +806,7 @@ impl<RT: Runtime> FunctionExecutionLog<RT> {
806806
},
807807
TrackUsage::SystemError => AggregatedFunctionUsageStats::default(),
808808
};
809-
if outcome.path.is_system() {
809+
if outcome.path.udf_path.is_system() {
810810
return;
811811
}
812812
let execution = FunctionExecution {

crates/common/src/components/function_paths.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use value::heap_size::HeapSize;
1313

1414
use super::{
1515
component_definition_path::ComponentDefinitionPath,
16+
ComponentId,
1617
ComponentPath,
1718
};
1819

@@ -21,6 +22,24 @@ pub struct ComponentDefinitionFunctionPath {
2122
pub udf_path: UdfPath,
2223
}
2324

25+
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
26+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
27+
pub struct ResolvedComponentFunctionPath {
28+
pub component: ComponentId,
29+
pub udf_path: CanonicalizedUdfPath,
30+
// For error messages and logging.
31+
pub component_path: Option<ComponentPath>,
32+
}
33+
34+
impl ResolvedComponentFunctionPath {
35+
pub fn for_logging(self) -> CanonicalizedComponentFunctionPath {
36+
CanonicalizedComponentFunctionPath {
37+
component: self.component_path.unwrap_or_else(ComponentPath::root),
38+
udf_path: self.udf_path,
39+
}
40+
}
41+
}
42+
2443
#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
2544
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
2645
pub struct ComponentFunctionPath {
@@ -184,20 +203,23 @@ impl HeapSize for ExportPath {
184203
pub enum PublicFunctionPath {
185204
RootExport(ExportPath),
186205
Component(CanonicalizedComponentFunctionPath),
206+
ResolvedComponent(ResolvedComponentFunctionPath),
187207
}
188208

189209
impl PublicFunctionPath {
190210
pub fn is_system(&self) -> bool {
191211
match self {
192212
PublicFunctionPath::RootExport(path) => path.is_system(),
193213
PublicFunctionPath::Component(path) => path.udf_path.is_system(),
214+
PublicFunctionPath::ResolvedComponent(path) => path.udf_path.is_system(),
194215
}
195216
}
196217

197218
pub fn udf_path(&self) -> &CanonicalizedUdfPath {
198219
match self {
199220
PublicFunctionPath::RootExport(path) => path.udf_path(),
200221
PublicFunctionPath::Component(path) => &path.udf_path,
222+
PublicFunctionPath::ResolvedComponent(path) => &path.udf_path,
201223
}
202224
}
203225

@@ -208,6 +230,7 @@ impl PublicFunctionPath {
208230
udf_path: path.into(),
209231
},
210232
PublicFunctionPath::Component(path) => path,
233+
PublicFunctionPath::ResolvedComponent(path) => path.for_logging(),
211234
}
212235
}
213236
}
@@ -217,6 +240,7 @@ impl HeapSize for PublicFunctionPath {
217240
match self {
218241
PublicFunctionPath::RootExport(path) => path.heap_size(),
219242
PublicFunctionPath::Component(path) => path.heap_size(),
243+
PublicFunctionPath::ResolvedComponent(path) => path.udf_path.heap_size(),
220244
}
221245
}
222246
}

crates/common/src/components/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ pub use self::{
2424
ComponentFunctionPath,
2525
ExportPath,
2626
PublicFunctionPath,
27+
ResolvedComponentFunctionPath,
2728
},
2829
module_paths::CanonicalizedComponentModulePath,
2930
reference::Reference,

crates/common/src/components/resource.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,35 @@ use value::ConvexValue;
88
use super::{
99
function_paths::SerializedComponentFunctionPath,
1010
CanonicalizedComponentFunctionPath,
11+
ResolvedComponentFunctionPath,
1112
};
1213

1314
/// `Resource`s are resolved `Reference`s to objects within the components
1415
/// data model. For now, we only have free standing `ConvexValue`s and
1516
/// functions within a component.
1617
#[derive(Debug, Clone, PartialEq, Eq, Ord, PartialOrd)]
17-
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
1818
pub enum Resource {
1919
Value(ConvexValue),
2020
Function(CanonicalizedComponentFunctionPath),
21+
/// A system UDF running in a component by ID (not path).
22+
ResolvedSystemUdf(ResolvedComponentFunctionPath),
23+
}
24+
25+
#[cfg(any(test, feature = "testing"))]
26+
impl proptest::prelude::Arbitrary for Resource {
27+
type Parameters = ();
28+
type Strategy = proptest::strategy::BoxedStrategy<Self>;
29+
30+
fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
31+
use proptest::prelude::*;
32+
use value::ConvexValue;
33+
34+
prop_oneof![
35+
ConvexValue::arbitrary().prop_map(Resource::Value),
36+
CanonicalizedComponentFunctionPath::arbitrary().prop_map(Resource::Function),
37+
]
38+
.boxed()
39+
}
2140
}
2241

2342
#[derive(Debug, Serialize, Deserialize)]
@@ -42,6 +61,9 @@ impl TryFrom<Resource> for SerializedResource {
4261
Resource::Function(path) => Ok(Self::Function {
4362
path: path.try_into()?,
4463
}),
64+
Resource::ResolvedSystemUdf(path) => Ok(Self::Function {
65+
path: path.for_logging().try_into()?,
66+
}),
4567
}
4668
}
4769
}

crates/isolate/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1765,7 +1765,7 @@ impl<RT: Runtime> IsolateWorker<RT> for BackendIsolateWorker<RT> {
17651765
request.http_module_path.path().udf_path.clone();
17661766
let environment = ActionEnvironment::new(
17671767
self.rt.clone(),
1768-
request.http_module_path.path().component.clone(),
1768+
request.http_module_path.path().component,
17691769
environment_data,
17701770
request.identity,
17711771
request.transaction,
@@ -1810,7 +1810,7 @@ impl<RT: Runtime> IsolateWorker<RT> for BackendIsolateWorker<RT> {
18101810
} => {
18111811
drop(queue_timer);
18121812
let timer = metrics::service_request_timer(&UdfType::Action);
1813-
let component_path = request.params.path_and_args.path().component.clone();
1813+
let component_path = request.params.path_and_args.path().component;
18141814
let environment = ActionEnvironment::new(
18151815
self.rt.clone(),
18161816
component_path,

crates/isolate/src/environment/action/async_syscall.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,8 @@ use crate::{
5050
};
5151

5252
impl<RT: Runtime> TaskExecutor<RT> {
53-
pub fn component_id(&self) -> anyhow::Result<ComponentId> {
53+
pub fn component_id(&self) -> ComponentId {
5454
self.component_id
55-
.lock()
56-
.context("component_id was not initialized")
5755
}
5856

5957
#[minitrace::trace]
@@ -261,7 +259,7 @@ impl<RT: Runtime> TaskExecutor<RT> {
261259
.action_callbacks
262260
.schedule_job(
263261
self.identity.clone(),
264-
self.component_id()?,
262+
self.component_id(),
265263
path,
266264
args.into_arg_vec(),
267265
scheduled_ts,
@@ -294,7 +292,7 @@ impl<RT: Runtime> TaskExecutor<RT> {
294292
#[convex_macro::instrument_future]
295293
async fn async_syscall_vectorSearch(&self, args: JsonValue) -> anyhow::Result<JsonValue> {
296294
let VectorSearchRequest { query } = serde_json::from_value(args)?;
297-
let component_id = self.component_id()?;
295+
let component_id = self.component_id();
298296
let mut vector_search_query: VectorSearchJson = serde_json::from_value(query)?;
299297
vector_search_query.insert_component_id(component_id);
300298

@@ -320,7 +318,7 @@ impl<RT: Runtime> TaskExecutor<RT> {
320318
_args: JsonValue,
321319
) -> anyhow::Result<JsonValue> {
322320
let issued_ts = self.rt.unix_timestamp();
323-
let component = self.component_id()?;
321+
let component = self.component_id();
324322
let postUrl =
325323
self.file_storage
326324
.generate_upload_url(&self.key_broker, issued_ts, component)?;
@@ -340,7 +338,7 @@ impl<RT: Runtime> TaskExecutor<RT> {
340338
})?;
341339
let url = self
342340
.action_callbacks
343-
.storage_get_url(self.identity.clone(), self.component_id()?, storage_id)
341+
.storage_get_url(self.identity.clone(), self.component_id(), storage_id)
344342
.await?;
345343
Ok(url.into())
346344
}
@@ -358,7 +356,7 @@ impl<RT: Runtime> TaskExecutor<RT> {
358356
})?;
359357

360358
self.action_callbacks
361-
.storage_delete(self.identity.clone(), self.component_id()?, storage_id)
359+
.storage_delete(self.identity.clone(), self.component_id(), storage_id)
362360
.await?;
363361

364362
Ok(JsonValue::Null)
@@ -386,7 +384,7 @@ impl<RT: Runtime> TaskExecutor<RT> {
386384
}
387385
let file_metadata = self
388386
.action_callbacks
389-
.storage_get_file_entry(self.identity.clone(), self.component_id()?, storage_id)
387+
.storage_get_file_entry(self.identity.clone(), self.component_id(), storage_id)
390388
.await?
391389
.map(
392390
|FileStorageEntry {

0 commit comments

Comments
 (0)