Skip to content

Commit 26068a6

Browse files
committed
Add COPY .. TO .. syntax support
1 parent 7563cdb commit 26068a6

File tree

4 files changed

+299
-9
lines changed

4 files changed

+299
-9
lines changed
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
# tests for copy command
19+
20+
statement ok
21+
create table source_table(col1 integer, col2 varchar) as values (1, 'Foo'), (2, 'Bar');
22+
23+
# create an external parquet file
24+
# TODO figure out how to use a real tempfile name
25+
#statement ok
26+
#COPY source_table to '/tmp/table.parquet';
27+
28+
29+
# create an external parquet file from a query
30+
# TODO figure out how to use a real tempfile name
31+
statement ok
32+
COPY (select col2, sum(col1) from source_table group by col2 order by col2) to '/tmp/table.parquet';
33+
34+
# Register the written parquet file as an external table
35+
statement ok
36+
CREATE external table new_table STORED as PARQUET LOCATION '/tmp/table.parquet';
37+
38+
# Query the external table to check the data that was written
39+
query IT
40+
select * from new_table;
41+
----
42+
1 Foo
43+
2 Bar
44+
45+
statement ok
46+
drop table new_table;
47+
48+
49+
# TODO error cases:
50+
# incomplete statement
51+
# pass non literal to COPY options
52+
53+
54+
55+
56+
57+
# TODO add support for reading directly from files without needing to create an external table
58+
# (TODO find ticket)
59+
60+
61+
62+
# TODO test copying to partitioned target (directory)
63+
64+
65+
# TODO test copying with options
66+
67+
68+
69+
# TODO test query source
70+
71+
72+
73+
# TODO: support CSV, JSON and AVRO
74+
75+
# TODO: document different options
76+
77+
78+
79+
statement ok
80+
drop table source_table;

datafusion/sql/src/parser.rs

