Increase batch size #8388

Closed · 6 commits
benchmarks/README.md (1 addition, 1 deletion)
@@ -293,7 +293,7 @@ Example output:

```
Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data",
- batch_size: 8192, scale_factor: 1.0 }
+ batch_size: 32768, scale_factor: 1.0 }
Generated test dataset with 10699521 rows
Executing with filter 'request_method = Utf8("GET")'
Using scan options ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false }
benchmarks/src/parquet_filter.rs (1 addition, 1 deletion)
@@ -47,7 +47,7 @@ use structopt::StructOpt;
///
/// Example output:
///
- /// Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 8192, scale_factor: 1.0 }
+ /// Running benchmarks with the following options: Opt { debug: false, iterations: 3, partitions: 2, path: "./data", batch_size: 32768, scale_factor: 1.0 }
/// Generated test dataset with 10699521 rows
/// Executing with filter 'request_method = Utf8("GET")'
/// Using scan options ParquetScanOptions { pushdown_filters: false, reorder_predicates: false, enable_page_index: false }
benchmarks/src/tpch/convert.rs (1 addition, 1 deletion)
@@ -54,7 +54,7 @@ pub struct ConvertOpt {
partitions: usize,

/// Batch size when reading CSV or Parquet files
- #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+ #[structopt(short = "s", long = "batch-size", default_value = "32768")]
batch_size: usize,
}

benchmarks/src/tpch/run.rs (2 additions, 2 deletions)
@@ -326,7 +326,7 @@ mod tests {
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
- batch_size: 8192,
+ batch_size: 32768,
debug: false,
};
let opt = RunOpt {
@@ -358,7 +358,7 @@
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
- batch_size: 8192,
+ batch_size: 32768,
debug: false,
};
let opt = RunOpt {
benchmarks/src/util/options.rs (1 addition, 1 deletion)
@@ -31,7 +31,7 @@ pub struct CommonOpt {
pub partitions: Option<usize>,

/// Batch size when reading CSV or Parquet files
- #[structopt(short = "s", long = "batch-size", default_value = "8192")]
+ #[structopt(short = "s", long = "batch-size", default_value = "32768")]
pub batch_size: usize,

/// Activate debug mode to see more details
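For readers tracing how this default reaches the benchmarks: `structopt` derives the CLI parser from the struct, so `--batch-size` falls back to `default_value` when the flag is omitted. A minimal, self-contained sketch of that pattern (the `main` here is illustrative, not part of the benchmark code):

```rust
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
struct CommonOpt {
    /// Batch size when reading CSV or Parquet files
    #[structopt(short = "s", long = "batch-size", default_value = "32768")]
    batch_size: usize,
}

fn main() {
    // Parses argv; batch_size is 32768 unless `--batch-size N` is passed.
    let opt = CommonOpt::from_args();
    println!("batch_size = {}", opt.batch_size);
}
```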
datafusion-examples/examples/csv_opener.rs (1 addition, 1 deletion)
@@ -42,7 +42,7 @@ async fn main() -> Result<()> {
let schema = aggr_test_schema();

let config = CsvConfig::new(
- 8192,
+ 32768,
schema.clone(),
Some(vec![12, 0]),
true,
datafusion-examples/examples/json_opener.rs (1 addition, 1 deletion)
@@ -55,7 +55,7 @@ async fn main() -> Result<()> {
let projected = Arc::new(schema.clone().project(&[1, 0])?);

let opener = JsonOpener::new(
- 8192,
+ 32768,
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
datafusion/common/src/config.rs (1 addition, 1 deletion)
@@ -200,7 +200,7 @@ config_namespace! {
/// Default batch size while creating new batches, it's especially useful for
/// buffer-in-memory batches since creating tiny batches would result in too much
/// metadata memory consumption
- pub batch_size: usize, default = 8192
+ pub batch_size: usize, default = 32768

/// When set to true, record batches will be examined between each operator and
/// small batches will be coalesced into larger batches. This is helpful when there
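The `config_namespace!` entry above is the knob that `SET datafusion.execution.batch_size = ...` and the session-level APIs resolve to. A hedged sketch of overriding it per session, assuming a recent DataFusion where `SessionConfig::with_batch_size` and `SessionContext::new_with_config` are available:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Session-level override; the compiled-in default stays whatever
    // `config.rs` declares.
    let config = SessionConfig::new().with_batch_size(32_768);
    let ctx = SessionContext::new_with_config(config);

    // The same option is reachable through SQL.
    let _ = ctx.sql("SET datafusion.execution.batch_size = 32768").await?;
    Ok(())
}
```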
datafusion/core/benches/distinct_query_sql.rs (1 addition, 1 deletion)
@@ -53,7 +53,7 @@ fn create_context(
fn criterion_benchmark_limited_distinct(c: &mut Criterion) {
let partitions_len = 10;
let array_len = 1 << 26; // 64 M
- let batch_size = 8192;
+ let batch_size = 32768;
let ctx = create_context(partitions_len, array_len, batch_size).unwrap();

let mut group = c.benchmark_group("custom-measurement-time");
@@ -474,15 +474,15 @@ mod tests {
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
@@ -511,19 +511,19 @@
let expected_input = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
@@ -546,15 +546,15 @@

let expected_input = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
" CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
];
let expected_optimized = [
"CoalescePartitionsExec",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
@@ -583,7 +583,7 @@ mod tests {
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortExec: expr=[a@0 ASC NULLS LAST]",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
@@ -592,7 +592,7 @@
let expected_optimized = [
"SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" FilterExec: c@1 > 3",
" SortPreservingRepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, sort_exprs=a@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
@@ -887,7 +887,7 @@ mod tests {
}

fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
- Arc::new(CoalesceBatchesExec::new(input, 8192))
+ Arc::new(CoalesceBatchesExec::new(input, 32768))
}

fn coalesce_partitions_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
datafusion/core/tests/config_from_env.rs (1 addition, 1 deletion)
@@ -41,5 +41,5 @@ fn from_env() {

env::remove_var(env_key);
let config = ConfigOptions::from_env().unwrap();
- assert_eq!(config.execution.batch_size, 8192); // set to its default value
+ assert_eq!(config.execution.batch_size, 32768); // set to its default value
}
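The test above relies on `ConfigOptions::from_env` mapping `DATAFUSION_`-prefixed environment variables onto config fields. A sketch of the round trip, assuming the key is `DATAFUSION_EXECUTION_BATCH_SIZE` (the actual `env_key` is defined earlier in the test file and not shown in this diff):

```rust
use std::env;
use datafusion_common::config::ConfigOptions;

fn main() {
    // Assumed naming scheme: DATAFUSION_<NAMESPACE>_<FIELD>.
    env::set_var("DATAFUSION_EXECUTION_BATCH_SIZE", "4096");
    let config = ConfigOptions::from_env().unwrap();
    assert_eq!(config.execution.batch_size, 4096);

    // With the variable unset, the compiled-in default applies.
    env::remove_var("DATAFUSION_EXECUTION_BATCH_SIZE");
    let config = ConfigOptions::from_env().unwrap();
    assert_eq!(config.execution.batch_size, 32768);
}
```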
datafusion/core/tests/sql/joins.rs (4 additions, 4 deletions)
@@ -126,11 +126,11 @@ async fn join_change_in_planner() -> Result<()> {
let expected = {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
@@ -181,11 +181,11 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
let expected = {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
datafusion/core/tests/sql/order.rs (2 additions, 2 deletions)
@@ -207,15 +207,15 @@ ORDER BY 1, 2;
" InterleaveExec",
" ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], aggr=[]",
" ProjectionExec: expr=[column1@0 as t]",
" ValuesExec",
" ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]",
" AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 as t], aggr=[]",
" CoalesceBatchesExec: target_batch_size=8192",
" CoalesceBatchesExec: target_batch_size=32768",
" RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), input_partitions=2",
" RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1",
" AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], aggr=[]",
datafusion/physical-plan/src/display.rs (1 addition, 1 deletion)
@@ -88,7 +88,7 @@ impl<'a> DisplayableExecutionPlan<'a> {
///
/// ```text
/// ProjectionExec: expr=[a]
- /// CoalesceBatchesExec: target_batch_size=8192
+ /// CoalesceBatchesExec: target_batch_size=32768
/// FilterExec: a < 5
/// RepartitionExec: partitioning=RoundRobinBatch(16)
/// CsvExec: source=...",
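For completeness, a short sketch of producing the indented rendering shown in that doc comment, assuming the `displayable` helper exported from `datafusion::physical_plan` (signatures vary slightly across versions):

```rust
use std::sync::Arc;
use datafusion::physical_plan::{displayable, ExecutionPlan};

// Prints the indented plan form shown above for any physical plan;
// `indent(false)` requests the non-verbose, one-line-per-operator view.
fn print_plan(plan: &Arc<dyn ExecutionPlan>) {
    println!("{}", displayable(plan.as_ref()).indent(false));
}
```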
datafusion/physical-plan/src/sorts/sort.rs (2 additions, 2 deletions)
@@ -1006,7 +1006,7 @@ mod tests {

let result = collect(sort_exec.clone(), task_ctx.clone()).await?;

- assert_eq!(result.len(), 2);
+ assert_eq!(result.len(), 1);

// Now, validate metrics
let metrics = sort_exec.metrics().unwrap();
@@ -1020,7 +1020,7 @@

let i = as_primitive_array::<Int32Type>(&columns[0])?;
assert_eq!(i.value(0), 0);
- assert_eq!(i.value(i.len() - 1), 81);
+ assert_eq!(i.value(i.len() - 1), 99);

assert_eq!(
task_ctx.runtime_env().memory_pool.reserved(),
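The two assertion changes above appear to follow directly from the larger batch size: output that previously straddled two record batches now fits in one, so `result.len()` drops to 1 and the last value of that single batch is presumably the maximum of the test data (99) rather than wherever the first batch happened to end (81). The batch count itself is just ceiling division; a sketch with illustrative numbers, not the test's exact row counts:

```rust
/// Batches emitted when `num_rows` rows are packed into record batches
/// of at most `batch_size` rows each (ceiling division).
fn expected_batches(num_rows: usize, batch_size: usize) -> usize {
    (num_rows + batch_size - 1) / batch_size
}

fn main() {
    assert_eq!(expected_batches(10_000, 8_192), 2); // old default: two batches
    assert_eq!(expected_batches(10_000, 32_768), 1); // new default: one batch
}
```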
datafusion/sqllogictest/test_files/aggregate.slt (7 additions, 7 deletions)
@@ -2486,7 +2486,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
- --------CoalesceBatchesExec: target_batch_size=8192
+ --------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
@@ -2541,7 +2541,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
- --------CoalesceBatchesExec: target_batch_size=8192
+ --------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)], lim=[4]
@@ -2560,7 +2560,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MIN(traces.timestamp)@1 DESC], fetch=4
----SortExec: TopK(fetch=4), expr=[MIN(traces.timestamp)@1 DESC]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
- --------CoalesceBatchesExec: target_batch_size=8192
+ --------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MIN(traces.timestamp)]
@@ -2579,7 +2579,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [MAX(traces.timestamp)@1 ASC NULLS LAST], fetch=4
----SortExec: TopK(fetch=4), expr=[MAX(traces.timestamp)@1 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
- --------CoalesceBatchesExec: target_batch_size=8192
+ --------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
@@ -2598,7 +2598,7 @@ GlobalLimitExec: skip=0, fetch=4
--SortPreservingMergeExec: [trace_id@0 ASC NULLS LAST], fetch=4
----SortExec: TopK(fetch=4), expr=[trace_id@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
- --------CoalesceBatchesExec: target_batch_size=8192
+ --------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([trace_id@0], 4), input_partitions=4
------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
--------------AggregateExec: mode=Partial, gby=[trace_id@0 as trace_id], aggr=[MAX(traces.timestamp)]
@@ -2733,7 +2733,7 @@ GlobalLimitExec: skip=0, fetch=4
------------AggregateExec: mode=Final, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
--------------CoalescePartitionsExec
----------------AggregateExec: mode=Partial, gby=[c2@0 as c2, c3@1 as c3], aggr=[]
- ------------------CoalesceBatchesExec: target_batch_size=8192
+ ------------------CoalesceBatchesExec: target_batch_size=32768
--------------------FilterExec: c3@1 >= 10 AND c3@1 <= 20
----------------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c2, c3], has_header=true
@@ -3066,7 +3066,7 @@ from aggregate_test_100;
0.051534002628 0.48427355347 100 0.001929150558 0.479274948239 0.508972509913 6.707779292571 9.234223721582 0.345678715695

statement ok
- set datafusion.execution.batch_size = 8192;
+ set datafusion.execution.batch_size = 32768;



datafusion/sqllogictest/test_files/distinct_on.slt (1 addition, 1 deletion)
@@ -98,7 +98,7 @@ ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_tes
--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
----SortExec: expr=[c1@0 ASC NULLS LAST]
------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
- --------CoalesceBatchesExec: target_batch_size=8192
+ --------CoalesceBatchesExec: target_batch_size=32768
----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)], ordering_mode=Sorted
--------------SortExec: expr=[c1@0 ASC NULLS LAST,c3@2 ASC NULLS LAST]