Skip to content

Commit 9ca0771

Browse files
NicolappsConvex, Inc.
authored and
Convex, Inc.
committed
Serialize the streaming export selection types (#37352)
This adds serializable equivalents to the streaming export selection types introduced by #37310. The serialization itself is safely done by `serde_json`, but I used some attributes to make sure the JSON format is concise and easy to read. For instance, the following value: ```rs Selection { components: btreemap! { "".to_string() => ComponentSelection::Included { tables: btreemap! { "users".to_string() => TableSelection::Included { columns: btreemap! { "name".to_string() => ColumnInclusion::Included, "_creationTime".to_string() => ColumnInclusion::Included, "_deleted".to_string() => ColumnInclusion::Included, "password".to_string() => ColumnInclusion::Excluded, }, other_columns: InclusionDefault::Excluded, }, }, other_tables: InclusionDefault::Excluded, }, "waitlist".to_string() => ComponentSelection::Excluded, }, other_components: InclusionDefault::Included, } ``` is serialized as the following JSON object: ```js { "": { "users": { "name": "incl", "_creationTime": "incl", "_deleted": "incl", "password": "excl", "_other": "excl" }, "_other": "excl" }, "waitlist": "excl", "_other": "incl" } ``` GitOrigin-RevId: d84ec1933c796e4cf01c12bd407ecf5f69a7368f
1 parent d731a26 commit 9ca0771

File tree

5 files changed

+382
-0
lines changed

5 files changed

+382
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/database/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ async-trait = { workspace = true }
1616
async_lru = { path = "../async_lru" }
1717
cmd_util = { path = "../cmd_util" }
1818
common = { path = "../common" }
19+
convex_fivetran_source = { path = "../fivetran_source" }
1920
convex_macro = { path = "../convex_macro" }
2021
derive_more = { workspace = true }
2122
errors = { path = "../errors" }

crates/database/src/streaming_export_selection.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ use common::{
9191
},
9292
pii::PII,
9393
};
94+
use convex_fivetran_source::api_types::selection as serialized;
95+
#[cfg(test)]
96+
use proptest::prelude::*;
9497
#[cfg(test)]
9598
use proptest_derive::Arbitrary;
9699
use value::{
@@ -104,6 +107,11 @@ use value::{
104107
pub struct StreamingExportSelection {
105108
/// For each listed component, defines what to do with it in the
106109
/// streaming export.
110+
#[cfg_attr(
111+
test,
112+
proptest(strategy = "prop::collection::btree_map(any::<ComponentPath>(), \
113+
any::<StreamingExportComponentSelection>(), 0..3)")
114+
)]
107115
pub components: BTreeMap<ComponentPath, StreamingExportComponentSelection>,
108116

109117
/// Whether to include components that are not listed in `components`.
@@ -187,6 +195,11 @@ impl StreamingExportSelection {
187195
pub enum StreamingExportComponentSelection {
188196
Excluded,
189197
Included {
198+
#[cfg_attr(
199+
test,
200+
proptest(strategy = "prop::collection::btree_map(any::<TableName>(), \
201+
any::<StreamingExportTableSelection>(), 0..3)")
202+
)]
190203
tables: BTreeMap<TableName, StreamingExportTableSelection>,
191204
other_tables: StreamingExportInclusionDefault,
192205
},
@@ -235,6 +248,11 @@ pub enum StreamingExportTableSelection {
235248
/// columns.
236249
#[cfg_attr(test, derive(Clone, Eq, PartialEq, Debug, Arbitrary))]
237250
pub struct StreamingExportColumnSelection {
251+
#[cfg_attr(
252+
test,
253+
proptest(strategy = "prop::collection::btree_map(any::<FieldName>(), \
254+
any::<StreamingExportColumnInclusion>(), 0..3)")
255+
)]
238256
columns: BTreeMap<FieldName, StreamingExportColumnInclusion>,
239257
other_columns: StreamingExportInclusionDefault,
240258
}
@@ -443,6 +461,87 @@ mod tests_is_table_included {
443461
}
444462
}
445463

