Skip to content

Commit bc8fbf0

Browse files
ldanilekConvex, Inc.
authored and
Convex, Inc.
committed
ModuleLoader fetch SourcePackage in main transaction (#27094)
Split behavior in ModuleLoader along better lines: 1. `get_module` looks up ModuleMetadata and SourcePackage and paths to prefetch (previously source package and paths to prefetch would happen later) 2. `get_module_with_metadata` just looks up the source code in storage. It does not need to read from the database at all, because everything from the database was already fetched. Advantages: 1. We don't have to construct new transactions inside the cache 2. Since we don't need new transactions, we can make TransactionIngredients no longer implement Clone, we don't need a Database or system identity, we don't need to check the timestamp, and we don't need to hackily set entries in the ReadSet. 3. by passing in the set of paths to prefetch, we no longer need the hack to convert TabletId -> ComponentId. Disadvantages: 1. in a few places application-level callers were calling `get_module_with_metadata` directly, and I changed these places to call `get_module`, which will return the same result but will do an extra fetch of SourcePackage and ModuleMetadata. Luckily these fetches should hit the system index cache, so I don't anticipate performance issues. GitOrigin-RevId: 42e18a1ca8e2cffb597e89b86e506e1d5bfbe0ea
1 parent 3a1b9a3 commit bc8fbf0

File tree

10 files changed

+141
-177
lines changed

10 files changed

+141
-177
lines changed

crates/application/src/lib.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -529,8 +529,7 @@ impl<RT: Runtime> Application<RT> {
529529
scheduled_jobs_pause_client: PauseClient,
530530
app_auth: Arc<ApplicationAuth>,
531531
) -> anyhow::Result<Self> {
532-
let module_cache =
533-
ModuleCache::new(runtime.clone(), database.clone(), modules_storage.clone()).await;
532+
let module_cache = ModuleCache::new(runtime.clone(), modules_storage.clone()).await;
534533
let module_loader = Arc::new(module_cache.clone());
535534

536535
let system_env_vars = btreemap! {
@@ -818,10 +817,9 @@ impl<RT: Runtime> Application<RT> {
818817
let Some(source_index) = source_mapped.source_index else {
819818
return Ok(None);
820819
};
821-
let full_source = self
822-
.module_cache
823-
.get_module_with_metadata(&mut tx, metadata)
824-
.await?;
820+
let Some(full_source) = self.module_cache.get_module(&mut tx, path).await? else {
821+
return Ok(None);
822+
};
825823
let Some(source_map_str) = &full_source.source_map else {
826824
return Ok(None);
827825
};
@@ -1646,13 +1644,14 @@ impl<RT: Runtime> Application<RT> {
16461644
component: ComponentId::Root,
16471645
module_path: AUTH_CONFIG_FILE_NAME.parse()?,
16481646
};
1649-
let auth_config_metadata = ModuleModel::new(tx).get_metadata(path).await?;
1647+
let auth_config_metadata = ModuleModel::new(tx).get_metadata(path.clone()).await?;
16501648
if let Some(auth_config_metadata) = auth_config_metadata {
16511649
let environment = auth_config_metadata.environment;
16521650
let auth_config_source = runner
16531651
.module_cache
1654-
.get_module_with_metadata(tx, auth_config_metadata)
1655-
.await?;
1652+
.get_module(tx, path)
1653+
.await?
1654+
.context("Module has metadata but no source")?;
16561655
let auth_config_module = ModuleConfig {
16571656
path: AUTH_CONFIG_FILE_NAME.parse()?,
16581657
source: auth_config_source.source.clone(),

crates/application/src/module_cache/mod.rs

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::sync::Arc;
1+
use std::{
2+
collections::BTreeMap,
3+
sync::Arc,
4+
};
25

36
use async_lru::async_lru::AsyncLru;
47
use async_trait::async_trait;
@@ -10,38 +13,34 @@ use common::{
1013
},
1114
runtime::Runtime,
1215
};
13-
use database::{
14-
Database,
15-
Transaction,
16-
};
1716
use futures::FutureExt;
1817
use isolate::environment::helpers::module_loader::get_module_and_prefetch;
19-
use keybroker::Identity;
2018
use model::{
2119
config::module_loader::ModuleLoader,
2220
modules::{
2321
module_versions::FullModuleSource,
2422
types::ModuleMetadata,
25-
ModuleModel,
2623
},
27-
source_packages::types::SourcePackageId,
24+
source_packages::types::{
25+
SourcePackage,
26+
SourcePackageId,
27+
},
2828
};
2929
use storage::Storage;
30+
use sync_types::CanonicalizedModulePath;
3031
use value::ResolvedDocumentId;
3132

3233
mod metrics;
3334

3435
#[derive(Clone)]
3536
pub struct ModuleCache<RT: Runtime> {
36-
database: Database<RT>,
37-
3837
modules_storage: Arc<dyn Storage>,
3938

4039
cache: AsyncLru<RT, (ResolvedDocumentId, SourcePackageId), FullModuleSource>,
4140
}
4241

4342
impl<RT: Runtime> ModuleCache<RT> {
44-
pub async fn new(rt: RT, database: Database<RT>, modules_storage: Arc<dyn Storage>) -> Self {
43+
pub async fn new(rt: RT, modules_storage: Arc<dyn Storage>) -> Self {
4544
let cache = AsyncLru::new(
4645
rt.clone(),
4746
*MODULE_CACHE_MAX_SIZE_BYTES,
@@ -50,7 +49,6 @@ impl<RT: Runtime> ModuleCache<RT> {
5049
);
5150

5251
Self {
53-
database,
5452
modules_storage,
5553
cache,
5654
}
@@ -62,29 +60,27 @@ impl<RT: Runtime> ModuleLoader<RT> for ModuleCache<RT> {
6260
#[minitrace::trace]
6361
async fn get_module_with_metadata(
6462
&self,
65-
tx: &mut Transaction<RT>,
6663
module_metadata: ParsedDocument<ModuleMetadata>,
64+
source_package: ParsedDocument<SourcePackage>,
65+
paths_to_prefetch: BTreeMap<ResolvedDocumentId, CanonicalizedModulePath>,
6766
) -> anyhow::Result<Arc<FullModuleSource>> {
6867
let timer = metrics::module_cache_get_module_timer();
6968

7069
let key = (module_metadata.id(), module_metadata.source_package_id);
71-
let mut cache_tx = self.database.begin(Identity::system()).await?;
7270
let modules_storage = self.modules_storage.clone();
7371
let result = self
7472
.cache
7573
.get_and_prepopulate(
7674
key,
77-
async move {
78-
get_module_and_prefetch(&mut cache_tx, modules_storage, module_metadata).await
79-
}
75+
get_module_and_prefetch(
76+
modules_storage,
77+
module_metadata,
78+
source_package,
79+
paths_to_prefetch,
80+
)
8081
.boxed(),
8182
)
8283
.await?;
83-
// Record read dependency on the module version so the transactions
84-
// read same is the same regardless if we hit the cache or not.
85-
// This is not technically needed since the module version is immutable,
86-
// but better safe and consistent that sorry.
87-
ModuleModel::new(tx).record_module_version_read_dependency(key.0)?;
8884

8985
timer.finish();
9086
Ok(result)

crates/database/src/transaction.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -971,18 +971,6 @@ impl<RT: Runtime> Transaction<RT> {
971971
.await
972972
}
973973

974-
/// Used when a system table is served from cache - to manually add a read
975-
/// dependency on that system table.
976-
pub fn record_system_table_cache_hit(
977-
&mut self,
978-
index_name: TabletIndexName,
979-
fields: IndexedFields,
980-
interval: Interval,
981-
) {
982-
self.reads
983-
.record_indexed_derived(index_name, fields, interval)
984-
}
985-
986974
#[cfg(any(test, feature = "testing"))]
987975
pub fn set_index_size_hard_limit(&mut self, size: usize) {
988976
self.index_size_override = Some(size);

crates/function_runner/src/in_memory_indexes.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,9 +87,8 @@ use crate::{
8787
FunctionWrites,
8888
};
8989

90-
/// Convenience struct for data required to create a [Transaction]. This struct
91-
/// is cloneable, while [Transaction] is not.
92-
#[derive(Clone)]
90+
/// Struct from which you can create a [Transaction].
91+
/// TODO: delete this in favor of using Transaction directly.
9392
pub struct TransactionIngredients<RT: Runtime> {
9493
pub ts: RepeatableTimestamp,
9594
pub identity: Identity,

crates/function_runner/src/module_cache.rs

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::sync::Arc;
1+
use std::{
2+
collections::BTreeMap,
3+
sync::Arc,
4+
};
25

36
use async_lru::async_lru::AsyncLru;
47
use async_trait::async_trait;
@@ -10,23 +13,23 @@ use common::{
1013
},
1114
runtime::Runtime,
1215
};
13-
use database::Transaction;
1416
use futures::FutureExt;
1517
use isolate::environment::helpers::module_loader::get_module_and_prefetch;
1618
use model::{
1719
config::module_loader::ModuleLoader,
1820
modules::{
1921
module_versions::FullModuleSource,
2022
types::ModuleMetadata,
21-
ModuleModel,
2223
},
23-
source_packages::types::SourcePackageId,
24+
source_packages::types::{
25+
SourcePackage,
26+
SourcePackageId,
27+
},
2428
};
2529
use storage::Storage;
30+
use sync_types::CanonicalizedModulePath;
2631
use value::ResolvedDocumentId;
2732

28-
use crate::in_memory_indexes::TransactionIngredients;
29-
3033
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
3134
pub(crate) struct ModuleCacheKey {
3235
instance_name: String,
@@ -51,7 +54,6 @@ impl<RT: Runtime> ModuleCache<RT> {
5154
pub(crate) struct FunctionRunnerModuleLoader<RT: Runtime> {
5255
pub cache: ModuleCache<RT>,
5356
pub instance_name: String,
54-
pub transaction_ingredients: TransactionIngredients<RT>,
5557
pub modules_storage: Arc<dyn Storage>,
5658
}
5759

@@ -60,30 +62,30 @@ impl<RT: Runtime> ModuleLoader<RT> for FunctionRunnerModuleLoader<RT> {
6062
#[minitrace::trace]
6163
async fn get_module_with_metadata(
6264
&self,
63-
tx: &mut Transaction<RT>,
6465
module_metadata: ParsedDocument<ModuleMetadata>,
66+
source_package: ParsedDocument<SourcePackage>,
67+
paths_to_prefetch: BTreeMap<ResolvedDocumentId, CanonicalizedModulePath>,
6568
) -> anyhow::Result<Arc<FullModuleSource>> {
66-
// The transaction we're getting modules for should be from the same ts as when
67-
// this module loader was created.
68-
assert_eq!(tx.begin_timestamp(), self.transaction_ingredients.ts);
69-
7069
let instance_name = self.instance_name.clone();
7170
let key = ModuleCacheKey {
7271
instance_name: self.instance_name.clone(),
7372
module_id: module_metadata.id(),
7473
source_package_id: module_metadata.source_package_id,
7574
};
76-
let mut transaction = self.transaction_ingredients.clone().try_into()?;
7775
let modules_storage = self.modules_storage.clone();
7876
let result = self
7977
.cache
8078
.0
8179
.get_and_prepopulate(
8280
key.clone(),
8381
async move {
84-
let modules =
85-
get_module_and_prefetch(&mut transaction, modules_storage, module_metadata)
86-
.await;
82+
let modules = get_module_and_prefetch(
83+
modules_storage,
84+
module_metadata,
85+
source_package,
86+
paths_to_prefetch,
87+
)
88+
.await;
8789
modules
8890
.into_iter()
8991
.map(move |((module_id, source_package_id), source)| {
@@ -101,11 +103,6 @@ impl<RT: Runtime> ModuleLoader<RT> for FunctionRunnerModuleLoader<RT> {
101103
.boxed(),
102104
)
103105
.await?;
104-
// Record read dependency on the module version so the transactions
105-
// read same is the same regardless if we hit the cache or not.
106-
// This is not technically needed since the module version is immutable,
107-
// but better safe and consistent that sorry.
108-
ModuleModel::new(tx).record_module_version_read_dependency(key.module_id)?;
109106

110107
Ok(result)
111108
}

crates/function_runner/src/server.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
325325
retention_validator,
326326
)
327327
.await?;
328-
let transaction = transaction_ingredients.clone().try_into()?;
328+
let transaction = transaction_ingredients.try_into()?;
329329
Ok(transaction)
330330
}
331331

@@ -387,7 +387,7 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
387387
retention_validator,
388388
)
389389
.await?;
390-
let mut transaction = transaction_ingredients.clone().try_into()?;
390+
let mut transaction = transaction_ingredients.try_into()?;
391391
let storage = self
392392
.storage
393393
.storage_for_instance(&mut transaction, StorageUseCase::Files)
@@ -406,7 +406,6 @@ impl<RT: Runtime, S: StorageForInstance<RT>> FunctionRunnerCore<RT, S> {
406406
module_loader: Arc::new(FunctionRunnerModuleLoader {
407407
instance_name: instance_name.clone(),
408408
cache: self.module_cache.clone(),
409-
transaction_ingredients,
410409
modules_storage,
411410
}),
412411
};

crates/isolate/src/environment/action/phase.rs

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ use std::{
44
sync::Arc,
55
};
66

7+
use anyhow::Context;
78
use common::{
89
components::{
9-
ComponentId,
1010
ComponentPath,
1111
Reference,
1212
Resource,
@@ -38,6 +38,7 @@ use model::{
3838
types::ModuleMetadata,
3939
ModuleModel,
4040
},
41+
source_packages::SourcePackageModel,
4142
udf_config::UdfConfigModel,
4243
};
4344
use parking_lot::Mutex;
@@ -154,40 +155,45 @@ impl<RT: Runtime> ActionPhase<RT> {
154155
.map(|c| ChaCha12Rng::from_seed(c.import_phase_rng_seed));
155156
let import_time_unix_timestamp = udf_config.as_ref().map(|c| c.import_phase_unix_timestamp);
156157

157-
let module_metadata = with_release_permit(timeout, permit_slot, async {
158-
let result = if !*COMPONENTS_ENABLED {
158+
let (module_metadata, source_package) = with_release_permit(timeout, permit_slot, async {
159+
let module_metadata = ModuleModel::new(&mut tx)
160+
.get_all_metadata(component_id)
161+
.await?;
162+
let source_package = SourcePackageModel::new(&mut tx, component_id.into())
163+
.get_latest()
164+
.await?;
165+
if !*COMPONENTS_ENABLED {
159166
anyhow::ensure!(self.component.is_root());
160-
ModuleModel::new(&mut tx)
161-
.get_all_metadata(ComponentId::Root)
162-
.await?
163167
} else {
164-
let module_metadata = ModuleModel::new(&mut tx)
165-
.get_all_metadata(component_id)
166-
.await?;
167-
168168
let loaded_resources = ComponentsModel::new(&mut tx)
169169
.preload_resources(component_id)
170170
.await?;
171171
{
172172
let mut resources = resources.lock();
173173
*resources = loaded_resources;
174174
}
175-
176-
module_metadata
177-
};
178-
Ok(result)
175+
}
176+
Ok((module_metadata, source_package))
179177
})
180178
.await?;
181179

182180
let modules = with_release_permit(timeout, permit_slot, async {
183181
let mut modules = BTreeMap::new();
182+
let paths_to_prefetch: BTreeMap<_, _> = module_metadata
183+
.iter()
184+
.map(|metadata| (metadata.id(), metadata.path.clone()))
185+
.collect();
184186
for metadata in module_metadata {
185187
if metadata.path.is_system() {
186188
continue;
187189
}
188190
let path = metadata.path.clone();
189191
let module = module_loader
190-
.get_module_with_metadata(&mut tx, metadata.clone())
192+
.get_module_with_metadata(
193+
metadata.clone(),
194+
source_package.clone().context("source package not found")?,
195+
paths_to_prefetch.clone(),
196+
)
191197
.await?;
192198
modules.insert(path, (metadata.into_value(), module));
193199
}

0 commit comments

Comments
 (0)