Skip to content

Commit 0c6d9d6

Browse files
author
Sean Loiselle
committed
sql: add sql::plan::DataSourceDesc to express more source types
We previously only had Option<Ingestion> to describe sources, but with the coming progress subsources, we need more expressiveness.
1 parent 190ee2e commit 0c6d9d6

File tree

4 files changed

+43
-24
lines changed

4 files changed

+43
-24
lines changed

src/adapter/src/catalog.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5545,20 +5545,25 @@ impl Catalog {
55455545
..
55465546
}) => CatalogItem::Source(Source {
55475547
create_sql: source.create_sql,
5548-
data_source: match source.ingestion {
5549-
Some(ingestion) => DataSourceDesc::Ingestion(Ingestion {
5550-
desc: ingestion.desc,
5551-
source_imports: ingestion.source_imports,
5552-
subsource_exports: ingestion.subsource_exports,
5553-
cluster_id: match cluster_config {
5554-
plan::SourceSinkClusterConfig::Existing { id } => id,
5555-
plan::SourceSinkClusterConfig::Linked { .. }
5556-
| plan::SourceSinkClusterConfig::Undefined => {
5557-
self.state.clusters_by_linked_object_id[&id]
5558-
}
5559-
},
5560-
}),
5561-
None => DataSourceDesc::Source,
5548+
data_source: match source.data_source {
5549+
mz_sql::plan::DataSourceDesc::Ingestion(ingestion) => {
5550+
DataSourceDesc::Ingestion(Ingestion {
5551+
desc: ingestion.desc,
5552+
source_imports: ingestion.source_imports,
5553+
subsource_exports: ingestion.subsource_exports,
5554+
cluster_id: match cluster_config {
5555+
plan::SourceSinkClusterConfig::Existing { id } => id,
5556+
plan::SourceSinkClusterConfig::Linked { .. }
5557+
| plan::SourceSinkClusterConfig::Undefined => {
5558+
self.state.clusters_by_linked_object_id[&id]
5559+
}
5560+
},
5561+
})
5562+
}
5563+
mz_sql::plan::DataSourceDesc::Progress => {
5564+
unreachable!("progress subsources error in purification")
5565+
}
5566+
mz_sql::plan::DataSourceDesc::Source => DataSourceDesc::Source,
55625567
},
55635568
desc: source.desc,
55645569
timeline,

src/adapter/src/coord/sequencer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -494,8 +494,8 @@ impl Coordinator {
494494
let source_oid = self.catalog.allocate_oid()?;
495495
let source = catalog::Source {
496496
create_sql: plan.source.create_sql,
497-
data_source: match plan.source.ingestion {
498-
Some(ingestion) => {
497+
data_source: match plan.source.data_source {
498+
mz_sql::plan::DataSourceDesc::Ingestion(ingestion) => {
499499
let cluster_id = self
500500
.create_linked_cluster_ops(
501501
source_id,
@@ -511,7 +511,10 @@ impl Coordinator {
511511
cluster_id,
512512
})
513513
}
514-
None => {
514+
mz_sql::plan::DataSourceDesc::Progress => {
515+
unreachable!("PROGRESS subsources error in purification");
516+
}
517+
mz_sql::plan::DataSourceDesc::Source => {
515518
assert!(
516519
matches!(
517520
plan.cluster_config,

src/sql/src/plan.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -636,10 +636,20 @@ pub struct Table {
636636
#[derive(Clone, Debug)]
637637
pub struct Source {
638638
pub create_sql: String,
639-
pub ingestion: Option<Ingestion>,
639+
pub data_source: DataSourceDesc,
640640
pub desc: RelationDesc,
641641
}
642642

643+
#[derive(Debug, Clone)]
644+
pub enum DataSourceDesc {
645+
/// Receives data from an external system
646+
Ingestion(Ingestion),
647+
/// Receives data from some other source
648+
Source,
649+
/// Receives data from the source's reclocking/remapping operations.
650+
Progress,
651+
}
652+
643653
#[derive(Clone, Debug)]
644654
pub struct Ingestion {
645655
pub desc: SourceDesc,

src/sql/src/plan/statement/ddl.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,11 @@ use crate::plan::{
109109
ComputeReplicaIntrospectionConfig, CreateClusterPlan, CreateClusterReplicaPlan,
110110
CreateConnectionPlan, CreateDatabasePlan, CreateIndexPlan, CreateMaterializedViewPlan,
111111
CreateRolePlan, CreateSchemaPlan, CreateSecretPlan, CreateSinkPlan, CreateSourcePlan,
112-
CreateTablePlan, CreateTypePlan, CreateViewPlan, DropClusterReplicasPlan, DropClustersPlan,
113-
DropDatabasePlan, DropItemsPlan, DropRolesPlan, DropSchemaPlan, FullObjectName, HirScalarExpr,
114-
Index, Ingestion, MaterializedView, Params, Plan, QueryContext, ReplicaConfig, RotateKeysPlan,
115-
Secret, Sink, Source, SourceSinkClusterConfig, Table, Type, View,
112+
CreateTablePlan, CreateTypePlan, CreateViewPlan, DataSourceDesc, DropClusterReplicasPlan,
113+
DropClustersPlan, DropDatabasePlan, DropItemsPlan, DropRolesPlan, DropSchemaPlan,
114+
FullObjectName, HirScalarExpr, Index, Ingestion, MaterializedView, Params, Plan, QueryContext,
115+
ReplicaConfig, RotateKeysPlan, Secret, Sink, Source, SourceSinkClusterConfig, Table, Type,
116+
View,
116117
};
117118

118119
pub fn describe_create_database(
@@ -1069,7 +1070,7 @@ pub fn plan_create_source(
10691070

10701071
let source = Source {
10711072
create_sql,
1072-
ingestion: Some(Ingestion {
1073+
data_source: DataSourceDesc::Ingestion(Ingestion {
10731074
desc: source_desc,
10741075
// Currently no source reads from another source
10751076
source_imports: BTreeSet::new(),
@@ -1179,7 +1180,7 @@ pub fn plan_create_subsource(
11791180

11801181
let source = Source {
11811182
create_sql,
1182-
ingestion: None,
1183+
data_source: DataSourceDesc::Source,
11831184
desc,
11841185
};
11851186

0 commit comments

Comments
 (0)