Skip to content

Commit 9e6084f

Browse files
NicolappsConvex, Inc.
authored and
Convex, Inc.
committed
Convert Convex schemas to Fivetran tables (#26982)
GitOrigin-RevId: 256ac08dcfc7d0f83672b1c904d1f7cf5483ca49
1 parent 216205a commit 9e6084f

File tree

2 files changed

+315
-0
lines changed

2 files changed

+315
-0
lines changed

crates/fivetran_destination/src/error.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,23 @@ pub enum TableSchemaError {
176176
WrongPrimaryKeyIndex(SuggestedIndex),
177177
}
178178

179+
#[derive(Debug, Error)]
180+
pub enum DescribeTableError {
181+
#[error(
182+
"The table `{0}` in the Convex destination stores arbitrary documents, which is not \
183+
supported by Fivetran. Please edit the schema of the table in `schema.ts` so that the \
184+
table isn’t defined as `v.any()`."
185+
)]
186+
DestinationHasAnySchema(TableName),
187+
188+
#[error(
189+
"The table `{0}` in the Convex destination stores multiple different types of documents, \
190+
which is not supported by Fivetran. Please edit the schema of the table in `schema.ts` \
191+
so that the table isn’t defiend as `v.union()`."
192+
)]
193+
DestinationHasMultipleSchemas(TableName),
194+
}
195+
179196
/// Wrapper around `TableDefinition` that formats it in the same format as
180197
/// `schema.ts`.
181198
#[derive(Debug)]

crates/fivetran_destination/src/schema.rs

Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
sync::LazyLock,
99
};
1010

11+
use anyhow::bail;
1112
use common::{
1213
bootstrap_model::index::database_index::IndexedFields,
1314
document::CREATION_TIME_FIELD_PATH,
@@ -30,6 +31,7 @@ use common::{
3031
};
3132
use convex_fivetran_common::fivetran_sdk::{
3233
self,
34+
Column,
3335
DataType as FivetranDataType,
3436
};
3537

@@ -52,11 +54,13 @@ use crate::{
5254
SYNCED_FIVETRAN_FIELD_NAME,
5355
},
5456
error::{
57+
DescribeTableError,
5558
DestinationError,
5659
SuggestedIndex,
5760
SuggestedTable,
5861
TableSchemaError,
5962
},
63+
log,
6064
};
6165

6266
/// The default name of the sync index suggested to the user in error messages.
@@ -540,6 +544,165 @@ pub fn validate_destination_schema_table(
540544
Ok(())
541545
}
542546

