Skip to content

Commit 69d2f8f

Browse files
ldanilekConvex, Inc.
authored andcommitted
refactor module fetcher (#26197)
make names more explicit and allow more complex module fetcher code inline GitOrigin-RevId: d1db3471f7856482cecfad9953ec8420d6fa4387
1 parent f378333 commit 69d2f8f

File tree

4 files changed

+31
-61
lines changed

4 files changed

+31
-61
lines changed

crates/application/src/module_cache/mod.rs

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use database::{
1515
Transaction,
1616
};
1717
use futures::FutureExt;
18+
use isolate::environment::helpers::module_loader::get_module;
1819
use keybroker::Identity;
1920
use model::{
2021
config::module_loader::ModuleLoader,
@@ -36,24 +37,6 @@ use value::{
3637

3738
mod metrics;
3839

39-
#[derive(Clone)]
40-
pub struct ModuleVersionFetcher<RT: Runtime> {
41-
database: Database<RT>,
42-
// TODO(lee) read module source from storage.
43-
#[allow(unused)]
44-
modules_storage: Arc<dyn Storage>,
45-
}
46-
47-
impl<RT: Runtime> ModuleVersionFetcher<RT> {
48-
async fn generate_value(
49-
self,
50-
key: (ResolvedDocumentId, ModuleVersion),
51-
) -> anyhow::Result<FullModuleSource> {
52-
let mut tx = self.database.begin(Identity::system()).await?;
53-
ModuleModel::new(&mut tx).get_source(key.0, key.1).await
54-
}
55-
}
56-
5740
#[derive(Clone)]
5841
pub struct ModuleCache<RT: Runtime> {
5942
database: Database<RT>,
@@ -97,19 +80,21 @@ impl<RT: Runtime> ModuleLoader<RT> for ModuleCache<RT> {
9780
.id(&MODULE_VERSIONS_TABLE)?;
9881
if tx.writes().has_written_to(&module_versions_table_id) {
9982
let source = ModuleModel::new(tx)
100-
.get_source(module_metadata.id(), module_metadata.latest_version)
83+
.get_source_from_db(module_metadata.id(), module_metadata.latest_version)
10184
.await?;
10285
return Ok(Arc::new(source));
10386
}
10487

10588
let key = (module_metadata.id(), module_metadata.latest_version);
106-
let fetcher = ModuleVersionFetcher {
107-
database: self.database.clone(),
108-
modules_storage: self.modules_storage.clone(),
109-
};
89+
let mut cache_tx = self.database.begin(Identity::system()).await?;
90+
let modules_storage = self.modules_storage.clone();
11091
let result = self
11192
.cache
112-
.get(key, fetcher.generate_value(key).boxed())
93+
.get(
94+
key,
95+
async move { get_module(&mut cache_tx, modules_storage, module_metadata).await }
96+
.boxed(),
97+
)
11398
.await?;
11499
// Record read dependency on the module version so the transactions
115100
// read same is the same regardless if we hit the cache or not.

crates/function_runner/src/module_cache.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,7 @@ use model::{
2626
},
2727
};
2828
use storage::Storage;
29-
use value::{
30-
ResolvedDocumentId,
31-
TableNamespace,
32-
};
29+
use value::ResolvedDocumentId;
3330

3431
use crate::in_memory_indexes::TransactionIngredients;
3532

@@ -72,15 +69,18 @@ impl<RT: Runtime> ModuleLoader<RT> for FunctionRunnerModuleLoader<RT> {
7269
// this module loader was created.
7370
assert_eq!(tx.begin_timestamp(), self.transaction_ingredients.ts);
7471

72+
let namespace = tx
73+
.table_mapping()
74+
.tablet_namespace(module_metadata.id().table().tablet_id)?;
7575
// If this transaction wrote to module_versions (true for REPLs), we cannot use
7676
// the cache, load the module directly.
7777
let module_versions_table_id = tx
7878
.table_mapping()
79-
.namespace(TableNamespace::Global)
79+
.namespace(namespace)
8080
.id(&MODULE_VERSIONS_TABLE)?;
8181
if tx.writes().has_written_to(&module_versions_table_id) {
8282
let source = ModuleModel::new(tx)
83-
.get_source(module_metadata.id(), module_metadata.latest_version)
83+
.get_source_from_db(module_metadata.id(), module_metadata.latest_version)
8484
.await?;
8585
return Ok(Arc::new(source));
8686
}
@@ -90,13 +90,15 @@ impl<RT: Runtime> ModuleLoader<RT> for FunctionRunnerModuleLoader<RT> {
9090
module_id: module_metadata.id(),
9191
module_version: module_metadata.latest_version,
9292
};
93-
let transaction = self.transaction_ingredients.clone().try_into()?;
93+
let mut transaction = self.transaction_ingredients.clone().try_into()?;
94+
let modules_storage = self.modules_storage.clone();
9495
let result = self
9596
.cache
9697
.0
9798
.get(
9899
key.clone(),
99-
get_module(transaction, self.modules_storage.clone(), module_metadata).boxed(),
100+
async move { get_module(&mut transaction, modules_storage, module_metadata).await }
101+
.boxed(),
100102
)
101103
.await?;
102104
// Record read dependency on the module version so the transactions

crates/isolate/src/environment/helpers/module_loader.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ use crate::{
2121
};
2222

2323
pub async fn get_module<RT: Runtime>(
24-
mut tx: Transaction<RT>,
24+
tx: &mut Transaction<RT>,
2525
// TODO(lee) fetch from module storage
2626
_modules_storage: Arc<dyn Storage>,
2727
module_metadata: ParsedDocument<ModuleMetadata>,
2828
) -> anyhow::Result<FullModuleSource> {
2929
let _timer = module_load_timer();
30-
let source = ModuleModel::new(&mut tx)
31-
.get_source(module_metadata.id(), module_metadata.latest_version)
30+
let source = ModuleModel::new(tx)
31+
.get_source_from_db(module_metadata.id(), module_metadata.latest_version)
3232
.await?;
3333
Ok(source)
3434
}

crates/model/src/modules/mod.rs

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ use common::{
99
CanonicalizedComponentFunctionPath,
1010
CanonicalizedComponentModulePath,
1111
ComponentDefinitionId,
12-
COMPONENTS_ENABLED,
1312
},
1413
document::{
1514
ParsedDocument,
@@ -184,10 +183,6 @@ impl<'a, RT: Runtime> ModuleModel<'a, RT> {
184183
&mut self,
185184
component: ComponentDefinitionId,
186185
) -> anyhow::Result<Vec<ParsedDocument<ModuleMetadata>>> {
187-
// TODO(CX-6379): Remove this branch once we've made modules component-aware.
188-
if !*COMPONENTS_ENABLED {
189-
anyhow::ensure!(component.is_root());
190-
}
191186
let index_query = Query::full_table_scan(MODULES_TABLE.clone(), Order::Asc);
192187
let mut query_stream = ResolvedQuery::new(self.tx, component.into(), index_query)?;
193188

@@ -256,7 +251,11 @@ impl<'a, RT: Runtime> ModuleModel<'a, RT> {
256251
order: Order::Asc,
257252
};
258253
let module_query = Query::index_range(index_range);
259-
let mut query_stream = ResolvedQuery::new(self.tx, TableNamespace::Global, module_query)?;
254+
let namespace = self
255+
.tx
256+
.table_mapping()
257+
.tablet_namespace(module_id.table().tablet_id)?;
258+
let mut query_stream = ResolvedQuery::new(self.tx, namespace, module_query)?;
260259
let module_version: ParsedDocument<ModuleVersionMetadata> = query_stream
261260
.expect_at_most_one(self.tx)
262261
.await?
@@ -269,7 +268,7 @@ impl<'a, RT: Runtime> ModuleModel<'a, RT> {
269268
Ok(module_version)
270269
}
271270

272-
pub async fn get_source(
271+
pub async fn get_source_from_db(
273272
&mut self,
274273
module_id: ResolvedDocumentId,
275274
version: ModuleVersion,
@@ -299,12 +298,7 @@ impl<'a, RT: Runtime> ModuleModel<'a, RT> {
299298
) -> anyhow::Result<Option<ParsedDocument<ModuleMetadata>>> {
300299
let timer = get_module_metadata_timer();
301300

302-
// TODO(CX-6379): Remove this branch once we've made modules component-aware.
303-
let is_system = if !*COMPONENTS_ENABLED {
304-
path.as_root_module_path()?.is_system()
305-
} else {
306-
path.module_path.is_system()
307-
};
301+
let is_system = path.module_path.is_system();
308302
if is_system && !(self.tx.identity().is_admin() || self.tx.identity().is_system()) {
309303
anyhow::bail!(unauthorized_error("get_module"))
310304
}
@@ -435,14 +429,8 @@ impl<'a, RT: Runtime> ModuleModel<'a, RT> {
435429
&mut self,
436430
path: CanonicalizedComponentModulePath,
437431
) -> anyhow::Result<Option<ParsedDocument<ModuleMetadata>>> {
438-
// TODO(CX-6379): Remove this branch once we've made modules component-aware.
439-
let module_path = if !*COMPONENTS_ENABLED {
440-
path.as_root_module_path()?
441-
} else {
442-
&path.module_path
443-
};
444432
let namespace = path.component.into();
445-
let module_path = ConvexValue::try_from(module_path.as_str())?;
433+
let module_path = ConvexValue::try_from(path.module_path.as_str())?;
446434
let index_range = IndexRange {
447435
index_name: MODULE_INDEX_BY_PATH.clone(),
448436
range: vec![IndexRangeExpression::Eq(
@@ -468,12 +456,7 @@ impl<'a, RT: Runtime> ModuleModel<'a, RT> {
468456
&mut self,
469457
path: &CanonicalizedComponentFunctionPath,
470458
) -> anyhow::Result<anyhow::Result<AnalyzedFunction>> {
471-
// TODO(CX-6379): Remove this branch once we've made modules component-aware.
472-
let udf_path = if !*COMPONENTS_ENABLED {
473-
path.as_root_udf_path()?
474-
} else {
475-
&path.udf_path
476-
};
459+
let udf_path = &path.udf_path;
477460
let Some(module) = self.get_metadata_for_function(path.clone()).await? else {
478461
let err = ModuleNotFoundError::new(udf_path.module().as_str());
479462
return Ok(Err(ErrorMetadata::bad_request(

0 commit comments

Comments
 (0)