
Commit 43fea72

Nicolapps authored and Convex, Inc. committed

Add the application code for the Fivetran destination connector (#27105)

GitOrigin-RevId: c0776aa6896cf4674e8757850c319e33b72a4363

1 parent fb98411 commit 43fea72

File tree

7 files changed: +700, -27 lines


Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default.

crates/fivetran_destination/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -21,11 +21,13 @@ async-trait = { workspace = true }
 base64 = { workspace = true }
 cbc = { workspace = true }
 chrono = { workspace = true }
+clap = { workspace = true, features = ["derive"] }
 common = { path = "../common" }
 convex_fivetran_common = { path = "../fivetran_common" }
 csv-async = { workspace = true }
 derive_more = { workspace = true }
 futures = { workspace = true }
+futures-async-stream = { workspace = true }
 maplit = { workspace = true }
 prost = { workspace = true }
 prost-types = { workspace = true }
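
The two added dependencies back the new application code: clap (with the derive feature) presumably powers the connector's command-line entry point, and futures-async-stream provides the #[try_stream] attribute used by row_stream in the file below. A minimal sketch of that attribute, with hypothetical item and error types rather than this crate's (futures-async-stream requires a nightly toolchain):

use futures_async_stream::try_stream;

// Expands into a function returning a Stream<Item = Result<u32, std::io::Error>>:
// `yield` emits an Ok item, and `?` or an early error ends the stream with an Err.
#[try_stream(ok = u32, error = std::io::Error)]
async fn numbers(up_to: u32) {
    for n in 0..up_to {
        yield n;
    }
}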
Lines changed: 249 additions & 0 deletions
@@ -0,0 +1,249 @@
use std::{
    collections::BTreeMap,
    str::FromStr,
};

use chrono::{
    DateTime,
    Utc,
};
use common::{
    try_chunks::TryChunksExt,
    value::{
        ConvexObject,
        TableName,
    },
};
use convex_fivetran_common::fivetran_sdk::{
    self,
    Compression,
    CsvFileParams,
    Encryption,
};
use futures::{
    stream::{
        self,
    },
    StreamExt,
};
use futures_async_stream::try_stream;

use crate::{
    api_types::{
        BatchWriteOperation,
        BatchWriteRow,
        DeleteType,
        FivetranTableName,
    },
    convex_api::Destination,
    error::{
        DestinationError,
        SuggestedTable,
    },
    file_reader::{
        create_csv_deserializer,
        read_rows,
        FivetranFileEncryption,
        FivetranReaderParams,
    },
    schema::{
        suggested_convex_table,
        to_fivetran_table,
        validate_destination_schema_table,
        FivetranTableSchema,
    },
};

const ROWS_BY_REQUEST: usize = 500;

pub enum DescribeTableResponse {
    NotFound,
    Table(fivetran_sdk::Table),
}

pub async fn describe_table(
    destination: impl Destination,
    table_name: String,
) -> Result<DescribeTableResponse, DestinationError> {
    let convex_table_name = TableName::from_str(&table_name)
        .map_err(|err| DestinationError::UnsupportedTableName(table_name, err))?;

    let Some(schema) = destination
        .get_schema()
        .await
        .map_err(DestinationError::DeploymentError)?
    else {
        return Ok(DescribeTableResponse::NotFound);
    };

    let Some(convex_table) = schema.tables.get(&convex_table_name) else {
        return Ok(DescribeTableResponse::NotFound);
    };

    Ok(DescribeTableResponse::Table(to_fivetran_table(
        convex_table,
    )?))
}

pub async fn create_table(
    destination: impl Destination,
    table: fivetran_sdk::Table,
) -> Result<(), DestinationError> {
    let convex_table_name = TableName::from_str(&table.name)
        .map_err(|err| DestinationError::UnsupportedTableName(table.name.to_string(), err))?;

    let schema = destination
        .get_schema()
        .await
        .map_err(DestinationError::DeploymentError)?
        .ok_or_else(|| match suggested_convex_table(table.clone()) {
            Ok(suggested_table) => {
                DestinationError::DestinationHasNoSchema(SuggestedTable(suggested_table))
            },
            Err(err) => DestinationError::DestinationHasNoSchemaWithoutSuggestion(Box::new(err)),
        })?;

    let Some(convex_table) = schema.tables.get(&convex_table_name) else {
        return Err(match suggested_convex_table(table) {
            Ok(suggested_table) => {
                DestinationError::MissingTable(convex_table_name, SuggestedTable(suggested_table))
            },
            Err(err) => {
                DestinationError::MissingTableWithoutSuggestion(convex_table_name, Box::new(err))
            },
        });
    };

    validate_destination_schema_table(table, convex_table)?;

    Ok(())
}

pub async fn alter_table(
    destination: impl Destination,
    table: fivetran_sdk::Table,
) -> Result<(), DestinationError> {
    // AlterTable is implemented the same way as CreateTable, as it merely checks
    // that the table in the Convex destination complies to what we expect.
    create_table(destination, table).await
}

pub async fn truncate(
    destination: impl Destination,
    table_name: String,
    delete_before: Option<DateTime<Utc>>,
    delete_type: DeleteType,
) -> Result<(), DestinationError> {
    let convex_table_name = TableName::from_str(&table_name)
        .map_err(|err| DestinationError::UnsupportedTableName(table_name.to_string(), err))?;

    destination
        .truncate_table(convex_table_name, delete_type, delete_before)
        .await
        .map_err(DestinationError::DeploymentError)?;

    Ok(())
}

pub async fn write_batch(
    destination: impl Destination,
    table: fivetran_sdk::Table,
    keys: BTreeMap<String, Vec<u8>>,
    replace_files: Vec<String>,
    update_files: Vec<String>,
    delete_files: Vec<String>,
    csv_file_params: CsvFileParams,
) -> Result<(), DestinationError> {
    let reader_params = FivetranReaderParams::from(csv_file_params.clone());
    let table_name = FivetranTableName::from_str(&table.name)
        .map_err(|err| DestinationError::InvalidTableName(table.name.clone(), err))?;
    let schema = FivetranTableSchema::try_from(table)?;

    let mut streams = vec![];
    for file in replace_files {
        streams.push(row_stream(
            file,
            BatchWriteOperation::Upsert,
            &keys,
            csv_file_params.encryption(),
            csv_file_params.compression(),
            &reader_params,
            &table_name,
            &schema,
        ));
    }
    for file in update_files {
        streams.push(row_stream(
            file,
            BatchWriteOperation::Update,
            &keys,
            csv_file_params.encryption(),
            csv_file_params.compression(),
            &reader_params,
            &table_name,
            &schema,
        ));
    }
    for file in delete_files {
        streams.push(row_stream(
            file,
            BatchWriteOperation::HardDelete,
            &keys,
            csv_file_params.encryption(),
            csv_file_params.compression(),
            &reader_params,
            &table_name,
            &schema,
        ));
    }

    let mut concatenated_stream =
        Box::pin(stream::iter(streams).flatten().try_chunks2(ROWS_BY_REQUEST));
    while let Some(result) = concatenated_stream.next().await {
        destination
            .batch_write(result?)
            .await
            .map_err(DestinationError::DeploymentError)?;
    }

    Ok(())
}

#[try_stream(ok = BatchWriteRow, error = DestinationError)]
async fn row_stream<'a>(
    file: String,
    operation: BatchWriteOperation,
    keys: &'a BTreeMap<String, Vec<u8>>,
    encryption: Encryption,
    compression: Compression,
    reader_params: &'a FivetranReaderParams,
    table_name: &'a FivetranTableName,
    schema: &'a FivetranTableSchema,
) {
    let encryption: FivetranFileEncryption = if encryption == Encryption::Aes {
        let key = keys.get(&file).ok_or(DestinationError::InvalidKey)?;
        FivetranFileEncryption::Aes {
            key: key.clone().try_into()?,
        }
    } else {
        FivetranFileEncryption::None
    };

    let mut deserializer = create_csv_deserializer(&file, compression, encryption)
        .await
        .map_err(|err| DestinationError::FileReadError(file.clone(), err))?;
    let mut reader = read_rows(&mut deserializer, reader_params, schema);

    while let Some(row) = reader.next().await {
        let row: ConvexObject = row
            .map_err(|err| DestinationError::FileReadError(file.clone(), err))?
            .try_into()
            .map_err(DestinationError::InvalidRow)?;

        yield BatchWriteRow {
            table: table_name.to_string(),
            operation,
            row,
        }
    }
}
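
For context, a self-contained sketch of the batching pattern write_batch relies on, using plain futures combinators and made-up row values instead of this crate's row_stream and try_chunks2 helpers: per-file row streams are flattened into one stream, grouped into chunks of at most ROWS_BY_REQUEST rows, and each chunk would be sent as a single batch_write request.

use futures::{executor::block_on, stream, StreamExt, TryStreamExt};

fn main() -> Result<(), std::io::Error> {
    block_on(async {
        const ROWS_BY_REQUEST: usize = 500;

        // Stand-ins for the per-file row streams that row_stream would produce.
        let file_streams = vec![
            stream::iter((0u32..600).map(Ok::<u32, std::io::Error>)),
            stream::iter((600u32..700).map(Ok::<u32, std::io::Error>)),
        ];

        // Flatten all files into one stream of rows, then group it into chunks
        // of at most ROWS_BY_REQUEST rows, stopping at the first error.
        let mut chunks = Box::pin(
            stream::iter(file_streams)
                .flatten()
                .try_chunks(ROWS_BY_REQUEST),
        );

        while let Some(chunk) = chunks.next().await {
            let rows = chunk.map_err(|err| err.1)?; // Vec of at most 500 rows
            println!("would issue one batch_write with {} rows", rows.len());
        }
        Ok(())
    })
}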
