Skip to content

Commit f0898cc

Browse files
Preslav LeConvex, Inc.
authored andcommitted
Move executing http action in ApplicationFunctionRunner (#24837)
Move run_http_action from Application to ApplicationFunctionRunner. This makes things ApplicationFunctionRunner the only responsible to lot to udf execution log instead of responsibility being spread out. I have also cleaned up and moved the analyze and http isolate clients to ApplicationFunctionRunner from Applicaction. GitOrigin-RevId: 8159cb0684aea9706c3cd0aa9a289357f5e5e0bf
1 parent 526bddb commit f0898cc

File tree

2 files changed

+185
-193
lines changed
  • crates/application/src

2 files changed

+185
-193
lines changed

crates/application/src/application_function_runner/mod.rs

Lines changed: 161 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@ use common::{
1616
backoff::Backoff,
1717
errors::JsError,
1818
execution_context::ExecutionContext,
19+
http::fetch::FetchClient,
1920
identity::InertIdentity,
2021
knobs::{
2122
APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT,
23+
APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS,
2224
APPLICATION_MAX_CONCURRENT_MUTATIONS,
2325
APPLICATION_MAX_CONCURRENT_NODE_ACTIONS,
2426
APPLICATION_MAX_CONCURRENT_QUERIES,
2527
APPLICATION_MAX_CONCURRENT_V8_ACTIONS,
28+
BACKEND_ISOLATE_ACTIVE_THREADS_PERCENT,
2629
ISOLATE_MAX_USER_HEAP_SIZE,
2730
UDF_EXECUTOR_OCC_INITIAL_BACKOFF,
2831
UDF_EXECUTOR_OCC_MAX_BACKOFF,
2932
UDF_EXECUTOR_OCC_MAX_RETRIES,
33+
UDF_ISOLATE_MAX_EXEC_THREADS,
3034
},
3135
log_lines::{
3236
run_function_and_collect_log_lines,
@@ -77,22 +81,30 @@ use futures::{
7781
try_join,
7882
FutureExt,
7983
};
84+
use http::StatusCode;
8085
use isolate::{
8186
parse_udf_args,
8287
validate_schedule_args,
8388
ActionCallbacks,
8489
ActionOutcome,
8590
AuthConfig,
91+
BackendIsolateWorker,
92+
ConcurrencyLimiter,
8693
FunctionOutcome,
8794
FunctionResult,
95+
HttpActionRequest,
8896
IsolateClient,
97+
IsolateConfig,
98+
IsolateHeapStats,
8999
JsonPackedValue,
90100
ModuleLoader,
91101
UdfOutcome,
102+
ValidatedHttpPath,
92103
ValidatedUdfPathAndArgs,
93104
};
94105
use keybroker::{
95106
Identity,
107+
InstanceSecret,
96108
KeyBroker,
97109
};
98110
use model::{
@@ -198,9 +210,6 @@ static BUILD_DEPS_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| Duration::from_
198210
/// route requests.
199211
#[derive(Clone)]
200212
pub struct FunctionRouter<RT: Runtime> {
201-
// Execute within local process.
202-
udf_isolate: IsolateClient<RT>,
203-
204213
function_runner: Arc<dyn FunctionRunner<RT>>,
205214
query_limiter: Arc<Limiter>,
206215
mutation_limiter: Arc<Limiter>,
@@ -213,14 +222,12 @@ pub struct FunctionRouter<RT: Runtime> {
213222

214223
impl<RT: Runtime> FunctionRouter<RT> {
215224
pub fn new(
216-
udf_isolate: IsolateClient<RT>,
217225
function_runner: Arc<dyn FunctionRunner<RT>>,
218226
rt: RT,
219227
database: Database<RT>,
220228
system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
221229
) -> Self {
222230
Self {
223-
udf_isolate,
224231
function_runner,
225232
rt,
226233
database,
@@ -242,11 +249,6 @@ impl<RT: Runtime> FunctionRouter<RT> {
242249
)),
243250
}
244251
}
245-
246-
pub async fn shutdown(&self) -> anyhow::Result<()> {
247-
self.udf_isolate.shutdown().await?;
248-
Ok(())
249-
}
250252
}
251253

252254
impl<RT: Runtime> FunctionRouter<RT> {
@@ -503,6 +505,9 @@ pub struct ApplicationFunctionRunner<RT: Runtime> {
503505
key_broker: KeyBroker,
504506

505507
isolate_functions: FunctionRouter<RT>,
508+
// Used for analyze, schema, etc.
509+
analyze_isolate: IsolateClient<RT>,
510+
http_actions: IsolateClient<RT>,
506511
node_actions: Actions,
507512

508513
pub(crate) module_cache: Arc<dyn ModuleLoader<RT>>,
@@ -514,6 +519,7 @@ pub struct ApplicationFunctionRunner<RT: Runtime> {
514519
cache_manager: CacheManager<RT>,
515520
system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
516521
node_action_limiter: Limiter,
522+
fetch_client: Arc<dyn FetchClient>,
517523
}
518524

519525
impl<RT: Runtime> HeapSize for ApplicationFunctionRunner<RT> {
@@ -523,21 +529,67 @@ impl<RT: Runtime> HeapSize for ApplicationFunctionRunner<RT> {
523529
}
524530

525531
impl<RT: Runtime> ApplicationFunctionRunner<RT> {
526-
pub async fn new(
532+
pub fn new(
533+
instance_name: String,
534+
instance_secret: InstanceSecret,
527535
runtime: RT,
528536
database: Database<RT>,
529537
key_broker: KeyBroker,
530-
udf_isolate: IsolateClient<RT>,
531538
function_runner: Arc<dyn FunctionRunner<RT>>,
532539
node_actions: Actions,
533540
file_storage: TransactionalFileStorage<RT>,
534541
modules_storage: Arc<dyn Storage>,
535542
module_cache: Arc<dyn ModuleLoader<RT>>,
536543
function_log: FunctionExecutionLog<RT>,
537544
system_env_vars: BTreeMap<EnvVarName, EnvVarValue>,
545+
fetch_client: Arc<dyn FetchClient>,
538546
) -> Self {
547+
// We limit the isolates to only consume fraction of the available
548+
// cores leaving the rest for tokio. This is still over-provisioning
549+
// in case there are multiple active backends per server.
550+
let isolate_concurrency_limit =
551+
*BACKEND_ISOLATE_ACTIVE_THREADS_PERCENT * num_cpus::get_physical() / 100;
552+
let limiter = ConcurrencyLimiter::new(isolate_concurrency_limit);
553+
tracing::info!(
554+
"Limiting isolate concurrency to {} ({}% out of {} physical cores)",
555+
isolate_concurrency_limit,
556+
*BACKEND_ISOLATE_ACTIVE_THREADS_PERCENT,
557+
num_cpus::get_physical(),
558+
);
559+
560+
let http_actions_worker = BackendIsolateWorker::new(
561+
runtime.clone(),
562+
IsolateConfig::new("actions", limiter.clone()),
563+
);
564+
let http_actions = IsolateClient::new(
565+
runtime.clone(),
566+
http_actions_worker,
567+
*APPLICATION_MAX_CONCURRENT_HTTP_ACTIONS,
568+
true,
569+
instance_name.clone(),
570+
instance_secret,
571+
file_storage.clone(),
572+
system_env_vars.clone(),
573+
module_cache.clone(),
574+
);
575+
576+
let analyze_isolate_worker = BackendIsolateWorker::new(
577+
runtime.clone(),
578+
IsolateConfig::new("database_executor", limiter),
579+
);
580+
let analyze_isolate = IsolateClient::new(
581+
runtime.clone(),
582+
analyze_isolate_worker,
583+
*UDF_ISOLATE_MAX_EXEC_THREADS,
584+
false,
585+
instance_name,
586+
instance_secret,
587+
file_storage.clone(),
588+
system_env_vars.clone(),
589+
module_cache.clone(),
590+
);
591+
539592
let isolate_functions = FunctionRouter::new(
540-
udf_isolate,
541593
function_runner,
542594
runtime.clone(),
543595
database.clone(),
@@ -556,6 +608,8 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
556608
database,
557609
key_broker,
558610
isolate_functions,
611+
analyze_isolate,
612+
http_actions,
559613
node_actions,
560614
module_cache,
561615
modules_storage,
@@ -568,15 +622,25 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
568622
UdfType::Action,
569623
*APPLICATION_MAX_CONCURRENT_NODE_ACTIONS,
570624
),
625+
fetch_client,
571626
}
572627
}
573628

574629
pub(crate) async fn shutdown(&self) -> anyhow::Result<()> {
575-
self.isolate_functions.shutdown().await?;
630+
self.analyze_isolate.shutdown().await?;
631+
self.http_actions.shutdown().await?;
576632
self.node_actions.shutdown();
577633
Ok(())
578634
}
579635

636+
pub fn database_heap_size(&self) -> IsolateHeapStats {
637+
self.analyze_isolate.aggregate_heap_stats()
638+
}
639+
640+
pub fn http_actions_heap_size(&self) -> IsolateHeapStats {
641+
self.http_actions.aggregate_heap_stats()
642+
}
643+
580644
// Only used for running queries from REPLs.
581645
pub async fn run_query_without_caching(
582646
&self,
@@ -1275,6 +1339,86 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
12751339
}
12761340
}
12771341

1342+
pub async fn run_http_action(
1343+
&self,
1344+
request_id: RequestId,
1345+
name: UdfPath,
1346+
http_request: HttpActionRequest,
1347+
identity: Identity,
1348+
caller: FunctionCaller,
1349+
action_callbacks: Arc<dyn ActionCallbacks>,
1350+
) -> anyhow::Result<Result<isolate::HttpActionResponse, JsError>> {
1351+
let start = self.runtime.monotonic_now();
1352+
let usage_tracker = FunctionUsageTracker::new();
1353+
let mut tx = self
1354+
.database
1355+
.begin_with_usage(identity.clone(), usage_tracker.clone())
1356+
.await?;
1357+
1358+
// Before any developer-visible logging takes place, bail out if it's
1359+
// clear the application has no HTTP actions routed.
1360+
// This should spares developer not using HTTP from the deluge of
1361+
// logspam and other bot traffic.
1362+
if !self.module_cache.has_http(&mut tx).await? {
1363+
drop(tx);
1364+
return Ok(Ok(isolate::HttpActionResponse::from_text(
1365+
StatusCode::NOT_FOUND,
1366+
"This Convex deployment does not have HTTP actions enabled.".into(),
1367+
)));
1368+
}
1369+
let validated_path = match ValidatedHttpPath::new(
1370+
&mut tx,
1371+
name.canonicalize().clone(),
1372+
self.module_cache.as_ref(),
1373+
)
1374+
.await?
1375+
{
1376+
Ok(validated_path) => validated_path,
1377+
Err(e) => return Ok(Err(e)),
1378+
};
1379+
let unix_timestamp = self.runtime.unix_timestamp();
1380+
let context = ExecutionContext::new(request_id, &caller);
1381+
1382+
let route = http_request.head.route_for_failure()?;
1383+
let (log_line_sender, log_line_receiver) = mpsc::unbounded();
1384+
let outcome_future = self
1385+
.http_actions
1386+
.execute_http_action(
1387+
validated_path,
1388+
http_request,
1389+
identity,
1390+
action_callbacks,
1391+
self.fetch_client.clone(),
1392+
log_line_sender,
1393+
tx,
1394+
context.clone(),
1395+
)
1396+
.boxed();
1397+
1398+
let (outcome_result, log_lines) =
1399+
run_function_and_collect_log_lines(outcome_future, log_line_receiver, |log_line| {
1400+
self.function_log.log_http_action_progress(
1401+
route.clone(),
1402+
unix_timestamp,
1403+
context.clone(),
1404+
vec![log_line].into(),
1405+
// http actions are always run in Isolate
1406+
ModuleEnvironment::Isolate,
1407+
)
1408+
})
1409+
.await;
1410+
let outcome = outcome_result?;
1411+
self.function_log.log_http_action(
1412+
outcome.clone(),
1413+
log_lines,
1414+
start.elapsed(),
1415+
caller,
1416+
usage_tracker,
1417+
context,
1418+
);
1419+
Ok(outcome.result)
1420+
}
1421+
12781422
#[minitrace::trace]
12791423
pub async fn build_deps(
12801424
&self,
@@ -1320,8 +1464,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
13201464

13211465
let mut result = BTreeMap::new();
13221466
match self
1323-
.isolate_functions
1324-
.udf_isolate
1467+
.analyze_isolate
13251468
.analyze(udf_config, isolate_modules, environment_variables.clone())
13261469
.await?
13271470
{
@@ -1460,8 +1603,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
14601603
source_map: Option<SourceMap>,
14611604
rng_seed: [u8; 32],
14621605
) -> anyhow::Result<DatabaseSchema> {
1463-
self.isolate_functions
1464-
.udf_isolate
1606+
self.analyze_isolate
14651607
.evaluate_schema(schema_bundle, source_map, rng_seed)
14661608
.await
14671609
}
@@ -1473,8 +1615,7 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
14731615
mut environment_variables: BTreeMap<EnvVarName, EnvVarValue>,
14741616
) -> anyhow::Result<AuthConfig> {
14751617
environment_variables.extend(self.system_env_vars.clone());
1476-
self.isolate_functions
1477-
.udf_isolate
1618+
self.analyze_isolate
14781619
.evaluate_auth_config(auth_config_bundle, source_map, environment_variables)
14791620
.await
14801621
}

0 commit comments

Comments
 (0)