
Commit 0c5d72b

fix: avoid compressed json files repartitioning (#10470)
1 parent: dbd0186
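Background for the change: DataFusion can split a large file scan across several partitions by giving each partition a byte range of the file, which is only valid when the reader can start decoding at an arbitrary offset. Compressed streams (gzip, bzip2, xz, zstd) must be read from the beginning, so they have to stay in a single partition. The sketch below is illustrative only, not DataFusion's planner code; the function name and signature are invented for the example.

// Illustrative sketch: byte-range splitting only makes sense when the file
// can be decoded from an arbitrary offset. Compressed streams cannot, so
// they are kept as a single range (i.e. repartitioning is skipped).
fn split_into_ranges(file_len: u64, compressed: bool, partitions: u64) -> Vec<(u64, u64)> {
    if compressed || partitions <= 1 {
        return vec![(0, file_len)];
    }
    let chunk = file_len.div_ceil(partitions);
    (0..partitions)
        .map(|i| (i * chunk, ((i + 1) * chunk).min(file_len)))
        .filter(|(start, end)| start < end)
        .collect()
}

fn main() {
    // Uncompressed: four ranges; compressed: one range covering the whole file.
    println!("{:?}", split_into_ranges(100, false, 4)); // [(0, 25), (25, 50), (50, 75), (75, 100)]
    println!("{:?}", split_into_ranges(100, true, 4));  // [(0, 100)]
}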

File tree

  • datafusion/core/src/datasource/physical_plan

1 file changed: +52 -27 lines

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 52 additions & 27 deletions
@@ -154,7 +154,7 @@ impl ExecutionPlan for NdJsonExec {
         target_partitions: usize,
         config: &datafusion_common::config::ConfigOptions,
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
-        if self.file_compression_type == FileCompressionType::GZIP {
+        if self.file_compression_type.is_compressed() {
            return Ok(None);
        }
        let repartition_file_min_size = config.optimizer.repartition_file_min_size;
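The one-line change above is the whole fix: the old guard only recognized gzip, so bzip2, xz, and zstd files could still be handed to the repartitioner even though their streams are not splittable, while `is_compressed()` covers every compressed variant. A rough sketch of what such a check amounts to follows; the enum and method body are assumptions for illustration, not DataFusion's actual definitions.

// Rough sketch of an "is this splittable?" check over all compression
// variants; only the uncompressed case allows byte-range repartitioning.
enum Codec {
    Gzip,
    Bzip2,
    Xz,
    Zstd,
    Uncompressed,
}

impl Codec {
    fn is_compressed(&self) -> bool {
        // Anything other than `Uncompressed` produces a stream that must be
        // decoded from the start, so it cannot be split into byte ranges.
        !matches!(self, Codec::Uncompressed)
    }
}

fn main() {
    // The old check (equality with gzip only) would have returned false for
    // these, incorrectly allowing repartitioning of non-splittable files.
    assert!(Codec::Bzip2.is_compressed());
    assert!(Codec::Zstd.is_compressed());
    assert!(!Codec::Uncompressed.is_compressed());
    println!("all checks passed");
}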
@@ -383,7 +383,6 @@ mod tests {
     use std::path::Path;
 
     use super::*;
-    use crate::assert_batches_eq;
     use crate::dataframe::DataFrameWriteOptions;
     use crate::datasource::file_format::file_compression_type::FileTypeExt;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
@@ -393,18 +392,15 @@ mod tests {
         CsvReadOptions, NdJsonReadOptions, SessionConfig, SessionContext,
     };
     use crate::test::partitioned_file_groups;
+    use crate::{assert_batches_eq, assert_batches_sorted_eq};
 
     use arrow::array::Array;
     use arrow::datatypes::{Field, SchemaBuilder};
     use datafusion_common::cast::{as_int32_array, as_int64_array, as_string_array};
     use datafusion_common::FileType;
-    use flate2::write::GzEncoder;
-    use flate2::Compression;
     use object_store::chunked::ChunkedStore;
     use object_store::local::LocalFileSystem;
     use rstest::*;
-    use std::fs::File;
-    use std::io;
     use tempfile::TempDir;
     use url::Url;
 
@@ -892,36 +888,65 @@ mod tests {
 
         Ok(())
     }
-    fn compress_file(path: &str, output_path: &str) -> io::Result<()> {
-        let input_file = File::open(path)?;
-        let mut reader = BufReader::new(input_file);
 
-        let output_file = File::create(output_path)?;
-        let writer = std::io::BufWriter::new(output_file);
-
-        let mut encoder = GzEncoder::new(writer, Compression::default());
-        io::copy(&mut reader, &mut encoder)?;
-
-        encoder.finish()?;
-        Ok(())
-    }
+    #[rstest(
+        file_compression_type,
+        case::uncompressed(FileCompressionType::UNCOMPRESSED),
+        case::gzip(FileCompressionType::GZIP),
+        case::bzip2(FileCompressionType::BZIP2),
+        case::xz(FileCompressionType::XZ),
+        case::zstd(FileCompressionType::ZSTD)
+    )]
+    #[cfg(feature = "compression")]
     #[tokio::test]
-    async fn test_disable_parallel_for_json_gz() -> Result<()> {
+    async fn test_json_with_repartitioing(
+        file_compression_type: FileCompressionType,
+    ) -> Result<()> {
         let config = SessionConfig::new()
             .with_repartition_file_scans(true)
             .with_repartition_file_min_size(0)
             .with_target_partitions(4);
         let ctx = SessionContext::new_with_config(config);
-        let path = format!("{TEST_DATA_BASE}/1.json");
-        let compressed_path = format!("{}.gz", &path);
-        compress_file(&path, &compressed_path)?;
+
+        let tmp_dir = TempDir::new()?;
+        let (store_url, file_groups, _) =
+            prepare_store(&ctx.state(), file_compression_type, tmp_dir.path()).await;
+
+        // It's important to have less than `target_partitions` amount of file groups, to
+        // trigger repartitioning.
+        assert_eq!(
+            file_groups.len(),
+            1,
+            "Expected prepared store with single file group"
+        );
+
+        let path = file_groups
+            .first()
+            .unwrap()
+            .first()
+            .unwrap()
+            .object_meta
+            .location
+            .as_ref();
+
+        let url: &Url = store_url.as_ref();
+        let path_buf = Path::new(url.path()).join(path);
+        let path = path_buf.to_str().unwrap();
+        let ext = FileType::JSON
+            .get_ext_with_compression(file_compression_type.to_owned())
+            .unwrap();
+
         let read_option = NdJsonReadOptions::default()
-            .file_compression_type(FileCompressionType::GZIP)
-            .file_extension("gz");
-        let df = ctx.read_json(compressed_path.clone(), read_option).await?;
+            .file_compression_type(file_compression_type)
+            .file_extension(ext.as_str());
+
+        let df = ctx.read_json(path, read_option).await?;
         let res = df.collect().await;
-        fs::remove_file(&compressed_path)?;
-        assert_batches_eq!(
+
+        // Output sort order is nondeterministic due to multiple
+        // target partitions. To handle it, assert compares sorted
+        // result.
+        assert_batches_sorted_eq!(
            &[
                "+-----+------------------+---------------+------+",
                "| a   | b                | c             | d    |",

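Two details of the new test are easy to miss: rstest generates one test per `case::…` (so every compression variant is exercised), and the assertion switches to `assert_batches_sorted_eq!` because with four target partitions the output batches can arrive in any order. The sketch below shows both ideas in isolation; it does not use DataFusion's test helpers, the data is made up, and the attribute form simply mirrors the one in the diff (newer rstest releases also offer `#[case]`-style arguments).

// Standalone sketch: rstest-style parametrization plus an order-insensitive
// comparison, mirroring `test_json_with_repartitioing` and the intent of
// `assert_batches_sorted_eq!`.
use rstest::rstest;

#[rstest(
    partitions,
    case::single(1),
    case::several(4)
)]
fn order_insensitive_compare(partitions: usize) {
    // Pretend each partition emits one row; with several partitions the
    // arrival order is nondeterministic, so sort both sides before comparing.
    let mut got: Vec<String> = (0..partitions).rev().map(|i| format!("row-{i}")).collect();
    let mut expected: Vec<String> = (0..partitions).map(|i| format!("row-{i}")).collect();

    got.sort();
    expected.sort();
    assert_eq!(got, expected);
}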