Skip to content

Commit e1105af

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
read metadata fields from _modules, not _module_versions (#25355)
stop reading the following fields from `_module_versions`: `version`, `analyzeResult`, `sourcePackageId`, `environment`. these fields exist on `_modules` and db-verifier confirms that they match, so we read them instead. now we only need to look up `_module_versions` when reading source code, source maps, or creation time. GitOrigin-RevId: e18ac9a83c8f90c88dccf43de0d66051ef10a67b
1 parent e5df863 commit e1105af

File tree

17 files changed

+95
-100
lines changed

17 files changed

+95
-100
lines changed

crates/application/src/application_function_runner/mod.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,13 @@ use model::{
126126
types::FileStorageEntry,
127127
FileStorageId,
128128
},
129-
modules::module_versions::{
130-
AnalyzedModule,
131-
ModuleSource,
132-
SourceMap,
129+
modules::{
130+
module_versions::{
131+
AnalyzedModule,
132+
ModuleSource,
133+
SourceMap,
134+
},
135+
ModuleModel,
133136
},
134137
scheduled_jobs::VirtualSchedulerModel,
135138
session_requests::{
@@ -1189,16 +1192,15 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
11891192
let udf_server_version = path_and_args.npm_version().clone();
11901193
// We should not be missing the module given we validated the path above
11911194
// which requires the module to exist.
1192-
let module_version = self
1193-
.module_cache
1194-
.get_module(&mut tx, name.module().clone())
1195+
let module = ModuleModel::new(&mut tx)
1196+
.get_metadata(name.module().clone())
11951197
.await?
1196-
.context("Missing a valid module_version")?;
1198+
.context("Missing a valid module")?;
11971199
let (log_line_sender, log_line_receiver) = mpsc::unbounded();
11981200

11991201
let inert_identity = tx.inert_identity();
1200-
let timer = function_total_timer(module_version.environment, UdfType::Action);
1201-
let completion_result = match module_version.environment {
1202+
let timer = function_total_timer(module.environment, UdfType::Action);
1203+
let completion_result = match module.environment {
12021204
ModuleEnvironment::Isolate => {
12031205
// TODO: This is the only use case of clone. We should get rid of clone,
12041206
// when we deprecate that codepath.
@@ -1215,7 +1217,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
12151217
unix_timestamp,
12161218
context.clone(),
12171219
vec![log_line].into(),
1218-
module_version.environment,
1220+
module.environment,
12191221
)
12201222
},
12211223
)
@@ -1236,6 +1238,13 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
12361238
})
12371239
},
12381240
ModuleEnvironment::Node => {
1241+
// We should not be missing the module given we validated the path above
1242+
// which requires the module to exist.
1243+
let module_version = self
1244+
.module_cache
1245+
.get_module(&mut tx, name.module().clone())
1246+
.await?
1247+
.context("Missing a valid module_version")?;
12391248
let _request_guard = self
12401249
.node_action_limiter
12411250
.acquire_permit_with_timeout(&self.runtime)
@@ -1245,7 +1254,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
12451254
source_maps.insert(name.module().clone(), source_map);
12461255
}
12471256

