Skip to content

Commit 190ee2e

Browse files
author
Sean Loiselle
committed
sql: add useless EXPOSE PROGRESS AS clause to CREATE SOURCE
we will let users specify a name for progress collections, so add parsing but error if specified
1 parent efbc8df commit 190ee2e

File tree

8 files changed

+64
-1
lines changed

8 files changed

+64
-1
lines changed

src/sql-parser/src/ast/defs/statement.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,7 @@ pub struct CreateSourceStatement<T: AstInfo> {
560560
pub key_constraint: Option<KeyConstraint>,
561561
pub with_options: Vec<CreateSourceOption<T>>,
562562
pub referenced_subsources: Option<ReferencedSubsources<T>>,
563+
pub progress_subsource: Option<DeferredObjectName<T>>,
563564
}
564565

565566
impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
@@ -604,6 +605,11 @@ impl<T: AstInfo> AstDisplay for CreateSourceStatement<T> {
604605
f.write_node(subsources);
605606
}
606607

608+
if let Some(progress) = &self.progress_subsource {
609+
f.write_str(" EXPOSE PROGRESS AS ");
610+
f.write_node(progress);
611+
}
612+
607613
if !self.with_options.is_empty() {
608614
f.write_str(" WITH (");
609615
f.write_node(&display::comma_separated(&self.with_options));

src/sql-parser/src/keywords.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ Execute
126126
Exists
127127
Expected
128128
Explain
129+
Expose
129130
Extract
130131
Factor
131132
False

src/sql-parser/src/parser.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2439,6 +2439,12 @@ impl<'a> Parser<'a> {
24392439
None
24402440
};
24412441

2442+
let progress_subsource = if self.parse_keywords(&[EXPOSE, PROGRESS, AS]) {
2443+
Some(self.parse_deferred_object_name()?)
2444+
} else {
2445+
None
2446+
};
2447+
24422448
// New WITH block
24432449
let with_options = if self.parse_keyword(WITH) {
24442450
self.expect_token(&Token::LParen)?;
@@ -2459,8 +2465,9 @@ impl<'a> Parser<'a> {
24592465
envelope,
24602466
if_not_exists,
24612467
key_constraint,
2462-
with_options,
24632468
referenced_subsources,
2469+
progress_subsource,
2470+
with_options,
24642471
}))
24652472
}
24662473

src/sql-parser/tests/testdata/ddl

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1631,3 +1631,10 @@ CREATE SOURCE mz_source FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') WI
16311631
error: Expected end of statement, found FOR
16321632
CREATE SOURCE mz_source FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') WITH (SIZE = 'small') FOR ALL TABLES;
16331633
^
1634+
1635+
parse-statement
1636+
CREATE SOURCE mz_source FROM POSTGRES CONNECTION pg (PUBLICATION 'mz_source') FOR ALL TABLES EXPOSE PROGRESS AS foo.bar WITH (SIZE = 'small');
1637+
----
1638+
CREATE SOURCE mz_source FROM POSTGRES CONNECTION pg (PUBLICATION = 'mz_source') FOR ALL TABLES EXPOSE PROGRESS AS foo.bar WITH (SIZE = 'small')
1639+
=>
1640+
CreateSource(CreateSourceStatement { name: UnresolvedObjectName([Ident("mz_source")]), in_cluster: None, col_names: [], connection: Postgres { connection: Name(UnresolvedObjectName([Ident("pg")])), options: [PgConfigOption { name: Publication, value: Some(Value(String("mz_source"))) }] }, include_metadata: [], format: None, envelope: None, if_not_exists: false, key_constraint: None, with_options: [CreateSourceOption { name: Size, value: Some(Value(String("small"))) }], referenced_subsources: Some(All), progress_subsource: Some(Deferred(UnresolvedObjectName([Ident("foo"), Ident("bar")]))) })

src/sql/src/normalize.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ pub fn create_statement(
307307
key_constraint: _,
308308
with_options: _,
309309
referenced_subsources: _,
310+
progress_subsource: _,
310311
}) => {
311312
*name = allocate_name(name)?;
312313
*if_not_exists = false;

src/sql/src/plan/statement/ddl.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -347,8 +347,14 @@ pub fn plan_create_source(
347347
include_metadata,
348348
with_options,
349349
referenced_subsources,
350+
progress_subsource,
350351
} = &stmt;
351352

353+
// TODO: enable progress subsources.
354+
if progress_subsource.is_some() {
355+
sql_bail!("[internal error] should have errored in purification");
356+
}
357+
352358
let envelope = envelope.clone().unwrap_or(Envelope::None);
353359

354360
const SAFE_WITH_OPTIONS: &[CreateSourceOptionName] = &[CreateSourceOptionName::Size];

src/sql/src/pure.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,9 +111,14 @@ pub async fn purify_create_source(
111111
envelope,
112112
include_metadata: _,
113113
referenced_subsources: requested_subsources,
114+
progress_subsource,
114115
..
115116
} = &mut stmt;
116117

118+
if progress_subsource.is_some() {
119+
bail_unsupported!("PROGRESS subsources not yet supported");
120+
}
121+
117122
// Disallow manually targetting subsources, this syntax is reserved for purification only
118123
if let Some(ReferencedSubsources::Subset(subsources)) = requested_subsources {
119124
for CreateSourceSubsource {

test/testdrive/kafka-progress.td

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Copyright Materialize, Inc. and contributors. All rights reserved.
2+
#
3+
# Use of this software is governed by the Business Source License
4+
# included in the LICENSE file at the root of this repository.
5+
#
6+
# As of the Change Date specified in that file, in accordance with
7+
# the Business Source License, use of this software will be governed
8+
# by the Apache License, Version 2.0.
9+
10+
# Create sources and verify they can ingest data while `environmentd` is online.
11+
12+
$ kafka-create-topic topic=data
13+
$ kafka-ingest format=bytes topic=data
14+
one
15+
16+
> CREATE CONNECTION kafka_conn
17+
TO KAFKA (BROKER '${testdrive.kafka-addr}');
18+
19+
> CREATE SOURCE data
20+
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
21+
FORMAT TEXT;
22+
23+
> SELECT * from data
24+
one
25+
26+
! CREATE SOURCE d
27+
FROM KAFKA CONNECTION kafka_conn (TOPIC 'testdrive-data-${testdrive.seed}')
28+
FORMAT TEXT
29+
EXPOSE PROGRESS AS exposed_progress_data;
30+
contains:PROGRESS subsources not yet supported

0 commit comments

Comments
 (0)