Commit a10ef0a

sqllogic tests rebased

1 parent 90484bb commit a10ef0a

File tree: 29 files changed, +1098 −144 lines changed

.gitignore

Lines changed: 4 additions & 1 deletion

@@ -103,4 +103,7 @@ datafusion/CHANGELOG.md.bak
 .githubchangeloggenerator.cache*

 # Generated tpch data
-datafusion/core/tests/sqllogictests/test_files/tpch/data/*
+datafusion/sqllogictests/test_files/tpch/data/*
+
+# Scratch temp dir for sqllogictests
+datafusion/sqllogictest/test_files/scratch*

datafusion/core/src/dataframe.rs

Lines changed: 41 additions & 14 deletions

@@ -25,6 +25,7 @@ use arrow::compute::{cast, concat};
 use arrow::datatypes::{DataType, Field};
 use async_trait::async_trait;
 use datafusion_common::{DataFusionError, SchemaError, UnnestOptions};
+use datafusion_expr::dml::OutputFileFormat;
 use parquet::file::properties::WriterProperties;

 use datafusion_common::{Column, DFSchema, ScalarValue};
@@ -37,7 +38,6 @@ use crate::arrow::datatypes::Schema;
 use crate::arrow::datatypes::SchemaRef;
 use crate::arrow::record_batch::RecordBatch;
 use crate::arrow::util::pretty;
-use crate::datasource::physical_plan::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::datasource::{provider_as_source, MemTable, TableProvider};
 use crate::error::Result;
 use crate::execution::{
@@ -992,28 +992,55 @@ impl DataFrame {
     }

     /// Write a `DataFrame` to a CSV file.
-    pub async fn write_csv(self, path: &str) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_csv(task_ctx, plan, path).await
+    pub async fn write_csv(
+        self,
+        path: &str,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::CSV,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }

     /// Write a `DataFrame` to a Parquet file.
     pub async fn write_parquet(
         self,
         path: &str,
-        writer_properties: Option<WriterProperties>,
-    ) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_parquet(task_ctx, plan, path, writer_properties).await
+        _writer_properties: Option<WriterProperties>,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::PARQUET,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }

     /// Executes a query and writes the results to a partitioned JSON file.
-    pub async fn write_json(self, path: impl AsRef<str>) -> Result<()> {
-        let plan = self.session_state.create_physical_plan(&self.plan).await?;
-        let task_ctx = Arc::new(self.task_ctx());
-        plan_to_json(task_ctx, plan, path).await
+    pub async fn write_json(
+        self,
+        path: &str,
+    ) -> Result<Vec<RecordBatch>, DataFusionError> {
+        let plan = LogicalPlanBuilder::copy_to(
+            self.plan,
+            path.into(),
+            OutputFileFormat::JSON,
+            true,
+            // TODO implement options
+            vec![],
+        )?
+        .build()?;
+        DataFrame::new(self.session_state, plan).collect().await
     }

     /// Add an additional column to the DataFrame.
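
Note on the API change above: the three write_* methods no longer plan and execute directly; they build a COPY TO logical plan and return the collected Vec<RecordBatch> instead of (). A minimal usage sketch (the paths and the tokio runtime setup are illustrative assumptions, not part of this commit):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // "example.csv" is a placeholder input path.
    let df = ctx.read_csv("example.csv", CsvReadOptions::new()).await?;
    // Now returns the batches produced by executing the COPY TO plan,
    // where the old writer returned ().
    let results = df.write_json("/tmp/output/").await?;
    println!("{results:?}");
    Ok(())
}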

datafusion/core/src/datasource/file_format/csv.rs

Lines changed: 68 additions & 31 deletions

@@ -47,7 +47,7 @@ use crate::datasource::physical_plan::{
 };
 use crate::error::Result;
 use crate::execution::context::SessionState;
-use crate::physical_plan::insert::{DataSink, InsertExec};
+use crate::physical_plan::insert::{DataSink, FileSinkExec};
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};
 use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use rand::distributions::{Alphanumeric, DistString};
@@ -277,6 +277,7 @@ impl FileFormat for CsvFormat {
                 "Inserting compressed CSV is not implemented yet.".into(),
             ));
         }
+
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(CsvSink::new(
             conf,
@@ -285,7 +286,7 @@ impl FileFormat for CsvFormat {
             self.file_compression_type,
         ));

-        Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
     }
 }

@@ -505,12 +506,14 @@ impl DataSink for CsvSink {
         let object_store = context
             .runtime_env()
             .object_store(&self.config.object_store_url)?;
-
         // Construct serializer and writer for each file group
         let mut serializers: Vec<Box<dyn BatchSerializer>> = vec![];
         let mut writers = vec![];
         match self.config.writer_mode {
             FileWriterMode::Append => {
+                if !self.config.per_thread_output {
+                    return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for CsvSink in Append mode".into()));
+                }
                 for file_group in &self.config.file_groups {
                     // In append mode, consider has_header flag only when file is empty (at the start).
                     // For other modes, use has_header flag as is.
@@ -542,38 +545,72 @@
             FileWriterMode::PutMultipart => {
                 // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
-                let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
-                for part_idx in 0..num_partitions {
-                    let header = self.has_header;
-                    let builder = WriterBuilder::new().with_delimiter(self.delimiter);
-                    let serializer = CsvSerializer::new()
-                        .with_builder(builder)
-                        .with_header(header);
-                    let file_path = base_path
-                        .prefix()
-                        .child(format!("/{}_{}.csv", write_id, part_idx));
-                    let object_meta = ObjectMeta {
-                        location: file_path,
-                        last_modified: chrono::offset::Utc::now(),
-                        size: 0,
-                        e_tag: None,
-                    };
-                    let writer = create_writer(
-                        self.config.writer_mode,
-                        self.file_compression_type,
-                        object_meta.into(),
-                        object_store.clone(),
-                    )
-                    .await?;
-
-                    serializers.push(Box::new(serializer));
-                    writers.push(writer);
+                match self.config.per_thread_output {
+                    true => {
+                        // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
+                        let write_id =
+                            Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+                        for part_idx in 0..num_partitions {
+                            let header = self.has_header;
+                            let builder =
+                                WriterBuilder::new().with_delimiter(self.delimiter);
+                            let serializer = CsvSerializer::new()
+                                .with_builder(builder)
+                                .with_header(header);
+                            serializers.push(Box::new(serializer));
+                            let file_path = base_path
+                                .prefix()
+                                .child(format!("{}_{}.csv", write_id, part_idx));
+                            let object_meta = ObjectMeta {
+                                location: file_path,
+                                last_modified: chrono::offset::Utc::now(),
+                                size: 0,
+                                e_tag: None,
+                            };
+                            let writer = create_writer(
+                                self.config.writer_mode,
+                                self.file_compression_type,
+                                object_meta.into(),
+                                object_store.clone(),
+                            )
+                            .await?;
+                            writers.push(writer);
+                        }
+                    }
+                    false => {
+                        let header = self.has_header;
+                        let builder = WriterBuilder::new().with_delimiter(self.delimiter);
+                        let serializer = CsvSerializer::new()
+                            .with_builder(builder)
+                            .with_header(header);
+                        serializers.push(Box::new(serializer));
+                        let file_path = base_path.prefix();
+                        let object_meta = ObjectMeta {
+                            location: file_path.clone(),
+                            last_modified: chrono::offset::Utc::now(),
+                            size: 0,
+                            e_tag: None,
+                        };
+                        let writer = create_writer(
+                            self.config.writer_mode,
+                            self.file_compression_type,
+                            object_meta.into(),
+                            object_store.clone(),
+                        )
+                        .await?;
+                        writers.push(writer);
+                    }
                 }
             }
         }

-        stateless_serialize_and_write_files(data, serializers, writers).await
+        stateless_serialize_and_write_files(
+            data,
+            serializers,
+            writers,
+            self.config.per_thread_output,
+        )
+        .await
     }
 }
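
The new per_thread_output flag chooses between one uniquely named file per input partition and a single object at the table's base path. A small sketch of just the naming decision (output_paths is a hypothetical helper for illustration; it reuses the object_store and rand calls the sink itself uses):

use object_store::path::Path;
use rand::distributions::{Alphanumeric, DistString};

fn output_paths(base: &Path, per_thread_output: bool, num_partitions: usize) -> Vec<Path> {
    if per_thread_output {
        // A random write id prevents collisions with files from earlier
        // writes; note the diff also drops the stray leading '/' that the
        // old code put in the file name.
        let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
        (0..num_partitions)
            .map(|part_idx| base.child(format!("{}_{}.csv", write_id, part_idx)))
            .collect()
    } else {
        // All partitions serialize into a single object at the base path.
        vec![base.clone()]
    }
}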

datafusion/core/src/datasource/file_format/json.rs

Lines changed: 58 additions & 25 deletions

@@ -43,7 +43,7 @@ use object_store::{GetResult, ObjectMeta, ObjectStore};

 use crate::datasource::physical_plan::FileGroupDisplay;
 use crate::physical_plan::insert::DataSink;
-use crate::physical_plan::insert::InsertExec;
+use crate::physical_plan::insert::FileSinkExec;
 use crate::physical_plan::SendableRecordBatchStream;
 use crate::physical_plan::{DisplayAs, DisplayFormatType, Statistics};

@@ -187,7 +187,7 @@ impl FileFormat for JsonFormat {
         let sink_schema = conf.output_schema().clone();
         let sink = Arc::new(JsonSink::new(conf, self.file_compression_type));

-        Ok(Arc::new(InsertExec::new(input, sink, sink_schema)) as _)
+        Ok(Arc::new(FileSinkExec::new(input, sink, sink_schema)) as _)
     }
 }

@@ -280,6 +280,9 @@ impl DataSink for JsonSink {
         let mut writers = vec![];
         match self.config.writer_mode {
             FileWriterMode::Append => {
+                if !self.config.per_thread_output {
+                    return Err(DataFusionError::NotImplemented("per_thread_output=false is not implemented for JsonSink in Append mode".into()));
+                }
                 for file_group in &self.config.file_groups {
                     let serializer = JsonSerializer::new();
                     serializers.push(Box::new(serializer));
@@ -303,33 +306,63 @@ impl DataSink for JsonSink {
             FileWriterMode::PutMultipart => {
                 // Currently assuming only 1 partition path (i.e. not hive-style partitioning on a column)
                 let base_path = &self.config.table_paths[0];
-                // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
-                let write_id = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
-                for part_idx in 0..num_partitions {
-                    let serializer = JsonSerializer::new();
-                    serializers.push(Box::new(serializer));
-                    let file_path = base_path
-                        .prefix()
-                        .child(format!("/{}_{}.json", write_id, part_idx));
-                    let object_meta = ObjectMeta {
-                        location: file_path,
-                        last_modified: chrono::offset::Utc::now(),
-                        size: 0,
-                        e_tag: None,
-                    };
-                    let writer = create_writer(
-                        self.config.writer_mode,
-                        self.file_compression_type,
-                        object_meta.into(),
-                        object_store.clone(),
-                    )
-                    .await?;
-                    writers.push(writer);
+                match self.config.per_thread_output {
+                    true => {
+                        // Uniquely identify this batch of files with a random string, to prevent collisions overwriting files
+                        let write_id =
+                            Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
+                        for part_idx in 0..num_partitions {
+                            let serializer = JsonSerializer::new();
+                            serializers.push(Box::new(serializer));
+                            let file_path = base_path
+                                .prefix()
+                                .child(format!("{}_{}.json", write_id, part_idx));
+                            let object_meta = ObjectMeta {
+                                location: file_path,
+                                last_modified: chrono::offset::Utc::now(),
+                                size: 0,
+                                e_tag: None,
+                            };
+                            let writer = create_writer(
+                                self.config.writer_mode,
+                                self.file_compression_type,
+                                object_meta.into(),
+                                object_store.clone(),
+                            )
+                            .await?;
+                            writers.push(writer);
+                        }
+                    }
+                    false => {
+                        let serializer = JsonSerializer::new();
+                        serializers.push(Box::new(serializer));
+                        let file_path = base_path.prefix();
+                        let object_meta = ObjectMeta {
+                            location: file_path.clone(),
+                            last_modified: chrono::offset::Utc::now(),
+                            size: 0,
+                            e_tag: None,
+                        };
+                        let writer = create_writer(
+                            self.config.writer_mode,
+                            self.file_compression_type,
+                            object_meta.into(),
+                            object_store.clone(),
+                        )
+                        .await?;
+                        writers.push(writer);
+                    }
                 }
             }
         }

-        stateless_serialize_and_write_files(data, serializers, writers).await
+        stateless_serialize_and_write_files(
+            data,
+            serializers,
+            writers,
+            self.config.per_thread_output,
+        )
+        .await
     }
 }
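
stateless_serialize_and_write_files now receives per_thread_output, since with a single output file every input partition must funnel into the one writer instead of writers[partition]. A deliberately simplified synchronous sketch of that demux decision (Batch, Serializer, and Writer are stand-in types; DataFusion's real code is async, stream-based, and abortable):

// Stand-ins for RecordBatch, BatchSerializer, and the sink's writer type.
struct Batch(Vec<u8>);
struct Serializer;
struct Writer(Vec<u8>);

impl Serializer {
    fn serialize(&mut self, batch: &Batch) -> Vec<u8> {
        batch.0.clone()
    }
}

impl Writer {
    fn write(&mut self, bytes: &[u8]) {
        self.0.extend_from_slice(bytes);
    }
}

fn serialize_and_write(
    partitions: Vec<Vec<Batch>>,
    serializers: &mut [Serializer],
    writers: &mut [Writer],
    per_thread_output: bool,
) {
    for (idx, partition) in partitions.into_iter().enumerate() {
        // With per_thread_output=false there is exactly one
        // serializer/writer pair, so every partition targets index 0.
        let target = if per_thread_output { idx } else { 0 };
        for batch in partition {
            let bytes = serializers[target].serialize(&batch);
            writers[target].write(&bytes);
        }
    }
}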
