Plans that have multiple output partitions at the top level fail #75

@adragomir

I have a physical plan that looks like this when it enters Ray:

    [ output_partitions: 16]    ProjectionExec: expr=[...]
    [ output_partitions: 16]      RayStageExec[0] (output_partitioning=RoundRobinBatch(16))
    [ output_partitions: 16]        RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
    [ output_partitions: 1]          GlobalLimitExec: skip=0, fetch=1
    [ output_partitions: 1]            CoalescePartitionsExec
    [ output_partitions: 16]              CoalesceBatchesExec: target_batch_size=8192, fetch=1
    [ output_partitions: 16]                FilterExec: timestamp@0 >= 1736899200000000 AND timestamp@0 < 1736985600000000
    [ output_partitions: 16]                  DeltaScan
    [ output_partitions: 16]                    ParquetExec: file_groups={...]

In this case, Ray adds a RayStageExec on top of the ProjectionExec and then fails with the error "Last stage expected to have one partition", because the top-level plan still has 16 output partitions.

I managed to fix it locally by ensuring that the plan has exactly 1 output partition:

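        // Read the partition count off whichever partitioning scheme the plan reports.
        let num_output_partitions = match plan.output_partitioning() {
            Partitioning::RoundRobinBatch(x) => x,
            Partitioning::Hash(_, x) => x,
            Partitioning::UnknownPartitioning(x) => x,
        };
        // The last stage must have a single partition, so collapse the plan if it has more.
        if *num_output_partitions > 1 {
            plan = Arc::new(RepartitionExec::try_new(plan, Partitioning::RoundRobinBatch(1))?);
        }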
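
For anyone who wants to try the idea without pulling in DataFusion, here is a minimal, self-contained sketch of the same guard. The `Partitioning` and `Plan` types below are simplified stand-ins I made up for illustration (the real `Hash` variant also carries expressions, and real plans are `Arc<dyn ExecutionPlan>`), and `ensure_single_partition` is a hypothetical helper name, not something from the project.

```rust
// Simplified stand-in for DataFusion's `Partitioning` enum (assumption:
// only the partition counts matter for this guard).
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(usize),
    UnknownPartitioning(usize),
}

impl Partitioning {
    // Every variant carries a partition count; return it.
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobinBatch(n)
            | Partitioning::Hash(n)
            | Partitioning::UnknownPartitioning(n) => *n,
        }
    }
}

// Hypothetical, minimal plan tree: a leaf with some partitioning, or a
// repartition node wrapping another plan.
#[derive(Debug, Clone, PartialEq)]
enum Plan {
    Leaf(Partitioning),
    Repartition { input: Box<Plan>, partitioning: Partitioning },
}

impl Plan {
    fn output_partitioning(&self) -> &Partitioning {
        match self {
            Plan::Leaf(p) => p,
            Plan::Repartition { partitioning, .. } => partitioning,
        }
    }
}

// Wrap the plan in a single-partition repartition only when it currently
// exposes more than one output partition.
fn ensure_single_partition(plan: Plan) -> Plan {
    if plan.output_partitioning().partition_count() > 1 {
        Plan::Repartition {
            input: Box::new(plan),
            partitioning: Partitioning::RoundRobinBatch(1),
        }
    } else {
        plan
    }
}

fn main() {
    // Mirrors the top of the plan above: 16 output partitions.
    let top = Plan::Leaf(Partitioning::RoundRobinBatch(16));
    let fixed = ensure_single_partition(top);
    println!("{}", fixed.output_partitioning().partition_count());
}
```

The guard leaves plans that already have one partition untouched, so it should be safe to apply unconditionally before the final stage is created.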