464+
impl TryFrom<serialized::Selection> for StreamingExportSelection {
465+
type Error = anyhow::Error;
466+
467+
fn try_from(
468+
serialized::Selection {
469+
components,
470+
other_components,
471+
}: serialized::Selection,
472+
) -> Result<Self, Self::Error> {
473+
Ok(Self {
474+
components: components
475+
.into_iter()
476+
.map(|(k, v)| -> anyhow::Result<_> { Ok((k.parse()?, v.try_into()?)) })
477+
.try_collect()?,
478+
other_components: other_components.into(),
479+
})
480+
}
481+
}
482+
483+
impl From<serialized::ColumnInclusion> for StreamingExportColumnInclusion {
484+
fn from(value: serialized::ColumnInclusion) -> Self {
485+
match value {
486+
serialized::ColumnInclusion::Excluded => Self::Excluded,
487+
serialized::ColumnInclusion::Included => Self::Included,
488+
}
489+
}
490+
}
491+
492+
impl TryFrom<serialized::ComponentSelection> for StreamingExportComponentSelection {
493+
type Error = anyhow::Error;
494+
495+
fn try_from(value: serialized::ComponentSelection) -> Result<Self, Self::Error> {
496+
Ok(match value {
497+
serialized::ComponentSelection::Excluded => Self::Excluded,
498+
serialized::ComponentSelection::Included {
499+
tables,
500+
other_tables,
501+
} => Self::Included {
502+
tables: tables
503+
.into_iter()
504+
.map(|(k, v)| -> anyhow::Result<_> { Ok((k.parse()?, v.try_into()?)) })
505+
.try_collect()?,
506+
other_tables: other_tables.into(),
507+
},
508+
})
509+
}
510+
}
511+
512+
impl TryFrom<serialized::TableSelection> for StreamingExportTableSelection {
513+
type Error = anyhow::Error;
514+
515+
fn try_from(value: serialized::TableSelection) -> Result<Self, Self::Error> {
516+
Ok(match value {
517+
serialized::TableSelection::Excluded => Self::Excluded,
518+
serialized::TableSelection::Included {
519+
columns,
520+
other_columns,
521+
} => {
522+
let column_selection = StreamingExportColumnSelection {
523+
columns: columns
524+
.into_iter()
525+
.map(|(k, v)| -> anyhow::Result<_> { Ok((k.parse()?, v.into())) })
526+
.try_collect()?,
527+
other_columns: other_columns.into(),
528+
};
529+
530+
Self::Included(column_selection)
531+
},
532+
})
533+
}
534+
}
535+
536+
impl From<serialized::InclusionDefault> for StreamingExportInclusionDefault {
537+
fn from(value: serialized::InclusionDefault) -> Self {
538+
match value {
539+
serialized::InclusionDefault::Excluded => Self::Excluded,
540+
serialized::InclusionDefault::Included => Self::Included,
541+
}
542+
}
543+
}
544+
446545
#[cfg(test)]
447546
mod tests_column_filtering {
448547
use common::document::CreationTime;
@@ -601,3 +700,95 @@ mod tests_column_filtering {
601700
Ok(())
602701
}
603702
}
703+
704+
#[cfg(test)]
705+
mod tests_serialization {
706+
use cmd_util::env::env_config;
707+
use proptest::prelude::*;
708+
709+
use super::*;
710+
711+
impl From<StreamingExportSelection> for serialized::Selection {
712+
fn from(
713+
StreamingExportSelection {
714+
components,
715+
other_components,
716+
}: StreamingExportSelection,
717+
) -> Self {
718+
Self {
719+
components: components
720+
.into_iter()
721+
.map(|(k, v)| (k.to_string(), v.into()))
722+
.collect(),
723+
other_components: other_components.into(),
724+
}
725+
}
726+
}
727+
728+
impl From<StreamingExportColumnInclusion> for serialized::ColumnInclusion {
729+
fn from(value: StreamingExportColumnInclusion) -> Self {
730+
match value {
731+
StreamingExportColumnInclusion::Excluded => Self::Excluded,
732+
StreamingExportColumnInclusion::Included => Self::Included,
733+
}
734+
}
735+
}
736+
737+
impl From<StreamingExportComponentSelection> for serialized::ComponentSelection {
738+
fn from(value: StreamingExportComponentSelection) -> Self {
739+
match value {
740+
StreamingExportComponentSelection::Excluded => Self::Excluded,
741+
StreamingExportComponentSelection::Included {
742+
tables,
743+
other_tables,
744+
} => Self::Included {
745+
tables: tables
746+
.into_iter()
747+
.map(|(k, v)| (k.to_string(), v.into()))
748+
.collect(),
749+
other_tables: other_tables.into(),
750+
},
751+
}
752+
}
753+
}
754+
755+
impl From<StreamingExportTableSelection> for serialized::TableSelection {
756+
fn from(value: StreamingExportTableSelection) -> Self {
757+
match value {
758+
StreamingExportTableSelection::Excluded => Self::Excluded,
759+
StreamingExportTableSelection::Included(StreamingExportColumnSelection {
760+
columns,
761+
other_columns,
762+
}) => Self::Included {
763+
columns: columns
764+
.into_iter()
765+
.map(|(k, v)| (k.to_string(), v.into()))
766+
.collect(),
767+
other_columns: other_columns.into(),
768+
},
769+
}
770+
}
771+
}
772+
773+
impl From<StreamingExportInclusionDefault> for serialized::InclusionDefault {
774+
fn from(value: StreamingExportInclusionDefault) -> Self {
775+
match value {
776+
StreamingExportInclusionDefault::Excluded => Self::Excluded,
777+
StreamingExportInclusionDefault::Included => Self::Included,
778+
}
779+
}
780+
}
781+
782+
proptest! {
783+
#![proptest_config(ProptestConfig {
784+
cases: 64 * env_config("CONVEX_PROPTEST_MULTIPLIER", 1),
785+
failure_persistence: None, ..ProptestConfig::default()
786+
})]
787+
#[test]
788+
fn test_streaming_export_selection_roundtrip(value in any::<StreamingExportSelection>()) {
789+
let serialized: serialized::Selection = value.clone().into();
790+
let deserialized: StreamingExportSelection = serialized.try_into().expect("Can't roundtrip");
791+
prop_assert_eq!(value, deserialized);
792+
}
793+
}
794+
}

crates/fivetran_source/src/api_types.rs renamed to crates/fivetran_source/src/api_types/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// API types for the HTTP APIs used by the Fivetran and Airbyte source
22
// connectors
33

4+
pub mod selection;
5+
46
use std::collections::BTreeMap;
57

68
use serde::{

0 commit comments

Comments
 (0)