Skip to content

Commit 18a03bd

Browse files
devinjdangelo and alamb authored
Support Configuring Arrow RecordBatch Writers via SQL Statement Options (#7390)
* squash, merge main
* cargo fmt
* try to fix pyarrow compile failure
* Apply suggestions from code review

  Co-authored-by: Andrew Lamb <[email protected]>
* review comments + add test cases
* update Display for FileTypeWriterOptions
* add unit tests

---------

Co-authored-by: Andrew Lamb <[email protected]>
1 parent 7338863 commit 18a03bd

32 files changed

+1783
-417
lines changed

datafusion/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ path = "src/lib.rs"
3535
[features]
3636
avro = ["apache-avro"]
3737
compression = ["xz2", "bzip2", "flate2", "zstd", "async-compression"]
38-
default = ["compression"]
38+
default = ["compression", "parquet"]
3939
pyarrow = ["pyo3", "arrow/pyarrow"]
4040

4141
[dependencies]

datafusion/common/src/error.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,9 @@ pub enum DataFusionError {
7373
/// This error happens whenever a plan is not valid. Examples include
7474
/// impossible casts.
7575
Plan(String),
76+
/// This error happens when an invalid or unsupported option is passed
77+
/// in a SQL statement
78+
Configuration(String),
7679
/// This error happens with schema-related errors, such as schema inference not possible
7780
/// and non-unique column names.
7881
SchemaError(SchemaError),
@@ -288,6 +291,9 @@ impl Display for DataFusionError {
288291
DataFusionError::SQL(ref desc) => {
289292
write!(f, "SQL error: {desc:?}")
290293
}
294+
DataFusionError::Configuration(ref desc) => {
295+
write!(f, "Invalid or Unsupported Configuration: {desc}")
296+
}
291297
DataFusionError::NotImplemented(ref desc) => {
292298
write!(f, "This feature is not implemented: {desc}")
293299
}
@@ -338,6 +344,7 @@ impl Error for DataFusionError {
338344
DataFusionError::SQL(e) => Some(e),
339345
DataFusionError::NotImplemented(_) => None,
340346
DataFusionError::Internal(_) => None,
347+
DataFusionError::Configuration(_) => None,
341348
DataFusionError::Plan(_) => None,
342349
DataFusionError::SchemaError(e) => Some(e),
343350
DataFusionError::Execution(_) => None,
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Options related to how Arrow files should be written
19+
20+
use crate::{
21+
config::ConfigOptions,
22+
error::{DataFusionError, Result},
23+
};
24+
25+
use super::StatementOptions;
26+
27+
#[derive(Clone, Debug)]
28+
pub struct ArrowWriterOptions {}
29+
30+
impl TryFrom<(&ConfigOptions, &StatementOptions)> for ArrowWriterOptions {
31+
type Error = DataFusionError;
32+
33+
fn try_from(_value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
34+
Ok(ArrowWriterOptions {})
35+
}
36+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Options related to how avro files should be written
19+
20+
use crate::{
21+
config::ConfigOptions,
22+
error::{DataFusionError, Result},
23+
};
24+
25+
use super::StatementOptions;
26+
27+
#[derive(Clone, Debug)]
28+
pub struct AvroWriterOptions {}
29+
30+
impl TryFrom<(&ConfigOptions, &StatementOptions)> for AvroWriterOptions {
31+
type Error = DataFusionError;
32+
33+
fn try_from(_value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
34+
Ok(AvroWriterOptions {})
35+
}
36+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Options related to how csv files should be written
19+
20+
use std::str::FromStr;
21+
22+
use arrow::csv::WriterBuilder;
23+
24+
use crate::{
25+
config::ConfigOptions,
26+
error::{DataFusionError, Result},
27+
parsers::CompressionTypeVariant,
28+
};
29+
30+
use super::StatementOptions;
31+
32+
/// Options for writing CSV files
33+
#[derive(Clone, Debug)]
34+
pub struct CsvWriterOptions {
35+
/// Struct from the arrow crate which contains all csv writing related settings
36+
pub writer_options: WriterBuilder,
37+
/// Compression to apply after ArrowWriter serializes RecordBatches.
38+
/// This compression is applied by DataFusion not the ArrowWriter itself.
39+
pub compression: CompressionTypeVariant,
40+
/// Indicates whether WriterBuilder.has_header() is set to true.
41+
/// This is duplicative as WriterBuilder also stores this information.
42+
/// However, WriterBuilder does not allow public read access to the
43+
/// has_header parameter.
44+
pub has_header: bool,
45+
// TODO: expose a way to read has_header in arrow create
46+
// https://github.com/apache/arrow-rs/issues/4735
47+
}
48+
49+
impl TryFrom<(&ConfigOptions, &StatementOptions)> for CsvWriterOptions {
50+
type Error = DataFusionError;
51+
52+
fn try_from(value: (&ConfigOptions, &StatementOptions)) -> Result<Self> {
53+
let _configs = value.0;
54+
let statement_options = value.1;
55+
let mut has_header = true;
56+
let mut builder = WriterBuilder::default();
57+
let mut compression = CompressionTypeVariant::UNCOMPRESSED;
58+
for (option, value) in &statement_options.options {
59+
builder = match option.to_lowercase().as_str(){
60+
"header" => {
61+
has_header = value.parse()
62+
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
63+
builder.has_headers(has_header)
64+
},
65+
"date_format" => builder.with_date_format(value.to_owned()),
66+
"datetime_format" => builder.with_datetime_format(value.to_owned()),
67+
"timestamp_format" => builder.with_timestamp_format(value.to_owned()),
68+
"time_format" => builder.with_time_format(value.to_owned()),
69+
"rfc3339" => {
70+
let value_bool = value.parse()
71+
.map_err(|_| DataFusionError::Configuration(format!("Unable to parse {value} as bool as required for {option}!")))?;
72+
if value_bool{
73+
builder.with_rfc3339()
74+
} else{
75+
builder
76+
}
77+
},
78+
"null_value" => builder.with_null(value.to_owned()),
79+
"compression" => {
80+
compression = CompressionTypeVariant::from_str(value.replace('\'', "").as_str())?;
81+
builder
82+
},
83+
"delimeter" => {
84+
// Ignore string literal single quotes passed from sql parsing
85+
let value = value.replace('\'', "");
86+
let chars: Vec<char> = value.chars().collect();
87+
if chars.len()>1{
88+
return Err(DataFusionError::Configuration(format!(
89+
"CSV Delimeter Option must be a single char, got: {}", value
90+
)))
91+
}
92+
builder.with_delimiter(chars[0].try_into().map_err(|_| {
93+
DataFusionError::Internal(
94+
"Unable to convert CSV delimiter into u8".into(),
95+
)
96+
})?)
97+
},
98+
_ => return Err(DataFusionError::Configuration(format!("Found unsupported option {option} with value {value} for CSV format!")))
99+
}
100+
}
101+
Ok(CsvWriterOptions {
102+
has_header,
103+
writer_options: builder,
104+
compression,
105+
})
106+
}
107+
}

0 commit comments

Comments
 (0)