Skip to content

Add Sqllogictests for INSERT INTO external table #7294

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions datafusion/core/src/datasource/listing/url.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::fs;

use crate::datasource::object_store::ObjectStoreUrl;
use datafusion_common::{DataFusionError, Result};
use futures::stream::BoxStream;
Expand Down Expand Up @@ -87,6 +89,32 @@ impl ListingTableUrl {
}
}

/// Parses `s` into a [`ListingTableUrl`], creating the local path first if it
/// does not exist yet.
///
/// If `s` parses as a URL, nothing is created and `s` is handed straight to
/// [`ListingTableUrl::parse`]. Otherwise `s` is assumed to be a local
/// filesystem path, and when nothing exists at that path:
///
/// * if `is_directory` is `true`, the directory and any missing ancestors are
///   created;
/// * otherwise any missing parent directories are created, followed by an
///   empty file at the path itself.
///
/// The path is created up front so that the subsequent
/// [`ListingTableUrl::parse`] call works on a location that exists.
///
/// # Errors
///
/// Returns an error if the directory or file cannot be created, or if
/// [`ListingTableUrl::parse`] fails for `s`.
pub fn parse_create_local_if_not_exists(
    s: impl AsRef<str>,
    is_directory: bool,
) -> Result<Self> {
    let s = s.as_ref();

    // Anything that is not a well-formed URL is treated as a local path.
    if Url::parse(s).is_err() {
        let path = std::path::Path::new(s);
        if !path.exists() {
            if is_directory {
                fs::create_dir_all(path)?;
            } else {
                // `File::create` does not create intermediate directories, so
                // make sure the parent exists first. A bare file name has an
                // empty parent, which needs no creation.
                if let Some(parent) = path
                    .parent()
                    .filter(|parent| !parent.as_os_str().is_empty())
                {
                    fs::create_dir_all(parent)?;
                }
                fs::File::create(path)?;
            }
        }
    }

    ListingTableUrl::parse(s)
}

/// Creates a new [`ListingTableUrl`] interpreting `s` as a filesystem path
fn parse_path(s: &str) -> Result<Self> {
let (prefix, glob) = match split_glob_expression(s) {
Expand Down
31 changes: 30 additions & 1 deletion datafusion/core/src/datasource/listing_table_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,36 @@ impl TableProviderFactory for ListingTableFactory {
},
}?;

let create_local_path_mode = cmd
.options
.get("create_local_path")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great to eventually get the Options into a structured form (#7283 (comment)) so that the compiler can check values like this rather than relying on string matching. I have it on my list today to file a ticket about that.

.map(|s| s.as_str())
.unwrap_or("false");
let single_file = cmd
.options
.get("single_file")
.map(|s| s.as_str())
.unwrap_or("false");

let single_file = match single_file {
"true" => Ok(true),
"false" => Ok(false),
_ => Err(DataFusionError::Plan(
"Invalid option single_file, must be 'true' or 'false'".into(),
)),
}?;

let table_path = match create_local_path_mode {
"true" => ListingTableUrl::parse_create_local_if_not_exists(
&cmd.location,
!single_file,
),
"false" => ListingTableUrl::parse(&cmd.location),
_ => Err(DataFusionError::Plan(
"Invalid option create_local_path, must be 'true' or 'false'".into(),
)),
}?;

let options = ListingOptions::new(file_format)
.with_collect_stat(state.config().collect_statistics())
.with_file_extension(file_extension)
Expand All @@ -154,7 +184,6 @@ impl TableProviderFactory for ListingTableFactory {
.with_file_sort_order(cmd.order_exprs.clone())
.with_insert_mode(insert_mode);

let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = match provided_schema {
None => options.infer_schema(state, &table_path).await?,
Some(s) => s,
Expand Down
24 changes: 1 addition & 23 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ use crate::logical_expr::{
};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_expr::dml::{CopyTo, OutputFileFormat};
use url::Url;

use crate::logical_expr::{Limit, Values};
use crate::physical_expr::create_physical_expr;
Expand Down Expand Up @@ -91,7 +90,6 @@ use itertools::{multiunzip, Itertools};
use log::{debug, trace};
use std::collections::HashMap;
use std::fmt::Write;
use std::fs;
use std::sync::Arc;

fn create_function_physical_name(
Expand Down Expand Up @@ -565,30 +563,10 @@ impl DefaultPhysicalPlanner {
}) => {
let input_exec = self.create_initial_plan(input, session_state).await?;

// Get object store for specified output_url
// if user did not pass in a url, we assume it is a local file path
// this requires some special handling as copy can create non
// existing file paths
let is_valid_url = Url::parse(output_url).is_ok();

// TODO: make this behavior configurable via options (should copy to create path/file as needed?)
// TODO: add additional configurable options for if existing files should be overwritten or
// appended to
let parsed_url = match is_valid_url {
true => ListingTableUrl::parse(output_url),
false => {
let path = std::path::PathBuf::from(output_url);
if !path.exists(){
if *per_thread_output{
fs::create_dir_all(path)?;
} else{
fs::File::create(path)?;
}
}
ListingTableUrl::parse(output_url)
}
}?;

let parsed_url = ListingTableUrl::parse_create_local_if_not_exists(output_url, *per_thread_output)?;
let object_store_url = parsed_url.object_store();

let schema: Schema = (**input.schema()).clone().into();
Expand Down
17 changes: 7 additions & 10 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1099,16 +1099,13 @@ impl LogicalPlan {
per_thread_output,
options,
}) => {
let mut op_str = String::new();
op_str.push('(');
for (key, val) in options {
if !op_str.is_empty() {
op_str.push(',');
}
op_str.push_str(&format!("{key} {val}"));
}
op_str.push(')');
write!(f, "CopyTo: format={file_format} output_url={output_url} per_thread_output={per_thread_output} options: {op_str}")
let op_str = options
.iter()
.map(|(k, v)| format!("{k} {v}"))
.collect::<Vec<String>>()
.join(", ");

write!(f, "CopyTo: format={file_format} output_url={output_url} per_thread_output={per_thread_output} options: ({op_str})")
}
LogicalPlan::Ddl(ddl) => {
write!(f, "{}", ddl.display())
Expand Down
Loading