
Conversation

gabotechs

@gabotechs gabotechs commented Sep 21, 2025

This PR builds on top of previous concepts to support more distribution patterns and address potential improvements suggested by some existing issues.

Several new improvements have been introduced to accomplish this:

Leverage all physical threads on all workers involved in a query

Previously, the partitions that the default DataFusion planner decided to place were spread across different tasks. This led to situations where workers handled too few partitions, fewer than the number of cores the worker has.

Previously

┌───────────────────────────┐  ┌───────────────────────────┐ ┌───────────────────────────┐     ■
│    NetworkShuffleExec     │  │    NetworkShuffleExec     │ │    NetworkShuffleExec     │     │
│         (task 1)          │  │         (task 2)          │ │         (task 3)          │     │
└┬─┬────────────────────────┘  └───┬─┬─────────────────────┘ └──┬─┬──────────────────────┘  Stage N+1
 │1│                               │2│                          │3│                            │
 └─┘                               └─┘                          └─┘                            │
  ▲                                 ▲                            ▲                             ■
  └──────────────────────────────┐  │  ┌─────────────────────────┘
                                 │  │  │                                                       ■
                                ┌─┐┌─┐┌─┐                                                      │
                                │1││2││3│                                                      │
                               ┌┴─┴┴─┴┴─┴──────────────────┐                                Stage N
                               │      RepartitionExec      │                                   │
                               │         (task 1)          │                                   │
                               └───────────────────────────┘                                   ■

As DataFusion decides the number of partitions based on the available parallelism (~the number of cores on a machine), a good opportunity for using all the power in each worker is to artificially bump the number of partitions in a RepartitionExec so that every task in a stage handles as many partitions as the number of cores it has:

Now

┌───────────────────────────┐  ┌───────────────────────────┐ ┌───────────────────────────┐     ■
│    NetworkShuffleExec     │  │    NetworkShuffleExec     │ │    NetworkShuffleExec     │     │
│         (task 1)          │  │         (task 2)          │ │         (task 3)          │     │
└┬─┬┬─┬┬─┬──────────────────┘  └─────────┬─┬┬─┬┬─┬─────────┘ └──────────────────┬─┬┬─┬┬─┬┘  Stage N+1
 │1││2││3│                               │4││5││6│                              │7││8││9│      │
 └─┘└─┘└─┘                               └─┘└─┘└─┘                              └─┘└─┘└─┘      │
  ▲  ▲  ▲                                 ▲  ▲  ▲                                ▲  ▲  ▲       ■
  └──┴──┴────────────────────────┬──┬──┐  │  │  │  ┌──┬──┬───────────────────────┴──┴──┘
                                 │  │  │  │  │  │  │  │  │                                     ■
                                ┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐                                    │
                                │1││2││3││4││5││6││7││8││9│                                    │
                               ┌┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┐                                Stage N
                               │      RepartitionExec      │                                   │
                               │         (task 1)          │                                   │
                               └───────────────────────────┘                                   ■
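As a rough illustration of the arithmetic behind that bump (not the actual planner code; the helper functions below are made up for this sketch, using the numbers from the diagram above):

/// Minimal sketch of the partition-bumping arithmetic, not the planner code.
/// `p` is the partition count DataFusion picked for one worker (~cores) and
/// `n_tasks` is the number of tasks the distributed planner wants for the stage.
fn bumped_partitions(p: usize, n_tasks: usize) -> usize {
    // The RepartitionExec is widened from `p` to `p * n_tasks` output partitions...
    p * n_tasks
}

/// ...so that every task still gets a full `p` partitions to keep all its cores busy.
fn partitions_for_task(p: usize, n_tasks: usize, task_index: usize) -> std::ops::Range<usize> {
    assert!(task_index < n_tasks);
    task_index * p..(task_index + 1) * p
}

fn main() {
    // Numbers from the diagram: 3 tasks, 3 partitions per worker (0-based indexes here).
    let (p, n_tasks) = (3, 3);
    assert_eq!(bumped_partitions(p, n_tasks), 9);
    assert_eq!(partitions_for_task(p, n_tasks, 0), 0..3); // task 1 reads partitions 1-3
    assert_eq!(partitions_for_task(p, n_tasks, 2), 6..9); // task 3 reads partitions 7-9
}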

Support for task coalescing distribution pattern

Previously, we could only do shuffles. A shuffle implies repartitioning the data and fanning it out to different tasks; therefore, during distributed planning, the only thing we could do was enhance RepartitionExec nodes with ArrowFlightReadExec nodes.

ArrowFlightReadExec was previously treated as a shuffle, but now that we have a new pattern for communicating stages (task coalescing), the ArrowFlightReadExec node has been split into two different nodes:

NetworkShuffleExec

It is the equivalent of a RepartitionExec, but it sends data across the network.

Pretty much the same as the previous ArrowFlightReadExec. It allows shuffling data across stages through an Arrow Flight interface. In this pattern, all tasks in a stage gather data from all tasks in the child stage:

                   ┌───────────────────────────┐  ┌───────────────────────────┐                ■
                   │    NetworkShuffleExec     │  │    NetworkShuffleExec     │                │
                   │         (task 1)          │  │         (task 2)          │                │
                   └┬─┬┬─┬┬─┬┬─┬───────────────┘  └───────────────┬─┬┬─┬┬─┬┬─┬┘             Stage N+1
                    │1││2││3││4│                                  │5││6││7││8│                 │
                    └─┘└─┘└─┘└─┘                                  └─┘└─┘└─┘└─┘                 │
                    ▲▲▲▲▲▲▲▲▲▲▲▲                                  ▲▲▲▲▲▲▲▲▲▲▲▲                 ■
    ┌──┬──┬──┬──┬──┬┴┴┼┴┴┼┴┴┴┴┴┴───┬──┬──┬──┬──┬──┬──┬──┬────────┬┴┴┼┴┴┼┴┴┼┴┴┼──┬──┬──┐
    │  │  │  │  │  │  │  │         │  │  │  │  │  │  │  │        │  │  │  │  │  │  │  │        ■
   ┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐       ┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐      ┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐┌─┐       │
   │1││2││3││4││5││6││7││8│       │1││2││3││4││5││6││7││8│      │1││2││3││4││5││6││7││8│       │
┌──┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴─┐  ┌──┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴─┐ ┌──┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴┴─┴─┐  Stage N
│      RepartitionExec      │  │      RepartitionExec      │ │      RepartitionExec      │     │
│         (task 1)          │  │         (task 2)          │ │         (task 3)          │     │
└───────────────────────────┘  └───────────────────────────┘ └───────────────────────────┘     ■
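As a hedged sketch of the fan-out the diagram implies (illustrative only, not the NetworkShuffleExec implementation), each reader task owns a slice of the hash buckets and pulls that slice from every child task:

/// Sketch of the shuffle fan-out shown above, not the actual NetworkShuffleExec code.
/// Each Stage N task hash-partitions its output into `buckets` buckets; reader task
/// `reader_task` of Stage N+1 owns a contiguous slice of those buckets and pulls it
/// from *every* child task over Arrow Flight.
fn shuffle_reads(
    child_tasks: usize,
    buckets: usize,
    reader_tasks: usize,
    reader_task: usize,
) -> Vec<(usize, usize)> {
    let per_reader = buckets / reader_tasks;
    let owned = reader_task * per_reader..(reader_task + 1) * per_reader;
    let mut reads = Vec::new();
    for child in 0..child_tasks {
        for bucket in owned.clone() {
            reads.push((child, bucket)); // one (child task, hash bucket) stream
        }
    }
    reads
}

fn main() {
    // Diagram numbers: 3 child tasks with 8 buckets each, 2 reader tasks.
    let reads = shuffle_reads(3, 8, 2, 0);
    // Reader task 1 pulls buckets 1-4 (0-based: 0-3) from each of the 3 child tasks.
    assert_eq!(reads.len(), 12);
}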

NetworkCoalesceExec

It is the equivalent of a CoalescePartitionsExec, but it coalesces tasks instead of partitions.

This node does not shuffle any data and does not perform any transformation. It just reads N tasks with P partitions each, and coalesces them all into a single task with N*P partitions.

                               ┌───────────────────────────┐                                   ■
                               │    NetworkCoalesceExec    │                                   │
                               │         (task 1)          │                                   │
                               └┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┬─┬┘                                Stage N+1
                                │1││2││3││4││5││6││7││8││9│                                    │
                                └─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘└─┘                                    │
                                ▲  ▲  ▲   ▲  ▲  ▲   ▲  ▲  ▲                                    ■
  ┌──┬──┬───────────────────────┴──┴──┘   │  │  │   └──┴──┴──────────────────────┬──┬──┐
  │  │  │                                 │  │  │                                │  │  │       ■
 ┌─┐┌─┐┌─┐                               ┌─┐┌─┐┌─┐                              ┌─┐┌─┐┌─┐      │
 │1││2││3│                               │4││5││6│                              │7││8││9│      │
┌┴─┴┴─┴┴─┴──────────────────┐  ┌─────────┴─┴┴─┴┴─┴─────────┐ ┌──────────────────┴─┴┴─┴┴─┴┐  Stage N
│  Arc<dyn ExecutionPlan>   │  │  Arc<dyn ExecutionPlan>   │ │  Arc<dyn ExecutionPlan>   │     │
│         (task 1)          │  │         (task 2)          │ │         (task 3)          │     │
└───────────────────────────┘  └───────────────────────────┘ └───────────────────────────┘     ■
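A minimal sketch of the index math this implies, assuming evenly sized child tasks (illustrative only, not the NetworkCoalesceExec code):

/// N child tasks with P partitions each are exposed as one task with N*P partitions,
/// with no repartitioning: output partition `i` simply maps to a (task, partition) pair.
fn coalesced_partition(child_partitions: usize, output_partition: usize) -> (usize, usize) {
    (
        output_partition / child_partitions, // which child task to read from
        output_partition % child_partitions, // which partition of that task
    )
}

fn main() {
    // Diagram numbers: 3 child tasks with 3 partitions each -> 9 output partitions.
    let p = 3;
    assert_eq!(coalesced_partition(p, 0), (0, 0)); // output partition 1 -> task 1, partition 1
    assert_eq!(coalesced_partition(p, 4), (1, 1)); // output partition 5 -> task 2, partition 2
    assert_eq!(coalesced_partition(p, 8), (2, 2)); // output partition 9 -> task 3, partition 3
}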

Distributed planner controlled errors

There are several situations where, during distributed planning, we realize that some nodes cannot be spread across different tasks. For example, a HashJoinExec in CollectLeft mode cannot run in multiple tasks.

There are other situations where it does not make sense to use more than a specific number N of tasks for a stage. For example, if a DataSourceExec has only 4 partitions, it does not make sense to spawn 5 tasks for it.

A new mechanism was introduced for handling these cases: DistributedPlanError.

DistributedPlanError errors can be returned during distributed planning to signal to the stage-building code that the stage cannot be built "as is" and that something needs to change for the stage to be built properly.

For now, only DistributedPlanError::LimitTasks(usize) exists, and the flow looks like this (sketched in code after the list):

  • We start building a StageExec with DistributedPhysicalOptimizerRule::_distribute_plan_inner with 4 tasks
  • Somewhere along the way, an error of type DistributedPlanError::LimitTasks(2) is thrown
  • Instead of propagating the error, the distributed planner is prompted to choose 2 tasks for that stage instead of 4
  • The StageExec build is attempted again, but this time with 2 input tasks instead of 4
  • The build succeeds, and a new stage is built with 2 tasks instead of 4
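A rough sketch of that retry loop follows; only the DistributedPlanError::LimitTasks variant name comes from this PR, while try_build_stage and its signature are hypothetical stand-ins:

/// Simplified model of the controlled-error flow described above.
#[derive(Debug)]
enum DistributedPlanError {
    LimitTasks(usize),
    Other(String),
}

/// Hypothetical stand-in for the real stage building: pretend some node in the plan
/// (e.g. a CollectLeft-style join) only allows 2 tasks for this stage.
fn try_build_stage(n_tasks: usize) -> Result<String, DistributedPlanError> {
    if n_tasks > 2 {
        return Err(DistributedPlanError::LimitTasks(2));
    }
    Ok(format!("stage with {n_tasks} tasks"))
}

fn build_stage_with_retry(mut n_tasks: usize) -> Result<String, DistributedPlanError> {
    loop {
        match try_build_stage(n_tasks) {
            Ok(stage) => return Ok(stage),
            // Instead of propagating the error, retry with the suggested task count.
            Err(DistributedPlanError::LimitTasks(limit)) if limit < n_tasks => n_tasks = limit,
            Err(err) => return Err(err),
        }
    }
}

fn main() {
    // Start with 4 tasks; the build is retried and succeeds with 2.
    assert_eq!(build_stage_with_retry(4).unwrap(), "stage with 2 tasks");
}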

Out-of-the-box performance improvement

As we support new distribution patterns, several knobs are unlocked, but even a quick naive run already results in overall better performance:

==== Comparison with the previous benchmark from 2025-09-21 17:41:00 UTC ====
os:        macos
arch:      aarch64
cpu cores: 16
threads:   2 -> 2
workers:   8 -> 8
=============================================================================
 Query 1: prev=2669 ms, new=2674 ms, diff=1.00 faster ✔
 Query 2: prev= 400 ms, new= 368 ms, diff=1.09 faster ✔
 Query 3: prev=1319 ms, new=1089 ms, diff=1.21 faster ✅
 Query 4: prev=1111 ms, new= 991 ms, diff=1.12 faster ✔
 Query 5: prev=1884 ms, new=1465 ms, diff=1.29 faster ✅
 Query 6: prev= 629 ms, new=1197 ms, diff=1.90 slower ❌
 Query 7: prev=2138 ms, new=2033 ms, diff=1.05 faster ✔
 Query 8: prev=2844 ms, new=1997 ms, diff=1.42 faster ✅
 Query 9: prev=3910 ms, new=2610 ms, diff=1.50 faster ✅
Query 11: prev= 678 ms, new= 274 ms, diff=2.47 faster ✅
Query 12: prev=1858 ms, new=1222 ms, diff=1.52 faster ✅
Query 13: prev=2090 ms, new=1843 ms, diff=1.13 faster ✔
Query 14: prev=1170 ms, new= 968 ms, diff=1.21 faster ✅
Query 15: prev=1078 ms, new=1045 ms, diff=1.03 faster ✔
Query 16: prev= 462 ms, new= 299 ms, diff=1.55 faster ✅
Query 17: prev=3943 ms, new=3773 ms, diff=1.05 faster ✔
Query 18: prev=4019 ms, new=3966 ms, diff=1.01 faster ✔
Query 19: prev=2323 ms, new=1821 ms, diff=1.28 faster ✅
Query 20: prev=1827 ms, new=1059 ms, diff=1.73 faster ✅
Query 21: prev=5924 ms, new=3119 ms, diff=1.90 faster ✅
Query 22: prev= 580 ms, new= 443 ms, diff=1.31 faster ✅

New TPCH plans

(Collapsed sections containing the new distributed plans for TPCH queries 1-22.)

@gabotechs gabotechs marked this pull request as ready for review September 21, 2025 17:49
Collaborator Author

Really annoying that GitHub doesn't realize that this is pretty much the same as the new network_shuffle.rs file... it's showing a huge diff instead

}

#[tokio::test]
#[ignore]
Collaborator Author

@jayshrivastava I might need some help here. As the planning nodes changed, all these tests started failing, as they no longer produce 5 nodes.

Ideally, we could come up with tests that do not need maintenance as planning changes, as I would expect our planning rules to greatly evolve with time.

Collaborator

No problem. I think we can dynamically check the size of the plan by doing a traversal. I can push some changes to the test in a bit

Collaborator Author

Cool! My plan was to tackle this in a follow-up so as not to bloat this one more than it already is, so anything you can provide is more than welcome.

Collaborator

Oh feel free to #[skip] the test for now if that unblocks you faster

Comment on lines -13 to -15
/// We will add this as an extension to the SessionConfig whenever we need
/// to execute a plan that might include this node.
pub struct PartitionGroup(pub Vec<usize>);
@gabotechs gabotechs Sep 22, 2025

cc @robtandy

Now that all tasks in a stage handle exactly the same number of partitions, there's a simpler way of identifying which partitions should be exposed by this PartitionIsolatorExec: the task index within the stage is enough.

For example, if the input DataSource has 10 partitions and we are on task index 0 out of a total of 5 tasks, we can deterministically say that this PartitionIsolatorExec is going to handle partitions [0,1]. If we are on the task with index 3 out of 5 tasks, we know for sure that we are handling partitions [6,7].

This has two benefits:

  • There are fewer things to thread through; just propagating the task index is enough
  • No assumptions are made regarding which partitions should be executed within a task. For example, for a parquet reader, it's fair to assume that each partition is always going to return the same set of data, but some other leaf nodes might not guarantee that (e.g. a leaf node that implements a work-stealing mpmc queue for distributing data gathering across partitions). Not forcing specific partition indexes and letting each implementation decide accounts for this.
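A minimal sketch of that deterministic assignment, assuming evenly sized tasks (the function below is a made-up illustration, not the PartitionIsolatorExec code):

/// With evenly sized tasks, the task index alone determines which input partitions
/// this task exposes; nothing else needs to be threaded through.
fn isolated_partitions(input_partitions: usize, n_tasks: usize, task_index: usize) -> Vec<usize> {
    let per_task = input_partitions / n_tasks;
    (task_index * per_task..(task_index + 1) * per_task).collect()
}

fn main() {
    // The example from the comment above: a DataSource with 10 partitions and 5 tasks.
    assert_eq!(isolated_partitions(10, 5, 0), vec![0, 1]);
    assert_eq!(isolated_partitions(10, 5, 3), vec![6, 7]);
}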

Collaborator

Very nice.

Comment on lines +54 to +57
pub enum PartitionIsolatorExec {
Pending(PartitionIsolatorPendingExec),
Ready(PartitionIsolatorReadyExec),
}
@gabotechs gabotechs Sep 22, 2025

cc @robtandy

By dividing the PartitionIsolatorExec into two states, like the other nodes, we can let users place a PartitionIsolatorExec wherever they want, and the distributed planner will then transition it to Ready once a stage is formed and the number of tasks running in that stage is known.

This change was necessary in this PR for the integration tests: some integration tests manually assemble an execution plan rather than building it from SQL, and for those tests to work they need to be able to manually place PartitionIsolatorExec nodes in the "pending" state, the same way they place the other distributed nodes.
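A simplified model of that two-state pattern (only the Pending/Ready split comes from the diff above; the structs and the ready method here are hypothetical stand-ins):

/// Hypothetical, trimmed-down model of a node that users place in a "pending" state
/// and the distributed planner later transitions to "ready".
#[derive(Debug)]
struct PendingIsolator; // placed manually before the task count is known

#[derive(Debug)]
struct ReadyIsolator {
    n_tasks: usize, // filled in by the planner once the stage is formed
}

#[derive(Debug)]
enum Isolator {
    Pending(PendingIsolator),
    Ready(ReadyIsolator),
}

impl Isolator {
    /// Transition applied when the stage's task count becomes known.
    fn ready(self, n_tasks: usize) -> Self {
        match self {
            Isolator::Pending(_) => Isolator::Ready(ReadyIsolator { n_tasks }),
            ready @ Isolator::Ready(_) => ready,
        }
    }
}

fn main() {
    let node = Isolator::Pending(PendingIsolator).ready(3);
    assert!(matches!(node, Isolator::Ready(ReadyIsolator { n_tasks: 3 })));
}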

Collaborator Author

This file was unused; the graphviz visualization lives in src/execution_plans/stage.rs.

Comment on lines -45 to +89
self.partitions_per_task = Some(partitions_per_task);
/// Sets the amount of tasks employed in performing shuffles.
pub fn with_network_shuffle_tasks(mut self, tasks: usize) -> Self {
self.network_shuffle_tasks = Some(tasks);
self
}

/// Sets the amount of input tasks for every task coalescing operation.
pub fn with_network_coalesce_tasks(mut self, tasks: usize) -> Self {
self.network_coalesce_tasks = Some(tasks);
@gabotechs gabotechs Sep 22, 2025

cc @robtandy

Previously, we were splitting P partitions across N tasks, which means each task handled an average of P/N partitions. The partitions_per_task parameter was used to tell the planner how many partitions each task should handle, and the number of tasks was inferred from it.

Now, it works pretty much the same as before, but with one subtle difference: before distributing, we multiply P by N so that when we fan out P * N partitions across N tasks, each task handles (P * N) / N = P partitions, making sure each task uses all the cores from the machine it runs on.

This means partitions_per_task is no longer relevant, as it is now always going to be P (~the number of cores of the machine running the task), and the relevant parameters are these:

    /// Upon shuffling data, this defines how many tasks are employed into performing the shuffling.
    /// ```text
    ///  ( task 1 )  ( task 2 ) ( task 3 )
    ///      ▲           ▲          ▲
    ///      └────┬──────┴─────┬────┘
    ///       ( task 1 )  ( task 2 )       N tasks
    /// ```
    /// This parameter defines N
    network_shuffle_tasks: Option<usize>,
    /// Upon merging multiple tasks into one, this defines how many tasks are merged.
    /// ```text
    ///              ( task 1 )
    ///                  ▲
    ///      ┌───────────┴──────────┐
    ///  ( task 1 )  ( task 2 ) ( task 3 )  N tasks
    /// ```
    /// This parameter defines N
    network_coalesce_tasks: Option<usize>,
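A small before/after of that arithmetic (illustrative only; the ceiling division used to infer the old task count is an assumption about the previous behavior):

fn main() {
    let p: usize = 8; // partitions DataFusion picks for one machine (~cores)

    // Before: `partitions_per_task` drove the plan and the task count was inferred from it.
    let partitions_per_task = 2;
    let inferred_tasks = p.div_ceil(partitions_per_task);
    assert_eq!(inferred_tasks, 4); // 8 partitions split as 2 per task

    // Now: the task count is configured directly (network_shuffle_tasks /
    // network_coalesce_tasks) and the plan is widened to p * n_tasks partitions,
    // so every task still runs a full p partitions.
    let n_tasks = 4;
    assert_eq!((p * n_tasks) / n_tasks, p);
}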

@robtandy
Collaborator

Thanks for this! I'll have time to review this Tuesday afternoon

@NGA-TRAN
Collaborator

  • Your write-up clearly sets expectations for the review—really well done. The improvements you listed are all reasonable and clearly reflected in the benchmark results 🎉
  • I looked into the graphviz plans for Q1 and Q3, which I’ve looked at closely before:
    • Q1 looks fantastic with these changes.
    • Q3 shows progress but still has room for improvement. I’ll open a ticket with detailed suggestions.

Now, let’s dive into the code 🙂

Ok::<_, Status>(TaskData {
session_state,
stage: Arc::new(stage),
num_partitions_remaining: Arc::new(AtomicUsize::new(total_partitions)),
Collaborator

I don't think this is correct anymore. We may need better test coverage here. We evict the stage when all partitions in the stage are read from. However, with NetworkCoalesceExec, not all partitions are read. We need to calculate how many partitions this task will execute here.

let stage_proto = doget.stage_proto.ok_or_else(missing("stage_proto"))?;
let stage = stage_from_proto(stage_proto, &session_state, &self.runtime, &codec)
    .map_err(|err| {
        Status::invalid_argument(format!("Cannot decode stage proto: {err}"))
    })?;

// Initialize partition count to the number of partitions in the stage
let total_partitions = stage.plan.properties().partitioning.partition_count();

Collaborator

test_task_data_partition_counting tests that we evict when all partitions are executed. We should add a case where we have a Network Coalesce scenario

Collaborator Author

However, with NetworkCoalesceExec, not all partitions are read.

Unless there's a bug in the implementation, all partitions should be read. Take the first TPCH query's head as an example:

[Screenshot: the head of the TPCH query 1 plan]

All partitions from all tasks are read exactly once, and this condition remains true independently of whether the reader node is a NetworkShuffleExec or a NetworkCoalesceExec.

Collaborator Author

In fact, test_task_data_partition_counting is independent of the reader node. If you look at the current test, no references to NetworkShuffleExec or the previous ArrowFlightReadExec are made. I would say it should still be valid regardless of whether the reader is a NetworkCoalesceExec or a NetworkShuffleExec.

@jayshrivastava jayshrivastava Sep 22, 2025

In fact, test_task_data_partition_counting is independent of the reader node. If you look at the current test, no references to NetworkShuffleExec or the previous ArrowFlightReadExec are made. I would say it should still be valid regardless of whether the reader is a NetworkCoalesceExec or a NetworkShuffleExec.

I agree the test should be independent. I was saying that the test asserts that the endpoint evicts a stage when all partitions in the stage are done. I suspect this needs to change slightly.

Please correct me if I'm wrong on this part: it looks like the NetworkCoalesceExec::execute() method would execute partitions 0-5 on stage 2 task 0 only. For a given partition, we look up the URL to send the request to (this is different from NetworkShuffleExec, where we execute the partition on each worker). Stage 2 has 24 partitions. I think stage 2 task 0 needs to evict the stage after 6 partitions are run.
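For reference, a minimal sketch of the decrement-to-zero eviction pattern being discussed, built around an AtomicUsize like the num_partitions_remaining field shown earlier in this thread (the counter type below is a hypothetical stand-in; which number should seed it is exactly the open question):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical, trimmed-down stand-in for the per-task bookkeeping: the stage is
/// evicted once every partition that will be executed on this worker has been read.
struct StagePartitionCounter {
    num_partitions_remaining: Arc<AtomicUsize>,
}

impl StagePartitionCounter {
    /// Called when one partition stream finishes; returns true when the stage
    /// should be evicted from the worker's cache.
    fn partition_done(&self) -> bool {
        self.num_partitions_remaining.fetch_sub(1, Ordering::SeqCst) == 1
    }
}

fn main() {
    let counter = StagePartitionCounter {
        num_partitions_remaining: Arc::new(AtomicUsize::new(3)),
    };
    assert!(!counter.partition_done());
    assert!(!counter.partition_done());
    assert!(counter.partition_done()); // last partition read -> evict the stage
}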

///
/// - Stage N+1 must have exactly 1 task. The distributed planner ensures this is true.
/// - The amount of partitions in the single task of Stage N+1 is equal to the sum of all
/// partitions in all tasks in Stage N (e.g. (1,2,3,4,5,6,7,8,9) = (1,2,3)+(4,5,6)+(7,8,9) )
Collaborator

Very nice diagram and clear invariant description

│ RepartitionExec: partitioning=Hash([RainToday@0], 3), input_partitions=2
│ AggregateExec: mode=Partial, gby=[RainToday@0 as RainToday], aggr=[count(Int64(1))]
│ PartitionIsolatorExec Tasks: t0:[p0,p1,__] t1:[__,__,p0]
│ DataSourceExec: file_groups={3 groups: [[/testdata/weather/result-000000.parquet], [/testdata/weather/result-000001.parquet], [/testdata/weather/result-000002.parquet]]}, projection=[RainToday], file_type=parquet
Collaborator

Yay! No more unnecessary round-robin repartition.

└──────────────────────────────────────────────────
┌───── Stage 1 Tasks: t0:[p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11,p12,p13,p14,p15,p16,p17,p18,p19,p20,p21,p22,p23] t1:[p24,p25,p26,p27,p28,p29,p30,p31,p32,p33,p34,p35,p36,p37,p38,p39,p40,p41,p42,p43,p44,p45,p46,p47] t2:[p48,p49,p50,p51,p52,p53,p54,p55,p56,p57,p58,p59,p60,p61,p62,p63,p64,p65,p66,p67,p68,p69,p70,p71]
@NGA-TRAN NGA-TRAN Sep 22, 2025

I wonder if we need to name the partitions of each task differently here? Could every task use the same p0 to p23 instead? They use the same algorithm; their data will be different but must belong to the same hash/range.

│ AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[sum(lineitem.l_quantity), sum(lineitem.l_extendedprice), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), avg(lineitem.l_quantity), avg(lineitem.l_extendedprice), avg(lineitem.l_discount), count(Int64(1))]
│ CoalesceBatchesExec: target_batch_size=8192
│ ArrowFlightReadExec input_stage=1, input_partitions=15, input_tasks=8
│ NetworkCoalesceExec read_from=Stage 2, output_partitions=24, input_tasks=4
Collaborator

Yay, we have a way to put partitions together without repartitioning

@NGA-TRAN NGA-TRAN left a comment

I really like the way you define NetworkCoalesceExec and NetworkShuffleExec. The exact thing we need to shuffle and collect data efficiently 🎉

Since your description makes sense and clearly solves the problems we found earlier, I did not look into the details and will rely on @robtandy and @jayshrivastava to review them. I did look at the tests and had one suggestion about renaming the partitions of different tasks.
