Skip to content

Commit 6a775b7

Browse files
Add Sqllogictests for INSERT INTO external table (#7294)
* insert sqllogictests rebase * remove commented line
1 parent 354d4ff commit 6a775b7

File tree

5 files changed

+374
-34
lines changed

5 files changed

+374
-34
lines changed

datafusion/core/src/datasource/listing/url.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::fs;
19+
1820
use crate::datasource::object_store::ObjectStoreUrl;
1921
use datafusion_common::{DataFusionError, Result};
2022
use futures::stream::BoxStream;
@@ -87,6 +89,32 @@ impl ListingTableUrl {
8789
}
8890
}
8991

92+
/// Get object store for specified input_url
93+
/// if input_url is actually not a url, we assume it is a local file path
94+
/// if we have a local path, create it if not exists so ListingTableUrl::parse works
95+
pub fn parse_create_local_if_not_exists(
96+
s: impl AsRef<str>,
97+
is_directory: bool,
98+
) -> Result<Self> {
99+
let s = s.as_ref();
100+
let is_valid_url = Url::parse(s).is_ok();
101+
102+
match is_valid_url {
103+
true => ListingTableUrl::parse(s),
104+
false => {
105+
let path = std::path::PathBuf::from(s);
106+
if !path.exists() {
107+
if is_directory {
108+
fs::create_dir_all(path)?;
109+
} else {
110+
fs::File::create(path)?;
111+
}
112+
}
113+
ListingTableUrl::parse(s)
114+
}
115+
}
116+
}
117+
90118
/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
91119
fn parse_path(s: &str) -> Result<Self> {
92120
let (prefix, glob) = match split_glob_expression(s) {

datafusion/core/src/datasource/listing_table_factory.rs

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,36 @@ impl TableProviderFactory for ListingTableFactory {
145145
},
146146
}?;
147147

148+
let create_local_path_mode = cmd
149+
.options
150+
.get("create_local_path")
151+
.map(|s| s.as_str())
152+
.unwrap_or("false");
153+
let single_file = cmd
154+
.options
155+
.get("single_file")
156+
.map(|s| s.as_str())
157+
.unwrap_or("false");
158+
159+
let single_file = match single_file {
160+
"true" => Ok(true),
161+
"false" => Ok(false),
162+
_ => Err(DataFusionError::Plan(
163+
"Invalid option single_file, must be 'true' or 'false'".into(),
164+
)),
165+
}?;
166+
167+
let table_path = match create_local_path_mode {
168+
"true" => ListingTableUrl::parse_create_local_if_not_exists(
169+
&cmd.location,
170+
!single_file,
171+
),
172+
"false" => ListingTableUrl::parse(&cmd.location),
173+
_ => Err(DataFusionError::Plan(
174+
"Invalid option create_local_path, must be 'true' or 'false'".into(),
175+
)),
176+
}?;
177+
148178
let options = ListingOptions::new(file_format)
149179
.with_collect_stat(state.config().collect_statistics())
150180
.with_file_extension(file_extension)
@@ -154,7 +184,6 @@ impl TableProviderFactory for ListingTableFactory {
154184
.with_file_sort_order(cmd.order_exprs.clone())
155185
.with_insert_mode(insert_mode);
156186

157-
let table_path = ListingTableUrl::parse(&cmd.location)?;
158187
let resolved_schema = match provided_schema {
159188
None => options.infer_schema(state, &table_path).await?,
160189
Some(s) => s,

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ use crate::logical_expr::{
3939
};
4040
use datafusion_common::display::ToStringifiedPlan;
4141
use datafusion_expr::dml::{CopyTo, OutputFileFormat};
42-
use url::Url;
4342

4443
use crate::logical_expr::{Limit, Values};
4544
use crate::physical_expr::create_physical_expr;
@@ -91,7 +90,6 @@ use itertools::{multiunzip, Itertools};
9190
use log::{debug, trace};
9291
use std::collections::HashMap;
9392
use std::fmt::Write;
94-
use std::fs;
9593
use std::sync::Arc;
9694

9795
fn create_function_physical_name(
@@ -565,30 +563,10 @@ impl DefaultPhysicalPlanner {
565563
}) => {
566564
let input_exec = self.create_initial_plan(input, session_state).await?;
567565

568-
// Get object store for specified output_url
569-
// if user did not pass in a url, we assume it is a local file path
570-
// this requires some special handling as copy can create non
571-
// existing file paths
572-
let is_valid_url = Url::parse(output_url).is_ok();
573-
574566
// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
575567
// TODO: add additional configurable options for if existing files should be overwritten or
576568
// appended to
577-
let parsed_url = match is_valid_url {
578-
true => ListingTableUrl::parse(output_url),
579-
false => {
580-
let path = std::path::PathBuf::from(output_url);
581-
if !path.exists(){
582-
if *per_thread_output{
583-
fs::create_dir_all(path)?;
584-
} else{
585-
fs::File::create(path)?;
586-
}
587-
}
588-
ListingTableUrl::parse(output_url)
589-
}
590-
}?;
591-
569+
let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, *per_thread_output)?;
592570
let object_store_url = parsed_url.object_store();
593571

594572
let schema: Schema = (**input.schema()).clone().into();

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1099,16 +1099,13 @@ impl LogicalPlan {
10991099
per_thread_output,
11001100
options,
11011101
}) => {
1102-
let mut op_str = String::new();
1103-
op_str.push('(');
1104-
for (key, val) in options {
1105-
if !op_str.is_empty() {
1106-
op_str.push(',');
1107-
}
1108-
op_str.push_str(&format!("{key} {val}"));
1109-
}
1110-
op_str.push(')');
1111-
write!(f, "CopyTo: format={file_format} output_url={output_url} per_thread_output={per_thread_output} options: {op_str}")
1102+
let op_str = options
1103+
.iter()
1104+
.map(|(k, v)| format!("{k} {v}"))
1105+
.collect::<Vec<String>>()
1106+
.join(", ");
1107+
1108+
write!(f, "CopyTo: format={file_format} output_url={output_url} per_thread_output={per_thread_output} options: ({op_str})")
11121109
}
11131110
LogicalPlan::Ddl(ddl) => {
11141111
write!(f, "{}", ddl.display())

0 commit comments

Comments
 (0)