Lines changed: 167 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! DataFusion SQL Parser based on [`sqlparser`]
1919
2020
use datafusion_common::parsers::CompressionTypeVariant;
21-
use sqlparser::ast::OrderByExpr;
21+
use sqlparser::ast::{OrderByExpr, Query, Value};
2222
use sqlparser::{
2323
ast::{
2424
ColumnDef, ColumnOptionDef, ObjectName, Statement as SQLStatement,
@@ -42,6 +42,46 @@ fn parse_file_type(s: &str) -> Result<String, ParserError> {
4242
Ok(s.to_uppercase())
4343
}
4444

45+
/// DataFusion extension DDL for `COPY`
46+
///
47+
/// Syntax:
48+
///
49+
/// ```text
50+
/// COPY <table_name | (<query>)>
51+
/// TO
52+
/// <destination_url>
53+
/// (key_value_list)
54+
///
55+
/// ```
56+
/// Examples
57+
/// ```sql
58+
/// COPY lineitem TO 'lineitem'
59+
/// (format parquet,
60+
/// partitions 16,
61+
/// row_group_limit_rows 100000,
62+
/// row_group_limit_bytes 200000
63+
/// )
64+
///
65+
/// COPY (SELECT l_orderkey from lineitem) to 'lineitem.parquet';
66+
/// ```
67+
#[derive(Debug, Clone, PartialEq, Eq)]
68+
pub struct CopyToStatement {
69+
/// Where the data comes from
70+
pub source: CopyToSource,
71+
/// The URL the data is written to
72+
pub target: String,
73+
/// Target specific options
74+
pub options: HashMap<String, Value>,
75+
}
76+
77+
#[derive(Debug, Clone, PartialEq, Eq)]
78+
pub enum CopyToSource {
79+
/// `COPY <table> TO ...`
80+
Relation(ObjectName),
81+
/// `COPY (<query>) TO ...`
82+
Query(Query),
83+
}
84+
4585
/// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
4686
///
4787
/// Syntax:
@@ -117,12 +157,14 @@ pub struct DescribeTableStmt {
117157
/// Tokens parsed by [`DFParser`] are converted into these values.
118158
#[derive(Debug, Clone, PartialEq, Eq)]
119159
pub enum Statement {
120-
/// ANSI SQL AST node
160+
/// ANSI SQL AST node (from sqlparser-rs)
121161
Statement(Box<SQLStatement>),
122162
/// Extension: `CREATE EXTERNAL TABLE`
123163
CreateExternalTable(CreateExternalTable),
124164
/// Extension: `DESCRIBE TABLE`
125165
DescribeTableStmt(DescribeTableStmt),
166+
/// Extension: `COPY TO`
167+
CopyTo(CopyToStatement),
126168
}
127169

128170
/// DataFusion SQL Parser based on [`sqlparser`]
@@ -211,6 +253,11 @@ impl<'a> DFParser<'a> {
211253
// use custom parsing
212254
self.parse_create()
213255
}
256+
Keyword::COPY => {
257+
// move one token forward
258+
self.parser.next_token();
259+
self.parse_copy()
260+
}
214261
Keyword::DESCRIBE => {
215262
// move one token forward
216263
self.parser.next_token();
@@ -242,6 +289,37 @@ impl<'a> DFParser<'a> {
242289
}))
243290
}
244291

292+
/// Parse a SQL `COPY TO` statement
293+
pub fn parse_copy(&mut self) -> Result<Statement, ParserError> {
294+
// a leading `(` means the source is a parenthesized query
295+
let source = if self.parser.consume_token(&Token::LParen) {
296+
let query = self.parser.parse_query()?;
297+
self.parser.expect_token(&Token::RParen)?;
298+
CopyToSource::Query(query)
299+
} else {
300+
// parse as table reference
301+
let table_name = self.parser.parse_object_name()?;
302+
CopyToSource::Relation(table_name)
303+
};
304+
305+
self.parser.expect_keyword(Keyword::TO)?;
306+
307+
let target = self.parser.parse_literal_string()?;
308+
309+
// check for options in parens
310+
let options = if self.parser.peek_token().token == Token::LParen {
311+
self.parse_value_options()?
312+
} else {
313+
HashMap::new()
314+
};
315+
316+
Ok(Statement::CopyTo(CopyToStatement {
317+
source,
318+
target,
319+
options,
320+
}))
321+
}
322+
245323
/// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
246324
pub fn parse_create(&mut self) -> Result<Statement, ParserError> {
247325
if self.parser.parse_keyword(Keyword::EXTERNAL) {
@@ -457,7 +535,7 @@ impl<'a> DFParser<'a> {
457535
builder.table_partition_cols = Some(self.parse_partitions()?)
458536
} else if self.parser.parse_keyword(Keyword::OPTIONS) {
459537
ensure_not_set(&builder.options, "OPTIONS")?;
460-
builder.options = Some(self.parse_options()?);
538+
builder.options = Some(self.parse_string_options()?);
461539
} else {
462540
break;
463541
}
@@ -513,14 +591,40 @@ impl<'a> DFParser<'a> {
513591
}
514592
}
515593

516-
fn parse_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
517-
let mut options: HashMap<String, String> = HashMap::new();
594+
/// Parses (key value) style options, but values can only be literal strings
595+
/// TODO maybe change this to be real expressions rather than just strings
596+
/// (NOTE: the reason values are restricted to strings is not recorded here)
597+
fn parse_string_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
598+
let mut options = HashMap::new();
518599
self.parser.expect_token(&Token::LParen)?;
519600

520601
loop {
521602
let key = self.parser.parse_literal_string()?;
522603
let value = self.parser.parse_literal_string()?;
523-
options.insert(key.to_string(), value.to_string());
604+
options.insert(key, value);
605+
let comma = self.parser.consume_token(&Token::Comma);
606+
if self.parser.consume_token(&Token::RParen) {
607+
// allow a trailing comma, even though it's not in standard
608+
break;
609+
} else if !comma {
610+
return self.expected(
611+
"',' or ')' after option definition",
612+
self.parser.peek_token(),
613+
);
614+
}
615+
}
616+
Ok(options)
617+
}
618+
619+
/// Parses `(key value, ...)` style options into a map of `String` --> [`Value`]
620+
fn parse_value_options(&mut self) -> Result<HashMap<String, Value>, ParserError> {
621+
let mut options = HashMap::new();
622+
self.parser.expect_token(&Token::LParen)?;
623+
624+
loop {
625+
let key = self.parser.parse_literal_string()?;
626+
let value = self.parser.parse_value()?;
627+
options.insert(key, value);
524628
let comma = self.parser.consume_token(&Token::Comma);
525629
if self.parser.consume_token(&Token::RParen) {
526630
// allow a trailing comma, even though it's not in standard
@@ -560,7 +664,7 @@ mod tests {
560664
1,
561665
"Expected to parse exactly one statement"
562666
);
563-
assert_eq!(statements[0], expected);
667+
assert_eq!(statements[0], expected, "actual:\n{:#?}", statements[0]);
564668
Ok(())
565669
}
566670

@@ -980,4 +1084,60 @@ mod tests {
9801084

9811085
Ok(())
9821086
}
1087+
1088+
#[test]
1089+
fn copy_to_table_to_table() -> Result<(), ParserError> {
1090+
// positive case
1091+
let sql = "COPY foo TO bar";
1092+
let expected = Statement::CopyTo(CopyToStatement {
1093+
source: object_name("foo"),
1094+
target: "bar".to_string(),
1095+
options: HashMap::new(),
1096+
});
1097+
1098+
expect_parse_ok(sql, expected)?;
1099+
Ok(())
1100+
}
1101+
1102+
#[test]
1103+
fn copy_to_query_to_table() -> Result<(), ParserError> {
1104+
let mut statements = Parser::parse_sql(&GenericDialect {}, "select 1")?;
1105+
assert_eq!(statements.len(), 1);
1106+
let statement = statements.pop().unwrap();
1107+
let query = if let SQLStatement::Query(query) = statement {
1108+
*query
1109+
} else {
1110+
panic!("Expected query, got {statement:?}");
1111+
};
1112+
1113+
let sql = "COPY (select 1) TO bar";
1114+
let expected = Statement::CopyTo(CopyToStatement {
1115+
source: CopyToSource::Query(query),
1116+
target: "bar".to_string(),
1117+
options: HashMap::new(),
1118+
});
1119+
expect_parse_ok(sql, expected)?;
1120+
Ok(())
1121+
}
1122+
1123+
#[test]
1124+
fn copy_to_options() -> Result<(), ParserError> {
1125+
let sql = "COPY foo TO bar (row_group_size 55)";
1126+
let expected = Statement::CopyTo(CopyToStatement {
1127+
source: object_name("foo"),
1128+
target: "bar".to_string(),
1129+
options: HashMap::from([(
1130+
"row_group_size".to_string(),
1131+
Value::Number("55".to_string(), false),
1132+
)]),
1133+
});
1134+
expect_parse_ok(sql, expected)?;
1135+
Ok(())
1136+
}
1137+
1138+
// For error cases, see: `copy.slt`
1139+
1140+
fn object_name(name: &str) -> CopyToSource {
1141+
CopyToSource::Relation(ObjectName(vec![Ident::new(name)]))
1142+
}
9831143
}

datafusion/sql/src/relation/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use sqlparser::ast::TableFactor;
2323
mod join;
2424

2525
impl<'a, S: ContextProvider> SqlToRel<'a, S> {
26+
/// Create a `LogicalPlan` that scans the named relation
2627
fn create_relation(
2728
&self,
2829
relation: TableFactor,

0 commit comments

Comments
 (0)