Skip to content

Commit 05c9f64

Browse files
Preslav LeConvex, Inc.
Preslav Le
authored and
Convex, Inc.
committed
Add journal, query and timestamp to ApplicadtionApi::execute_public_query (#26115)
Those are needed by the sync worker. GitOrigin-RevId: d2888bd2ad04ff9a34b2e79c6bd81d033d4d1564
1 parent e419bc5 commit 05c9f64

File tree

3 files changed

+50
-26
lines changed

3 files changed

+50
-26
lines changed

crates/application/src/api.rs

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,25 +7,36 @@ use common::{
77
AllowedVisibility,
88
FunctionCaller,
99
},
10-
value::ConvexValue,
1110
RequestId,
1211
};
1312
use model::session_requests::types::SessionRequestIdentifier;
1413
use serde_json::Value as JsonValue;
15-
use sync_types::AuthenticationToken;
14+
use sync_types::{
15+
AuthenticationToken,
16+
SerializedQueryJournal,
17+
Timestamp,
18+
};
1619

1720
use crate::{
18-
redaction::{
19-
RedactedJsError,
20-
RedactedLogLines,
21-
},
2221
Application,
2322
RedactedActionError,
2423
RedactedActionReturn,
2524
RedactedMutationError,
2625
RedactedMutationReturn,
26+
RedactedQueryReturn,
2727
};
2828

29+
#[cfg_attr(
30+
any(test, feature = "testing"),
31+
derive(proptest_derive::Arbitrary, Debug, Clone, PartialEq)
32+
)]
33+
pub enum ExecuteQueryTimestamp {
34+
// Execute the query at the latest timestamp.
35+
Latest,
36+
// Execute the query at a given timestamp.
37+
At(Timestamp),
38+
}
39+
2940
// A trait that abstracts the backend API. It all state and validation logic
3041
// so http routes can be kept thin and stateless. The implementor is also
3142
// responsible for routing the request to the appropriate backend in the hosted
@@ -40,8 +51,9 @@ pub trait ApplicationApi: Send + Sync {
4051
path: ComponentFunctionPath,
4152
args: Vec<JsonValue>,
4253
caller: FunctionCaller,
43-
// TODO(presley): Replace this with RedactedQueryReturn.
44-
) -> anyhow::Result<(Result<ConvexValue, RedactedJsError>, RedactedLogLines)>;
54+
ts: ExecuteQueryTimestamp,
55+
journal: Option<SerializedQueryJournal>,
56+
) -> anyhow::Result<RedactedQueryReturn>;
4557

4658
async fn execute_public_mutation(
4759
&self,
@@ -77,7 +89,9 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
7789
path: ComponentFunctionPath,
7890
args: Vec<JsonValue>,
7991
caller: FunctionCaller,
80-
) -> anyhow::Result<(Result<ConvexValue, RedactedJsError>, RedactedLogLines)> {
92+
ts: ExecuteQueryTimestamp,
93+
journal: Option<SerializedQueryJournal>,
94+
) -> anyhow::Result<RedactedQueryReturn> {
8195
anyhow::ensure!(
8296
caller.allowed_visibility() == AllowedVisibility::PublicOnly,
8397
"This method should not be used by internal callers."
@@ -86,14 +100,12 @@ impl<RT: Runtime> ApplicationApi for Application<RT> {
86100
let validate_time = self.runtime().system_time();
87101
let identity = self.authenticate(auth_token, validate_time).await?;
88102

89-
let ts = *self.now_ts_for_reads();
90-
let journal = None;
91-
92-
let query_return = self
93-
.read_only_udf_at_ts(request_id, path, args, identity, ts, journal, caller)
94-
.await?;
95-
96-
Ok((query_return.result, query_return.log_lines))
103+
let ts = match ts {
104+
ExecuteQueryTimestamp::Latest => *self.now_ts_for_reads(),
105+
ExecuteQueryTimestamp::At(ts) => ts,
106+
};
107+
self.read_only_udf_at_ts(request_id, path, args, identity, ts, journal, caller)
108+
.await
97109
}
98110

99111
async fn execute_public_mutation(

crates/database/src/token.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub type SerializedToken = String;
1919
/// for a transaction. This can be externalized to a user and used to represent
2020
/// current transaction state.
2121
#[derive(Clone, Debug, Eq, PartialEq)]
22-
#[cfg_attr(test, derive(proptest_derive::Arbitrary))]
22+
#[cfg_attr(any(test, feature = "testing"), derive(proptest_derive::Arbitrary))]
2323
pub struct Token {
2424
read_set: ReadSet,
2525
ts: Timestamp,
@@ -68,7 +68,7 @@ impl Token {
6868
)
6969
}
7070

71-
pub(crate) fn new(read_set: ReadSet, ts: Timestamp) -> Self {
71+
pub fn new(read_set: ReadSet, ts: Timestamp) -> Self {
7272
Self { read_set, ts }
7373
}
7474

crates/local_backend/src/public_api.rs

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use std::sync::Arc;
22

33
use anyhow::anyhow;
44
use application::{
5-
api::ApplicationApi,
5+
api::{
6+
ApplicationApi,
7+
ExecuteQueryTimestamp,
8+
},
69
redaction::{
710
RedactedJsError,
811
RedactedLogLines,
@@ -219,7 +222,8 @@ pub async fn public_query_get(
219222
) -> Result<impl IntoResponse, HttpResponseError> {
220223
let udf_path = parse_udf_path(&req.path)?;
221224
let args = req.args.into_arg_vec();
222-
let (result, log_lines) = api
225+
let journal = None;
226+
let query_result = api
223227
.execute_public_query(
224228
host,
225229
request_id,
@@ -230,10 +234,13 @@ pub async fn public_query_get(
230234
},
231235
args,
232236
FunctionCaller::HttpApi(client_version.clone()),
237+
ExecuteQueryTimestamp::Latest,
238+
journal,
233239
)
234240
.await?;
235241
let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?;
236-
let response = match result {
242+
let log_lines = query_result.log_lines;
243+
let response = match query_result.result {
237244
Ok(value) => UdfResponse::Success {
238245
value: export_value(value, value_format, client_version)?,
239246
log_lines,
@@ -253,7 +260,8 @@ pub async fn public_query_post(
253260
Json(req): Json<UdfPostRequest>,
254261
) -> Result<impl IntoResponse, HttpResponseError> {
255262
let udf_path = parse_udf_path(&req.path)?;
256-
let (result, log_lines) = api
263+
let journal = None;
264+
let query_return = api
257265
.execute_public_query(
258266
host,
259267
request_id,
@@ -264,15 +272,19 @@ pub async fn public_query_post(
264272
},
265273
req.args.into_arg_vec(),
266274
FunctionCaller::HttpApi(client_version.clone()),
275+
ExecuteQueryTimestamp::Latest,
276+
journal,
267277
)
268278
.await?;
269279
let value_format = req.format.as_ref().map(|f| f.parse()).transpose()?;
270-
let response = match result {
280+
let response = match query_return.result {
271281
Ok(value) => UdfResponse::Success {
272282
value: export_value(value, value_format, client_version)?,
273-
log_lines,
283+
log_lines: query_return.log_lines,
284+
},
285+
Err(error) => {
286+
UdfResponse::error(error, query_return.log_lines, value_format, client_version)?
274287
},
275-
Err(error) => UdfResponse::error(error, log_lines, value_format, client_version)?,
276288
};
277289
Ok(Json(response))
278290
}

0 commit comments

Comments
 (0)