Skip to content

Commit cb9c617

Browse files
NicolappsConvex, Inc.
authored and
Convex, Inc.
committed
Fivetran: use POST requests for list_snapshot and document_deltas (#37305)
This switches to using POST requests for `list_snapshot` and `document_deltas`. This improves the type safety by using the shared `Args` objects moved in #37281, and allows for longer request bodies. Since we no longer do requests with query parameters I also simplified the implementation of `.get` to remove the `parameters` parameter. I verified manually that `convex_api` still works with the Convex backend. GitOrigin-RevId: 8d8fd85df57569da6a33951bc99984070b4fcc05
1 parent 93677b4 commit cb9c617

File tree

2 files changed

+68
-29
lines changed

2 files changed

+68
-29
lines changed

crates/fivetran_source/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ derive_more = { workspace = true }
1919
futures = { workspace = true }
2020
futures-async-stream = { workspace = true }
2121
headers = { workspace = true }
22-
maplit = { workspace = true }
2322
prost = { workspace = true }
2423
prost-types = { workspace = true }
2524
reqwest = { workspace = true }
@@ -40,6 +39,7 @@ tonic-build = { workspace = true }
4039
cmd_util = { path = "../cmd_util" }
4140
convex = { path = "../convex", features = ["testing"] }
4241
convex_fivetran_common = { path = "../fivetran_common", features = ["testing"] }
42+
maplit = { workspace = true }
4343
proptest = { workspace = true }
4444
proptest-derive = { workspace = true }
4545
rand = { workspace = true }

crates/fivetran_source/src/convex_api.rs

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@ use std::{
77
use anyhow::Context;
88
use async_trait::async_trait;
99
use convex_fivetran_common::config::Config;
10+
use convex_fivetran_source::api_types::{
11+
DocumentDeltasArgs,
12+
ListSnapshotArgs,
13+
};
1014
use derive_more::{
1115
Display,
1216
From,
@@ -16,7 +20,6 @@ use headers::{
1620
HeaderName,
1721
HeaderValue,
1822
};
19-
use maplit::btreemap;
2023
use serde::{
2124
de::DeserializeOwned,
2225
Deserialize,
@@ -68,35 +71,70 @@ pub struct ConvexApi {
6871
}
6972

7073
impl ConvexApi {
71-
/// Performs a GET HTTP request to a given endpoint of the Convex API using
72-
/// the given query parameters.
73-
async fn get<T: DeserializeOwned>(
74+
/// Performs a GET HTTP request to a given endpoint of the Convex API.
75+
async fn get<T: DeserializeOwned>(&self, endpoint: &str) -> anyhow::Result<T> {
76+
let url = self
77+
.config
78+
.deploy_url
79+
.join("api/")
80+
.unwrap()
81+
.join(endpoint)
82+
.unwrap();
83+
84+
match reqwest::Client::new()
85+
.get(url)
86+
.header(CONVEX_CLIENT_HEADER, &*CONVEX_CLIENT_HEADER_VALUE)
87+
.header(
88+
reqwest::header::AUTHORIZATION,
89+
format!("Convex {}", self.config.deploy_key),
90+
)
91+
.send()
92+
.await
93+
{
94+
Ok(resp) if resp.status().is_success() => Ok(resp
95+
.json::<T>()
96+
.await
97+
.context("Failed to deserialize query result")?),
98+
Ok(resp) => {
99+
if let Ok(text) = resp.text().await {
100+
anyhow::bail!(
101+
"Call to {endpoint} on {} returned an unsuccessful response: {text}",
102+
self.config.deploy_url
103+
)
104+
} else {
105+
anyhow::bail!(
106+
"Call to {endpoint} on {} returned no response",
107+
self.config.deploy_url
108+
)
109+
}
110+
},
111+
Err(e) => anyhow::bail!(e.to_string()),
112+
}
113+
}
114+
115+
/// Performs a POST HTTP request to a given endpoint of the Convex API using
116+
/// the given parameters as a JSON body.
117+
async fn post<P: Serialize, T: DeserializeOwned>(
74118
&self,
75119
endpoint: &str,
76-
parameters: BTreeMap<&str, Option<String>>,
120+
parameters: P,
77121
) -> anyhow::Result<T> {
78-
let non_null_parameters: BTreeMap<&str, String> = parameters
79-
.into_iter()
80-
.filter_map(|(key, value)| value.map(|value| (key, value)))
81-
.collect();
82-
83-
let mut url = self
122+
let url = self
84123
.config
85124
.deploy_url
86125
.join("api/")
87126
.unwrap()
88127
.join(endpoint)
89128
.unwrap();
90129

91-
url.query_pairs_mut().extend_pairs(non_null_parameters);
92-
93130
match reqwest::Client::new()
94-
.get(url)
131+
.post(url)
95132
.header(CONVEX_CLIENT_HEADER, &*CONVEX_CLIENT_HEADER_VALUE)
96133
.header(
97134
reqwest::header::AUTHORIZATION,
98135
format!("Convex {}", self.config.deploy_key),
99136
)
137+
.json(&parameters)
100138
.send()
101139
.await
102140
{
@@ -125,8 +163,7 @@ impl ConvexApi {
125163
#[async_trait]
126164
impl Source for ConvexApi {
127165
async fn test_streaming_export_connection(&self) -> anyhow::Result<()> {
128-
self.get("test_streaming_export_connection", btreemap! {})
129-
.await
166+
self.get("test_streaming_export_connection").await
130167
}
131168

132169
async fn list_snapshot(
@@ -135,13 +172,14 @@ impl Source for ConvexApi {
135172
cursor: Option<ListSnapshotCursor>,
136173
table_name: Option<String>,
137174
) -> anyhow::Result<ListSnapshotResponse> {
138-
self.get(
175+
self.post(
139176
"list_snapshot",
140-
btreemap! {
141-
"snapshot" => snapshot.map(|n| n.to_string()),
142-
"cursor" => cursor.map(|n| n.to_string()),
143-
"tableName" => table_name,
144-
"format" => Some("convex_encoded_json".to_string()),
177+
ListSnapshotArgs {
178+
snapshot,
179+
cursor: cursor.map(|c| c.into()),
180+
table_name,
181+
component: None,
182+
format: Some("convex_encoded_json".to_string()),
145183
},
146184
)
147185
.await
@@ -152,20 +190,21 @@ impl Source for ConvexApi {
152190
cursor: DocumentDeltasCursor,
153191
table_name: Option<String>,
154192
) -> anyhow::Result<DocumentDeltasResponse> {
155-
self.get(
193+
self.post(
156194
"document_deltas",
157-
btreemap! {
158-
"cursor" => Some(cursor.to_string()),
159-
"tableName" => table_name,
160-
"format" => Some("convex_encoded_json".to_string()),
195+
DocumentDeltasArgs {
196+
cursor: Some(cursor.into()),
197+
table_name,
198+
component: None,
199+
format: Some("convex_encoded_json".to_string()),
161200
},
162201
)
163202
.await
164203
}
165204

166205
async fn get_tables_and_columns(&self) -> anyhow::Result<BTreeMap<TableName, Vec<FieldName>>> {
167206
let tables_to_columns: BTreeMap<TableName, Vec<String>> =
168-
self.get("get_tables_and_columns", btreemap! {}).await?;
207+
self.get("get_tables_and_columns").await?;
169208

170209
tables_to_columns
171210
.into_iter()

0 commit comments

Comments
 (0)