1248-
let source_package_id = module_version.source_package_id.ok_or_else(|| {
1257+
let source_package_id = module.source_package_id.ok_or_else(|| {
12491258
anyhow::anyhow!("Source package is required to execute actions")
12501259
})?;
12511260
let source_package = SourcePackageModel::new(&mut tx)
@@ -1320,7 +1329,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
13201329
unix_timestamp,
13211330
context.clone(),
13221331
vec![log_line].into(),
1323-
module_version.environment,
1332+
module.environment,
13241333
)
13251334
},
13261335
)
@@ -1366,8 +1375,8 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
13661375
Ok(ActionCompletion {
13671376
outcome,
13681377
execution_time: start.elapsed(),
1369-
environment: module_version.environment,
1370-
memory_in_mb: match module_version.environment {
1378+
environment: module.environment,
1379+
memory_in_mb: match module.environment {
13711380
ModuleEnvironment::Isolate => (*ISOLATE_MAX_USER_HEAP_SIZE / (1 << 20))
13721381
.try_into()
13731382
.unwrap(),
@@ -2017,7 +2026,6 @@ impl<RT: Runtime> ActionCallbacks for ApplicationFunctionRunner<RT> {
20172026
// Scheduling from actions is not transaction and happens at latest
20182027
// timestamp.
20192028
self.database.runtime().unix_timestamp(),
2020-
&self.module_cache,
20212029
&mut tx,
20222030
)
20232031
.await?;

crates/application/src/module_cache/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ impl<RT: Runtime> ModuleLoader<RT> for ModuleCache<RT> {
240240
// read same is the same regardless if we hit the cache or not.
241241
// This is not technically needed since the module version is immutable,
242242
// but better safe and consistent that sorry.
243-
ModuleModel::new(tx).record_module_version_read_dependency(key.0, key.1)?;
243+
ModuleModel::new(tx).record_module_version_read_dependency(key.0)?;
244244

245245
timer.finish();
246246
Ok(Some(result))

crates/function_runner/src/module_cache.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,7 @@ impl<RT: Runtime> ModuleLoader<RT> for FunctionRunnerModuleLoader<RT> {
9898
// read same is the same regardless if we hit the cache or not.
9999
// This is not technically needed since the module version is immutable,
100100
// but better safe and consistent that sorry.
101-
ModuleModel::new(tx)
102-
.record_module_version_read_dependency(key.module_id, key.module_version)?;
101+
ModuleModel::new(tx).record_module_version_read_dependency(key.module_id)?;
103102

104103
Ok(Some(result))
105104
}

crates/isolate/src/environment/action/phase.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use model::{
2323
},
2424
modules::{
2525
module_versions::ModuleVersionMetadata,
26+
types::ModuleMetadata,
2627
ModuleModel,
2728
},
2829
udf_config::UdfConfigModel,
@@ -71,7 +72,7 @@ enum ActionPreloaded<RT: Runtime> {
7172
},
7273
Preloading,
7374
Ready {
74-
modules: BTreeMap<CanonicalizedModulePath, Arc<ModuleVersionMetadata>>,
75+
modules: BTreeMap<CanonicalizedModulePath, (ModuleMetadata, Arc<ModuleVersionMetadata>)>,
7576
env_vars: BTreeMap<EnvVarName, EnvVarValue>,
7677
rng: Option<ChaCha12Rng>,
7778
import_time_unix_timestamp: Option<UnixTimestamp>,
@@ -136,10 +137,10 @@ impl<RT: Runtime> ActionPhase<RT> {
136137
}
137138
let path = metadata.path.clone();
138139
if let Some(module) = module_loader
139-
.get_module_with_metadata(&mut tx, metadata)
140+
.get_module_with_metadata(&mut tx, metadata.clone())
140141
.await?
141142
{
142-
modules.insert(path, module);
143+
modules.insert(path, (metadata.into_value(), module));
143144
}
144145
}
145146

@@ -171,20 +172,20 @@ impl<RT: Runtime> ActionPhase<RT> {
171172
let ActionPreloaded::Ready { ref modules, .. } = self.preloaded else {
172173
anyhow::bail!("Phase not initialized");
173174
};
174-
let module_version = modules
175+
let module = modules
175176
.get(&module_path.clone().canonicalize())
176-
.map(|m| (**m).clone());
177+
.map(|(module, source)| (module, (**source).clone()));
177178

178-
if let Some(module_version) = module_version.as_ref() {
179+
if let Some((module, _)) = module.as_ref() {
179180
anyhow::ensure!(
180-
module_version.environment == ModuleEnvironment::Isolate,
181+
module.environment == ModuleEnvironment::Isolate,
181182
"Trying to execute {:?} in isolate, but it is bundled for {:?}.",
182183
module_path,
183-
module_version.environment
184+
module.environment
184185
);
185186
};
186187

187-
Ok(module_version)
188+
Ok(module.map(|(_, source)| source))
188189
}
189190

190191
pub fn begin_execution(&mut self) -> anyhow::Result<()> {

crates/isolate/src/environment/helpers/module_loader.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,10 @@ pub trait ModuleLoader<RT: Runtime>: Sync + Send + 'static {
5858
tx: &mut Transaction<RT>,
5959
udf_path: &CanonicalizedUdfPath,
6060
) -> anyhow::Result<anyhow::Result<AnalyzedFunction>> {
61-
let Some(module) = self.get_module(tx, udf_path.module().clone()).await? else {
61+
let Some(module) = ModuleModel::new(tx)
62+
.get_metadata(udf_path.module().clone())
63+
.await?
64+
else {
6265
return Ok(Err(ErrorMetadata::bad_request(
6366
"ModuleNotFound",
6467
ModuleNotFoundError::new(udf_path.module().as_str()).to_string(),
@@ -69,7 +72,6 @@ pub trait ModuleLoader<RT: Runtime>: Sync + Send + 'static {
6972
// Dependency modules don't have AnalyzedModule.
7073
if !udf_path.module().is_deps() {
7174
let analyzed_module = module
72-
.as_ref()
7375
.analyze_result
7476
.as_ref()
7577
.ok_or_else(|| anyhow::anyhow!("Expected analyze result for {udf_path:?}"))?;

crates/isolate/src/environment/helpers/validation.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ use model::{
3434
DISABLED_ERROR_MESSAGE,
3535
PAUSED_ERROR_MESSAGE,
3636
},
37-
modules::module_versions::Visibility,
37+
modules::{
38+
module_versions::Visibility,
39+
ModuleModel,
40+
},
3841
udf_config::UdfConfigModel,
3942
};
4043
#[cfg(any(test, feature = "testing"))]
@@ -61,7 +64,6 @@ pub async fn validate_schedule_args<RT: Runtime>(
6164
udf_args: Vec<JsonValue>,
6265
scheduled_ts: UnixTimestamp,
6366
udf_ts: UnixTimestamp,
64-
module_loader: &Arc<dyn ModuleLoader<RT>>,
6567
tx: &mut Transaction<RT>,
6668
) -> anyhow::Result<(UdfPath, ConvexArray)> {
6769
// We validate the following mostly so the developer don't get the timestamp
@@ -88,8 +90,8 @@ pub async fn validate_schedule_args<RT: Runtime>(
8890
// We do it here instead of within transaction in order to leverage the module
8991
// cache.
9092
let canonicalized = udf_path.clone().canonicalize();
91-
let module_version = module_loader
92-
.get_module(tx, canonicalized.module().clone())
93+
let module = ModuleModel::new(tx)
94+
.get_metadata(canonicalized.module().clone())
9395
.await?
9496
.with_context(|| {
9597
let p = String::from(udf_path.module().clone());
@@ -104,7 +106,7 @@ pub async fn validate_schedule_args<RT: Runtime>(
104106
// of analyze, we should always validate in practice. We will tighten
105107
// the interface and make AnalyzedResult non-optional in the future.
106108
let function_name = canonicalized.function_name();
107-
if let Some(analyze_result) = &module_version.analyze_result {
109+
if let Some(analyze_result) = &module.analyze_result {
108110
let found = analyze_result
109111
.functions
110112
.iter()

crates/isolate/src/environment/udf/async_syscall.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,6 @@ impl<RT: Runtime> AsyncSyscallProvider<RT> for DatabaseUdfEnvironment<RT> {
344344
args,
345345
scheduled_ts,
346346
self.phase.unix_timestamp()?,
347-
&self.module_loader,
348347
self.phase.tx()?,
349348
)
350349
.await

crates/isolate/src/environment/udf/mod.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,6 @@ pub struct DatabaseUdfEnvironment<RT: Runtime> {
179179
udf_server_version: Option<semver::Version>,
180180

181181
phase: UdfPhase<RT>,
182-
module_loader: Arc<dyn ModuleLoader<RT>>,
183182
file_storage: TransactionalFileStorage<RT>,
184183

185184
query_manager: QueryManager<RT>,
@@ -353,7 +352,6 @@ impl<RT: Runtime> DatabaseUdfEnvironment<RT> {
353352
udf_server_version,
354353

355354
phase: UdfPhase::new(transaction, rt, module_loader.clone(), system_env_vars),
356-
module_loader,
357355
file_storage,
358356

359357
query_manager: QueryManager::new(),

crates/isolate/src/environment/udf/phase.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ use model::{
3131
EnvironmentVariablesModel,
3232
PreloadedEnvironmentVariables,
3333
},
34-
modules::module_versions::ModuleVersionMetadata,
34+
modules::{
35+
module_versions::ModuleVersionMetadata,
36+
ModuleModel,
37+
},
3538
udf_config::UdfConfigModel,
3639
};
3740
use rand::SeedableRng;
@@ -145,6 +148,12 @@ impl<RT: Runtime> UdfPhase<RT> {
145148
format!("Can't dynamically import {module_path:?} in a query or mutation")
146149
));
147150
}
151+
let module = with_release_permit(
152+
timeout,
153+
permit_slot,
154+
ModuleModel::new(&mut self.tx).get_metadata(module_path.clone().canonicalize()),
155+
)
156+
.await?;
148157
let module_version = with_release_permit(
149158
timeout,
150159
permit_slot,
@@ -153,12 +162,12 @@ impl<RT: Runtime> UdfPhase<RT> {
153162
)
154163
.await?;
155164

156-
if let Some(module_version) = module_version.as_ref() {
165+
if let Some(module) = module.as_ref() {
157166
anyhow::ensure!(
158-
module_version.environment == ModuleEnvironment::Isolate,
167+
module.environment == ModuleEnvironment::Isolate,
159168
"Trying to execute {:?} in isolate, but it is bundled for {:?}.",
160169
module_path,
161-
module_version.environment
170+
module.environment
162171
);
163172
};
164173

crates/isolate/src/isolate2/runner.rs

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,6 @@ async fn run_request<RT: Runtime>(
546546
shared,
547547
key_broker,
548548
execution_context,
549-
module_loader,
550549
);
551550
let r: anyhow::Result<_> = try {
552551
// Update our shared state with the updated table mappings before reentering
@@ -783,8 +782,6 @@ struct Isolate2SyscallProvider<'a, RT: Runtime> {
783782

784783
key_broker: KeyBroker,
785784
context: ExecutionContext,
786-
787-
module_loader: Arc<dyn ModuleLoader<RT>>,
788785
}
789786

790787
impl<'a, RT: Runtime> Isolate2SyscallProvider<'a, RT> {
@@ -797,7 +794,6 @@ impl<'a, RT: Runtime> Isolate2SyscallProvider<'a, RT> {
797794
shared: UdfShared<RT>,
798795
key_broker: KeyBroker,
799796
context: ExecutionContext,
800-
module_loader: Arc<dyn ModuleLoader<RT>>,
801797
) -> Self {
802798
Self {
803799
tx,
@@ -810,7 +806,6 @@ impl<'a, RT: Runtime> Isolate2SyscallProvider<'a, RT> {
810806
syscall_trace: SyscallTrace::new(),
811807
key_broker,
812808
context,
813-
module_loader,
814809
}
815810
}
816811
}
@@ -868,15 +863,7 @@ impl<'a, RT: Runtime> AsyncSyscallProvider<RT> for Isolate2SyscallProvider<'a, R
868863
args: Vec<JsonValue>,
869864
scheduled_ts: UnixTimestamp,
870865
) -> anyhow::Result<(UdfPath, ConvexArray)> {
871-
validate_schedule_args(
872-
udf_path,
873-
args,
874-
scheduled_ts,
875-
self.unix_timestamp,
876-
&self.module_loader,
877-
self.tx,
878-
)
879-
.await
866+
validate_schedule_args(udf_path, args, scheduled_ts, self.unix_timestamp, self.tx).await
880867
}
881868

882869
fn file_storage_generate_upload_url(&self) -> anyhow::Result<String> {

crates/isolate/src/test_helpers.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1179,7 +1179,6 @@ impl<RT: Runtime, P: Persistence + Clone> ActionCallbacks for UdfTest<RT, P> {
11791179
// Scheduling from actions is not transaction and happens at latest
11801180
// timestamp.
11811181
self.database.runtime().unix_timestamp(),
1182-
&self.module_loader,
11831182
&mut tx,
11841183
)
11851184
.await?;

0 commit comments

Comments
 (0)