Skip to content

Commit 469c948

Browse files
Sujay JayakarConvex, Inc.
Sujay Jayakar
authored and
Convex, Inc.
committed
Wire up UdfOutcome to isolate2 (#24680)
I've added test helpers to run a single UDF test with both isolate v1 and isolate v2! Over time I'll port more tests as we implement features. GitOrigin-RevId: 938d8101c1a3027551835fa25bcc71dcc6afcec8
1 parent 1613cb8 commit 469c948

File tree

7 files changed

+596
-439
lines changed

7 files changed

+596
-439
lines changed

crates/isolate/src/isolate2/client.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use value::{
2727
};
2828

2929
use super::{
30+
environment::EnvironmentOutcome,
3031
FunctionId,
3132
PromiseId,
3233
};
@@ -58,7 +59,10 @@ pub enum IsolateThreadRequest {
5859

5960
#[derive(Debug)]
6061
pub enum EvaluateResult {
61-
Ready(ConvexValue),
62+
Ready {
63+
result: ConvexValue,
64+
outcome: EnvironmentOutcome,
65+
},
6266
Pending {
6367
async_syscalls: Vec<PendingAsyncSyscall>,
6468
},

crates/isolate/src/isolate2/context.rs

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use deno_core::{
88
},
99
ModuleSpecifier,
1010
};
11-
use futures::future::Either;
1211
use value::ConvexObject;
1312

1413
use super::{
@@ -111,16 +110,13 @@ impl Context {
111110
let function_id = self.next_function_id;
112111
self.next_function_id += 1;
113112

114-
let result = match self.enter(session, |mut ctx| {
113+
let (promise, result) = self.enter(session, |mut ctx| {
115114
ctx.start_evaluate_function(udf_type, module, name, args)
116-
})? {
117-
Either::Left(result) => (function_id, EvaluateResult::Ready(result)),
118-
Either::Right((promise, async_syscalls)) => {
119-
self.pending_functions.insert(function_id, promise);
120-
(function_id, EvaluateResult::Pending { async_syscalls })
121-
},
115+
})?;
116+
if let EvaluateResult::Pending { .. } = result {
117+
self.pending_functions.insert(function_id, promise);
122118
};
123-
Ok(result)
119+
Ok((function_id, result))
124120
}
125121

126122
pub fn poll_function(
@@ -129,18 +125,13 @@ impl Context {
129125
function_id: FunctionId,
130126
completions: Vec<AsyncSyscallCompletion>,
131127
) -> anyhow::Result<EvaluateResult> {
132-
let promise = self
133-
.pending_functions
134-
.remove(&function_id)
135-
.ok_or_else(|| anyhow!("Function {function_id} not found"))?;
136-
let result =
137-
match self.enter(session, |mut ctx| ctx.poll_function(completions, &promise))? {
138-
Either::Left(result) => EvaluateResult::Ready(result),
139-
Either::Right((promise, async_syscalls)) => {
140-
self.pending_functions.insert(function_id, promise);
141-
EvaluateResult::Pending { async_syscalls }
142-
},
143-
};
128+
let Some(promise) = self.pending_functions.remove(&function_id) else {
129+
anyhow::bail!("Function {function_id} not found");
130+
};
131+
let result = self.enter(session, |mut ctx| ctx.poll_function(completions, &promise))?;
132+
if let EvaluateResult::Pending { .. } = result {
133+
self.pending_functions.insert(function_id, promise);
134+
}
144135
Ok(result)
145136
}
146137

crates/isolate/src/isolate2/entered_context.rs

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use deno_core::{
1212
ModuleSpecifier,
1313
};
1414
use errors::ErrorMetadata;
15-
use futures::future::Either;
1615
use serde_json::Value as JsonValue;
1716
use value::{
1817
ConvexObject,
@@ -22,6 +21,7 @@ use value::{
2221
use super::{
2322
client::{
2423
AsyncSyscallCompletion,
24+
EvaluateResult,
2525
PendingAsyncSyscall,
2626
},
2727
context_state::ContextState,
@@ -302,10 +302,10 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> {
302302
url: &ModuleSpecifier,
303303
name: &str,
304304
args: ConvexObject,
305-
) -> anyhow::Result<Either<ConvexValue, (v8::Global<v8::Promise>, Vec<PendingAsyncSyscall>)>>
306-
{
305+
) -> anyhow::Result<(v8::Global<v8::Promise>, EvaluateResult)> {
307306
let module_global = {
308-
let context_state = self.context_state()?;
307+
let context_state = self.context_state_mut()?;
308+
context_state.environment.start_execution()?;
309309
context_state
310310
.module_map
311311
.modules
@@ -392,32 +392,15 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> {
392392
.ok_or_else(|| anyhow!("Failed to call invoke function"))?
393393
.try_into()?;
394394

395-
match promise.state() {
396-
v8::PromiseState::Pending => {
397-
let promise = v8::Global::new(self.scope, promise);
398-
let pending = mem::take(&mut self.context_state_mut()?.pending_async_syscalls);
399-
Ok(Either::Right((promise, pending)))
400-
},
401-
v8::PromiseState::Fulfilled => {
402-
let result: v8::Local<v8::String> = promise.result(self.scope).try_into()?;
403-
let result = helpers::to_rust_string(self.scope, &result)?;
404-
// TODO: `deserialize_udf_result`
405-
let result_json: JsonValue = serde_json::from_str(&result)?;
406-
let result = ConvexValue::try_from(result_json)?;
407-
Ok(Either::Left(result))
408-
},
409-
v8::PromiseState::Rejected => {
410-
todo!()
411-
},
412-
}
395+
let evaluate_result = self.check_promise_result(&promise)?;
396+
Ok((v8::Global::new(self.scope, promise), evaluate_result))
413397
}
414398

415399
pub fn poll_function(
416400
&mut self,
417401
completions: Vec<AsyncSyscallCompletion>,
418402
promise: &v8::Global<v8::Promise>,
419-
) -> anyhow::Result<Either<ConvexValue, (v8::Global<v8::Promise>, Vec<PendingAsyncSyscall>)>>
420-
{
403+
) -> anyhow::Result<EvaluateResult> {
421404
let completed = {
422405
let context_state = self.context_state_mut()?;
423406
let mut completed = vec![];
@@ -451,19 +434,27 @@ impl<'enter, 'scope: 'enter> EnteredContext<'enter, 'scope> {
451434
self.execute_user_code(|s| s.perform_microtask_checkpoint())?;
452435

453436
let promise = v8::Local::new(self.scope, promise);
437+
self.check_promise_result(&promise)
438+
}
439+
440+
fn check_promise_result(
441+
&mut self,
442+
promise: &v8::Local<v8::Promise>,
443+
) -> anyhow::Result<EvaluateResult> {
454444
match promise.state() {
455445
v8::PromiseState::Pending => {
456-
let promise = v8::Global::new(self.scope, promise);
457-
let pending = mem::take(&mut self.context_state_mut()?.pending_async_syscalls);
458-
Ok(Either::Right((promise, pending)))
446+
let async_syscalls =
447+
mem::take(&mut self.context_state_mut()?.pending_async_syscalls);
448+
Ok(EvaluateResult::Pending { async_syscalls })
459449
},
460450
v8::PromiseState::Fulfilled => {
461451
let result: v8::Local<v8::String> = promise.result(self.scope).try_into()?;
462452
let result = helpers::to_rust_string(self.scope, &result)?;
463453
// TODO: `deserialize_udf_result`
464454
let result_json: JsonValue = serde_json::from_str(&result)?;
465455
let result = ConvexValue::try_from(result_json)?;
466-
Ok(Either::Left(result))
456+
let outcome = self.context_state_mut()?.environment.finish_execution()?;
457+
Ok(EvaluateResult::Ready { result, outcome })
467458
},
468459
v8::PromiseState::Rejected => {
469460
todo!()

crates/isolate/src/isolate2/environment.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ use common::{
1212
use rand_chacha::ChaCha12Rng;
1313
use serde_json::Value as JsonValue;
1414

15+
#[derive(Debug)]
16+
pub struct EnvironmentOutcome {
17+
pub observed_rng: bool,
18+
pub observed_time: bool,
19+
}
20+
1521
pub trait Environment {
1622
fn syscall(&mut self, name: &str, args: JsonValue) -> anyhow::Result<JsonValue>;
1723

@@ -28,4 +34,10 @@ pub trait Environment {
2834

2935
fn get_environment_variable(&mut self, name: EnvVarName)
3036
-> anyhow::Result<Option<EnvVarValue>>;
37+
38+
// Signal that we've finished the import phase and are ready to start execution.
39+
fn start_execution(&mut self) -> anyhow::Result<()>;
40+
41+
// Signal that we've finished execution.
42+
fn finish_execution(&mut self) -> anyhow::Result<EnvironmentOutcome>;
3143
}

0 commit comments

Comments
 (0)