Skip to content

Commit 275380c

Browse files
Sujay JayakarConvex, Inc.
Sujay Jayakar
authored and
Convex, Inc.
committed
Resubmit component HTTP actions (#28175)
GitOrigin-RevId: f8ab13539dab58d2a58a4ca5f1e2e3f99fd5acdc
1 parent 5d8374b commit 275380c

File tree

24 files changed

+1031
-329
lines changed

24 files changed

+1031
-329
lines changed
Lines changed: 379 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,379 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::Context;
4+
use common::{
5+
components::{
6+
CanonicalizedComponentFunctionPath,
7+
ComponentPath,
8+
Reference,
9+
},
10+
errors::JsError,
11+
execution_context::ExecutionContext,
12+
http::RoutedHttpPath,
13+
log_lines::{
14+
run_function_and_collect_log_lines,
15+
LogLevel,
16+
LogLine,
17+
LogLines,
18+
SystemLogMetadata,
19+
},
20+
runtime::{
21+
Runtime,
22+
RuntimeInstant,
23+
},
24+
types::{
25+
FunctionCaller,
26+
ModuleEnvironment,
27+
RoutableMethod,
28+
},
29+
RequestId,
30+
};
31+
use database::{
32+
BootstrapComponentsModel,
33+
Transaction,
34+
};
35+
use errors::ErrorMetadataAnyhowExt;
36+
use futures::{
37+
channel::mpsc,
38+
select_biased,
39+
FutureExt,
40+
StreamExt,
41+
};
42+
use http::StatusCode;
43+
use isolate::{
44+
ActionCallbacks,
45+
HttpActionOutcome,
46+
HttpActionRequest,
47+
HttpActionRequestHead,
48+
HttpActionResponsePart,
49+
HttpActionResponseStreamer,
50+
HttpActionResult,
51+
ValidatedHttpPath,
52+
};
53+
use keybroker::Identity;
54+
use model::modules::{
55+
ModuleModel,
56+
HTTP_MODULE_PATH,
57+
};
58+
use sync_types::{
59+
CanonicalizedUdfPath,
60+
FunctionName,
61+
};
62+
use usage_tracking::FunctionUsageTracker;
63+
64+
use super::ApplicationFunctionRunner;
65+
use crate::function_log::HttpActionStatusCode;
66+
67+
impl<RT: Runtime> ApplicationFunctionRunner<RT> {
68+
#[minitrace::trace]
69+
pub async fn run_http_action(
70+
&self,
71+
request_id: RequestId,
72+
http_request: HttpActionRequest,
73+
mut response_streamer: HttpActionResponseStreamer,
74+
identity: Identity,
75+
caller: FunctionCaller,
76+
action_callbacks: Arc<dyn ActionCallbacks>,
77+
) -> anyhow::Result<isolate::HttpActionResult> {
78+
let start = self.runtime.monotonic_now();
79+
let usage_tracker = FunctionUsageTracker::new();
80+
81+
let mut tx = self
82+
.database
83+
.begin_with_usage(identity.clone(), usage_tracker.clone())
84+
.await?;
85+
86+
let (component_path, routed_path) =
87+
match self.route_http_action(&mut tx, &http_request.head).await? {
88+
Some(r) => r,
89+
None => {
90+
drop(tx);
91+
let response_parts = isolate::HttpActionResponsePart::from_text(
92+
StatusCode::NOT_FOUND,
93+
"This Convex deployment does not have HTTP actions enabled.".to_string(),
94+
);
95+
for part in response_parts {
96+
response_streamer.send_part(part)?;
97+
}
98+
return Ok(isolate::HttpActionResult::Streamed);
99+
},
100+
};
101+
let path = CanonicalizedComponentFunctionPath {
102+
component: component_path,
103+
udf_path: CanonicalizedUdfPath::new(
104+
HTTP_MODULE_PATH.clone(),
105+
FunctionName::default_export(),
106+
),
107+
};
108+
let validated_path = match ValidatedHttpPath::new(&mut tx, path).await? {
109+
Ok(validated_path) => validated_path,
110+
Err(e) => return Ok(isolate::HttpActionResult::Error(e)),
111+
};
112+
let unix_timestamp = self.runtime.unix_timestamp();
113+
let context = ExecutionContext::new(request_id, &caller);
114+
115+
let request_head = http_request.head.clone();
116+
let route = http_request.head.route_for_failure();
117+
let (log_line_sender, log_line_receiver) = mpsc::unbounded();
118+
// We want to intercept the response head so we can log it on function
119+
// completion, but still stream the response as it comes in, so we
120+
// create another channel here.
121+
let (isolate_response_sender, mut isolate_response_receiver) = mpsc::unbounded();
122+
let outcome_future = self
123+
.http_actions
124+
.execute_http_action(
125+
validated_path,
126+
routed_path,
127+
http_request,
128+
identity.clone(),
129+
action_callbacks,
130+
self.fetch_client.clone(),
131+
log_line_sender,
132+
HttpActionResponseStreamer::new(isolate_response_sender),
133+
tx,
134+
context.clone(),
135+
)
136+
.boxed();
137+
138+
let context_ = context.clone();
139+
let mut outcome_and_log_lines_fut = Box::pin(
140+
run_function_and_collect_log_lines(outcome_future, log_line_receiver, |log_line| {
141+
self.function_log.log_http_action_progress(
142+
route.clone(),
143+
unix_timestamp,
144+
context_.clone(),
145+
vec![log_line].into(),
146+
// http actions are always run in Isolate
147+
ModuleEnvironment::Isolate,
148+
)
149+
})
150+
.fuse(),
151+
);
152+
153+
let mut result_for_logging = None;
154+
let (outcome_result, mut log_lines): (anyhow::Result<HttpActionOutcome>, LogLines) = loop {
155+
select_biased! {
156+
result = isolate_response_receiver.select_next_some() => {
157+
match result {
158+
HttpActionResponsePart::Head(h) => {
159+
result_for_logging = Some(Ok(HttpActionStatusCode(h.status)));
160+
response_streamer.send_part(HttpActionResponsePart::Head(h))?;
161+
},
162+
HttpActionResponsePart::BodyChunk(bytes) => {
163+
response_streamer.send_part(HttpActionResponsePart::BodyChunk(bytes))?;
164+
}
165+
}
166+
},
167+
outcome_and_log_lines = outcome_and_log_lines_fut => {
168+
break outcome_and_log_lines
169+
}
170+
}
171+
};
172+
173+
while let Some(part) = isolate_response_receiver.next().await {
174+
match part {
175+
HttpActionResponsePart::Head(h) => {
176+
result_for_logging = Some(Ok(HttpActionStatusCode(h.status)));
177+
response_streamer.send_part(HttpActionResponsePart::Head(h))?;
178+
},
179+
HttpActionResponsePart::BodyChunk(bytes) => {
180+
response_streamer.send_part(HttpActionResponsePart::BodyChunk(bytes))?;
181+
},
182+
}
183+
}
184+
match outcome_result {
185+
Ok(outcome) => {
186+
let result = outcome.result.clone();
187+
let result_for_logging = match &result {
188+
HttpActionResult::Error(e) => Err(e.clone()),
189+
HttpActionResult::Streamed => result_for_logging.ok_or_else(|| {
190+
anyhow::anyhow!(
191+
"Result should be populated for successfully completed HTTP action"
192+
)
193+
})?,
194+
};
195+
self.function_log.log_http_action(
196+
outcome,
197+
result_for_logging,
198+
log_lines,
199+
start.elapsed(),
200+
caller,
201+
usage_tracker,
202+
context,
203+
);
204+
Ok(result)
205+
},
206+
Err(e) if e.is_deterministic_user_error() => {
207+
let js_err = JsError::from_error(e);
208+
match result_for_logging {
209+
Some(r) => {
210+
let outcome = HttpActionOutcome::new(
211+
None,
212+
request_head,
213+
identity.into(),
214+
unix_timestamp,
215+
HttpActionResult::Streamed,
216+
None,
217+
None,
218+
);
219+
log_lines.push(LogLine::new_system_log_line(
220+
LogLevel::Warn,
221+
vec![js_err.to_string()],
222+
outcome.unix_timestamp,
223+
SystemLogMetadata {
224+
code: "error:httpAction".to_string(),
225+
},
226+
));
227+
self.function_log.log_http_action(
228+
outcome.clone(),
229+
r,
230+
log_lines,
231+
start.elapsed(),
232+
caller,
233+
usage_tracker,
234+
context,
235+
);
236+
Ok(HttpActionResult::Streamed)
237+
},
238+
None => {
239+
let result = isolate::HttpActionResult::Error(js_err.clone());
240+
let outcome = HttpActionOutcome::new(
241+
None,
242+
request_head,
243+
identity.into(),
244+
unix_timestamp,
245+
result.clone(),
246+
None,
247+
None,
248+
);
249+
self.function_log.log_http_action(
250+
outcome.clone(),
251+
Err(js_err),
252+
log_lines,
253+
start.elapsed(),
254+
caller,
255+
usage_tracker,
256+
context,
257+
);
258+
Ok(result)
259+
},
260+
}
261+
},
262+
Err(e) => {
263+
self.function_log.log_http_action_system_error(
264+
&e,
265+
request_head,
266+
identity.into(),
267+
start,
268+
caller,
269+
log_lines,
270+
context,
271+
);
272+
Err(e)
273+
},
274+
}
275+
}
276+
277+
async fn route_http_action(
278+
&self,
279+
tx: &mut Transaction<RT>,
280+
head: &HttpActionRequestHead,
281+
) -> anyhow::Result<Option<(ComponentPath, RoutedHttpPath)>> {
282+
let mut model = BootstrapComponentsModel::new(tx);
283+
let mut current_component_path = ComponentPath::root();
284+
let mut routed_path = RoutedHttpPath(head.url.path().to_string());
285+
let method = RoutableMethod::try_from(head.method.clone())?;
286+
loop {
287+
let (definition_id, current_id) = model
288+
.component_path_to_ids(current_component_path.clone())
289+
.await?;
290+
let definition = model.load_definition_metadata(definition_id).await?;
291+
let http_routes = ModuleModel::new(model.tx)
292+
.get_http(current_id)
293+
.await?
294+
.map(|m| {
295+
m.into_value()
296+
.analyze_result
297+
.context("Missing analyze result for http module")?
298+
.http_routes
299+
.context("Missing http routes")
300+
})
301+
.transpose()?;
302+
303+
if http_routes.is_none() && definition.http_mounts.is_empty() {
304+
return Ok(None);
305+
}
306+
307+
// First, try matching an exact path from `http.js`, which will always
308+
// be the most specific match.
309+
if let Some(ref http_routes) = http_routes {
310+
if http_routes.route_exact(&routed_path[..], method) {
311+
return Ok(Some((current_component_path, routed_path)));
312+
}
313+
}
314+
315+
// Next, try finding the most specific prefix match from both `http.js`
316+
// and the component-level mounts.
317+
enum CurrentMatch<'a> {
318+
CurrentHttpJs,
319+
MountedComponent(&'a Reference),
320+
}
321+
let mut longest_match = None;
322+
323+
if let Some(ref http_routes) = http_routes {
324+
if let Some(match_suffix) = http_routes.route_prefix(&routed_path, method) {
325+
longest_match = Some((match_suffix, CurrentMatch::CurrentHttpJs));
326+
}
327+
}
328+
for (mount_path, reference) in &definition.http_mounts {
329+
let Some(match_suffix) = routed_path.strip_prefix(&mount_path[..]) else {
330+
continue;
331+
};
332+
let new_match = RoutedHttpPath(format!("/{match_suffix}"));
333+
if let Some((ref existing_suffix, _)) = longest_match {
334+
// If the existing longest match has a shorter suffix, then it
335+
// matches a longer prefix.
336+
if existing_suffix.len() < match_suffix.len() {
337+
continue;
338+
}
339+
}
340+
longest_match = Some((new_match, CurrentMatch::MountedComponent(reference)));
341+
}
342+
match longest_match {
343+
None => {
344+
// If we couldn't match the route, forward the request to the current
345+
// component's `http.js` if present. This lets the JS layer uniformly handle
346+
// 404s when defined.
347+
if http_routes.is_some() {
348+
return Ok(Some((
349+
current_component_path,
350+
RoutedHttpPath(routed_path.to_string()),
351+
)));
352+
} else {
353+
return Ok(None);
354+
}
355+
},
356+
Some((_, CurrentMatch::CurrentHttpJs)) => {
357+
return Ok(Some((
358+
current_component_path,
359+
RoutedHttpPath(routed_path.to_string()),
360+
)));
361+
},
362+
Some((match_suffix, CurrentMatch::MountedComponent(reference))) => {
363+
let Reference::ChildComponent {
364+
component: name,
365+
attributes,
366+
} = reference
367+
else {
368+
anyhow::bail!("Invalid reference in component definition: {reference:?}");
369+
};
370+
anyhow::ensure!(attributes.is_empty());
371+
372+
current_component_path = current_component_path.join(name.clone());
373+
routed_path = match_suffix;
374+
continue;
375+
},
376+
}
377+
}
378+
}
379+
}

0 commit comments

Comments
 (0)