547+
/// Converts the given Convex schema table to a Fivetran table. This is used in
548+
/// the implementation of the `AlterTable` endpoint so that Fivetran can be
549+
/// aware of the current state of the Convex destination.
550+
#[allow(dead_code)]
551+
pub fn to_fivetran_table(
552+
convex_table: &TableDefinition,
553+
) -> anyhow::Result<fivetran_sdk::Table, DescribeTableError> {
554+
let fivetran_columns = to_fivetran_columns(convex_table)?;
555+
556+
Ok(fivetran_sdk::Table {
557+
name: convex_table.table_name.to_string(),
558+
columns: fivetran_columns,
559+
})
560+
}
561+
562+
/// Returns the validator for the `fivetran` field of the given Convex table
563+
/// definition.
564+
///
565+
/// Returns `None` if the `fivetran` field isn’t specified or is incorrectly
566+
/// specified.
567+
fn metadata_field_validator(validator: &ObjectValidator) -> Option<&ObjectValidator> {
568+
// System columns
569+
let field_validator = validator.0.get(&METADATA_CONVEX_FIELD_NAME.clone())?;
570+
let Validator::Object(metadata_object_validator) = field_validator.validator() else {
571+
return None;
572+
};
573+
574+
Some(metadata_object_validator)
575+
}
576+
577+
fn user_columns(table_def: &TableDefinition, validator: &ObjectValidator) -> Vec<Column> {
578+
let primary_key_index = table_def.indexes.get(&PRIMARY_KEY_INDEX_DESCRIPTOR);
579+
if primary_key_index.is_none() {
580+
log(&format!(
581+
"The table {} in your Convex schema is missing a `by_primary_key` index, so Fivetran \
582+
will not able to identify the columns of its primary key.",
583+
table_def.table_name
584+
));
585+
}
586+
587+
validator
588+
.0
589+
.iter()
590+
.filter(|(field_name, _)| **field_name != *METADATA_CONVEX_FIELD_NAME)
591+
.flat_map(|(field_name, field_validator)| {
592+
let fivetran_data_type = recognize_fivetran_type(field_validator.validator()).ok();
593+
if fivetran_data_type.is_none() {
594+
log(&format!(
595+
"The type of the field `field_name` in the table `{}` isn’t supported by \
596+
Fivetran.",
597+
table_def.table_name
598+
))
599+
}
600+
601+
Some(fivetran_sdk::Column {
602+
name: field_name.to_string(),
603+
r#type: fivetran_data_type.unwrap_or(FivetranDataType::Unspecified) as i32,
604+
primary_key: primary_key_index.is_some_and(|primary_key_index| {
605+
primary_key_index
606+
.fields
607+
.contains(&FieldPath::for_root_field(field_name.clone()))
608+
}),
609+
decimal: None,
610+
})
611+
})
612+
.collect()
613+
}
614+
615+
fn to_fivetran_columns(
616+
table_def: &TableDefinition,
617+
) -> Result<Vec<fivetran_sdk::Column>, DescribeTableError> {
618+
let Some(DocumentSchema::Union(validators)) = &table_def.document_type else {
619+
return Err(DescribeTableError::DestinationHasAnySchema(
620+
table_def.table_name.clone(),
621+
));
622+
};
623+
let [validator] = &validators[..] else {
624+
return Err(DescribeTableError::DestinationHasMultipleSchemas(
625+
table_def.table_name.clone(),
626+
));
627+
};
628+
629+
let mut columns: Vec<fivetran_sdk::Column> = Vec::new();
630+
631+
// System columns
632+
let metadata_validator = metadata_field_validator(validator);
633+
if let Some(metadata_validator) = metadata_validator {
634+
// Soft delete
635+
if metadata_validator
636+
.0
637+
.contains_key(&SOFT_DELETE_CONVEX_FIELD_NAME.clone())
638+
{
639+
columns.push(fivetran_sdk::Column {
640+
name: SOFT_DELETE_FIVETRAN_FIELD_NAME.to_string(),
641+
r#type: FivetranDataType::Boolean as i32,
642+
primary_key: false,
643+
decimal: None,
644+
});
645+
}
646+
647+
// Fivetran pseudo-ID
648+
if let Some(field_validator) = metadata_validator.0.get(&ID_CONVEX_FIELD_NAME.clone()) {
649+
let id_field_type = recognize_fivetran_type(field_validator.validator()).ok();
650+
if id_field_type.is_none() {
651+
log(&format!(
652+
"The type of the field `convex.id` in the table `{}` isn’t supported by \
653+
Fivetran.",
654+
table_def.table_name
655+
))
656+
}
657+
658+
columns.push(fivetran_sdk::Column {
659+
name: ID_FIVETRAN_FIELD_NAME.to_string(),
660+
r#type: id_field_type.unwrap_or(FivetranDataType::Unspecified) as i32,
661+
primary_key: true,
662+
decimal: None,
663+
});
664+
}
665+
666+
// Synchronization timestamp
667+
columns.push(fivetran_sdk::Column {
668+
name: SYNCED_FIVETRAN_FIELD_NAME.to_string(),
669+
r#type: FivetranDataType::UtcDatetime as i32,
670+
primary_key: false,
671+
decimal: None,
672+
});
673+
}
674+
675+
// User columns
676+
columns.append(&mut user_columns(table_def, validator));
677+
678+
Ok(columns)
679+
}
680+
681+
fn recognize_fivetran_type(validator: &Validator) -> anyhow::Result<FivetranDataType> {
682+
match validator {
683+
Validator::Float64 => Ok(FivetranDataType::Double),
684+
Validator::Int64 => Ok(FivetranDataType::Long),
685+
Validator::Boolean => Ok(FivetranDataType::Boolean),
686+
Validator::String => Ok(FivetranDataType::String),
687+
Validator::Bytes => Ok(FivetranDataType::Binary),
688+
Validator::Object(_) | Validator::Array(_) => Ok(FivetranDataType::Json),
689+
690+
// Allow nullable types
691+
Validator::Union(validators) => match &validators[..] {
692+
[v] | [Validator::Null, v] | [v, Validator::Null] => recognize_fivetran_type(v),
693+
_ => bail!("Unsupported union"),
694+
},
695+
696+
Validator::Null
697+
| Validator::Literal(_)
698+
| Validator::Id(_)
699+
| Validator::Set(_)
700+
| Validator::Record(..)
701+
| Validator::Map(..)
702+
| Validator::Any => bail!("The type of this Convex column isn’t supported by Fivetran."),
703+
}
704+
}
705+
543706
#[cfg(test)]
544707
mod tests {
545708
use std::{
@@ -593,6 +756,7 @@ mod tests {
593756
};
594757
use crate::{
595758
error::DestinationError,
759+
schema::to_fivetran_table,
596760
testing::fivetran_table_strategy,
597761
};
598762

@@ -1132,6 +1296,140 @@ mod tests {
11321296
Ok(())
11331297
}
11341298

1299+
#[test]
1300+
fn it_converts_convex_tables_to_fivetran_tables() -> anyhow::Result<()> {
1301+
assert_eq!(
1302+
to_fivetran_table(&convex_table(
1303+
btreemap! {
1304+
"id" => FieldValidator::required_field_type(Validator::Int64),
1305+
"name" => FieldValidator::required_field_type(Validator::Union(vec![
1306+
Validator::Null,
1307+
Validator::String,
1308+
])),
1309+
"fivetran" => FieldValidator::required_field_type(Validator::Object(
1310+
object_validator!(
1311+
"synced" => FieldValidator::required_field_type(Validator::Float64),
1312+
),
1313+
)),
1314+
},
1315+
btreemap! {
1316+
"by_primary_key" => vec![
1317+
FieldPath::new(vec![
1318+
IdentifierFieldName::from_str("id")?,
1319+
])?,
1320+
CREATION_TIME_FIELD_PATH.clone(),
1321+
],
1322+
"sync_index" => vec![
1323+
FieldPath::new(vec![
1324+
IdentifierFieldName::from_str("fivetran")?,
1325+
IdentifierFieldName::from_str("synced")?,
1326+
])?,
1327+
CREATION_TIME_FIELD_PATH.clone(),
1328+
],
1329+
},
1330+
))?,
1331+
Table {
1332+
name: "table_name".into(),
1333+
columns: vec![
1334+
Column {
1335+
name: "_fivetran_synced".to_string(),
1336+
r#type: FivetranDataType::UtcDatetime as i32,
1337+
primary_key: false,
1338+
decimal: None,
1339+
},
1340+
Column {
1341+
name: "id".to_string(),
1342+
r#type: FivetranDataType::Long as i32,
1343+
primary_key: true,
1344+
decimal: None,
1345+
},
1346+
Column {
1347+
name: "name".to_string(),
1348+
r#type: FivetranDataType::String as i32,
1349+
primary_key: false,
1350+
decimal: None,
1351+
},
1352+
],
1353+
}
1354+
);
1355+
1356+
Ok(())
1357+
}
1358+
1359+
#[test]
1360+
fn it_converts_convex_tables_to_fivetran_tables_with_soft_deletes_and_fivetran_id(
1361+
) -> anyhow::Result<()> {
1362+
assert_eq!(
1363+
to_fivetran_table(&convex_table(
1364+
btreemap! {
1365+
"data" => FieldValidator::required_field_type(Validator::Bytes),
1366+
"fivetran" => FieldValidator::required_field_type(Validator::Object(
1367+
object_validator!(
1368+
"synced" => FieldValidator::required_field_type(Validator::Float64),
1369+
"id" => FieldValidator::required_field_type(Validator::String),
1370+
"deleted" => FieldValidator::required_field_type(Validator::Boolean),
1371+
),
1372+
)),
1373+
},
1374+
btreemap! {
1375+
"by_primary_key" => vec![
1376+
FieldPath::new(vec![
1377+
IdentifierFieldName::from_str("fivetran")?,
1378+
IdentifierFieldName::from_str("deleted")?,
1379+
])?,
1380+
FieldPath::new(vec![
1381+
IdentifierFieldName::from_str("fivetran")?,
1382+
IdentifierFieldName::from_str("id")?,
1383+
])?,
1384+
CREATION_TIME_FIELD_PATH.clone(),
1385+
],
1386+
"sync_index" => vec![
1387+
FieldPath::new(vec![
1388+
IdentifierFieldName::from_str("fivetran")?,
1389+
IdentifierFieldName::from_str("deleted")?,
1390+
])?,
1391+
FieldPath::new(vec![
1392+
IdentifierFieldName::from_str("fivetran")?,
1393+
IdentifierFieldName::from_str("synced")?,
1394+
])?,
1395+
CREATION_TIME_FIELD_PATH.clone(),
1396+
],
1397+
},
1398+
))?,
1399+
Table {
1400+
name: "table_name".into(),
1401+
columns: vec![
1402+
Column {
1403+
name: "_fivetran_deleted".to_string(),
1404+
r#type: FivetranDataType::Boolean as i32,
1405+
primary_key: false,
1406+
decimal: None,
1407+
},
1408+
Column {
1409+
name: "_fivetran_id".to_string(),
1410+
r#type: FivetranDataType::String as i32,
1411+
primary_key: true,
1412+
decimal: None,
1413+
},
1414+
Column {
1415+
name: "_fivetran_synced".to_string(),
1416+
r#type: FivetranDataType::UtcDatetime as i32,
1417+
primary_key: false,
1418+
decimal: None,
1419+
},
1420+
Column {
1421+
name: "data".to_string(),
1422+
r#type: FivetranDataType::Binary as i32,
1423+
primary_key: false,
1424+
decimal: None,
1425+
},
1426+
],
1427+
}
1428+
);
1429+
1430+
Ok(())
1431+
}
1432+
11351433
#[test]
11361434
fn it_suggests_convex_tables() -> anyhow::Result<()> {
11371435
let fivetran_table = fivetran_table_schema(

0 commit comments

Comments
 (0)