Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Unit testing for Python #50

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
@@ -32,14 +32,14 @@ jobs:
runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Install protobuf compiler
shell: bash
run: sudo apt-get install protobuf-compiler
- name: Build Rust code
run: cargo build --verbose
- name: Set up Python
uses: actions/setup-python@v2
uses: actions/setup-python@v5
with:
python-version: ${{ env.PYTHON_VERSION }}
- name: Install test dependencies
@@ -49,5 +49,12 @@ jobs:
- name: Generate test data
run: |
./scripts/gen-test-data.sh
- name: Run tests
- name: Run Rust tests
run: cargo test --verbose
- name: Run Python tests
run: |
python -m venv venv
source venv/bin/activate
pip install -r requirements-in.txt
maturin develop
python -m pytest
File renamed without changes.
7 changes: 2 additions & 5 deletions src/query_stage.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@
use crate::context::serialize_execution_plan;
use crate::shuffle::{ShuffleCodec, ShuffleReaderExec};
use datafusion::error::Result;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};
use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, Partitioning};
use datafusion::prelude::SessionContext;
use datafusion_proto::bytes::physical_plan_from_bytes_with_extension_codec;
use pyo3::prelude::*;
@@ -99,10 +99,7 @@ impl QueryStage {
/// Get the input partition count. This is the same as the number of concurrent tasks
/// when we schedule this query stage for execution.
pub fn get_input_partition_count(&self) -> usize {
self.plan.children()[0]
.properties()
.output_partitioning()
.partition_count()
self.plan.output_partitioning().partition_count()
}

pub fn get_output_partition_count(&self) -> usize {
2 changes: 1 addition & 1 deletion testdata/expected-plans/q1.txt
Original file line number Diff line number Diff line change
@@ -42,7 +42,7 @@ ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "l_return
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=0, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))

Query Stage #2 (2 -> 1):
Query Stage #2 (1 -> 1):
SortPreservingMergeExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST]
ShuffleReaderExec(stage_id=1, input_partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 2))

4 changes: 2 additions & 2 deletions testdata/expected-plans/q10.txt
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ SortPreservingMergeExec: [revenue@2 DESC], fetch=20
DataFusion Ray Distributed Plan
===========

Query Stage #0 (1 -> 2):
Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name]

@@ -117,7 +117,7 @@ ShuffleWriterExec(stage_id=7, output_partitioning=Hash([Column { name: "c_custke
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 2 }, Column { name: "c_phone", index: 3 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 6 }], 2))

Query Stage #8 (2 -> 1):
Query Stage #8 (1 -> 1):
SortPreservingMergeExec: [revenue@2 DESC], fetch=20
ShuffleReaderExec(stage_id=7, input_partitioning=Hash([Column { name: "c_custkey", index: 0 }, Column { name: "c_name", index: 1 }, Column { name: "c_acctbal", index: 3 }, Column { name: "c_phone", index: 6 }, Column { name: "n_name", index: 4 }, Column { name: "c_address", index: 5 }, Column { name: "c_comment", index: 7 }], 2))

10 changes: 5 additions & 5 deletions testdata/expected-plans/q11.txt
Original file line number Diff line number Diff line change
@@ -86,13 +86,13 @@ SortPreservingMergeExec: [value@1 DESC]
DataFusion Ray Distributed Plan
===========

Query Stage #0 (1 -> 2):
Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, required_guarantees=[n_name in (ALGERIA)]

Query Stage #1 (1 -> 2):
Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]

@@ -120,13 +120,13 @@ ShuffleWriterExec(stage_id=4, output_partitioning=Hash([], 2))
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "s_nationkey", index: 2 }], 2))

Query Stage #5 (1 -> 2):
Query Stage #5 (2 -> 2):
ShuffleWriterExec(stage_id=5, output_partitioning=Hash([Column { name: "n_nationkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: n_name@1 = ALGERIA, projection=[n_nationkey@0]
ParquetExec: file_groups={ ... }, projection=[n_nationkey, n_name], predicate=n_name@1 = ALGERIA, pruning_predicate=CASE WHEN n_name_null_count@2 = n_name_row_count@3 THEN false ELSE n_name_min@0 <= ALGERIA AND ALGERIA <= n_name_max@1 END, required_guarantees=[n_name in (ALGERIA)]

Query Stage #6 (1 -> 2):
Query Stage #6 (2 -> 2):
ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_nationkey]

@@ -167,7 +167,7 @@ ShuffleWriterExec(stage_id=10, output_partitioning=Hash([Column { name: "ps_part
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=9, input_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))

Query Stage #11 (2 -> 1):
Query Stage #11 (1 -> 1):
SortPreservingMergeExec: [value@1 DESC]
ShuffleReaderExec(stage_id=10, input_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q12.txt
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "l_shipmo
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))

Query Stage #4 (2 -> 1):
Query Stage #4 (1 -> 1):
SortPreservingMergeExec: [l_shipmode@0 ASC NULLS LAST]
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "l_shipmode", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q13.txt
Original file line number Diff line number Diff line change
@@ -70,7 +70,7 @@ ShuffleWriterExec(stage_id=3, output_partitioning=Hash([Column { name: "c_count"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=2, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))

Query Stage #4 (2 -> 1):
Query Stage #4 (1 -> 1):
SortPreservingMergeExec: [custdist@1 DESC,c_count@0 DESC]
ShuffleReaderExec(stage_id=3, input_partitioning=Hash([Column { name: "c_count", index: 0 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q14.txt
Original file line number Diff line number Diff line change
@@ -33,7 +33,7 @@ ProjectionExec: expr=[100 * CAST(sum(CASE WHEN part.p_type LIKE Utf8("PROMO%") T
DataFusion Ray Distributed Plan
===========

Query Stage #0 (1 -> 2):
Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
ParquetExec: file_groups={ ... }, projection=[p_partkey, p_type]

10 changes: 5 additions & 5 deletions testdata/expected-plans/q16.txt
Original file line number Diff line number Diff line change
@@ -48,25 +48,25 @@ SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1
CoalesceBatchesExec: target_batch_size=8192
FilterExec: p_brand@1 != Brand#14 AND p_type@2 NOT LIKE SMALL PLATED% AND Use p_size@3 IN (SET) ([Literal { value: Int32(14) }, Literal { value: Int32(6) }, Literal { value: Int32(5) }, Literal { value: Int32(31) }, Literal { value: Int32(49) }, Literal { value: Int32(15) }, Literal { value: Int32(41) }, Literal { value: Int32(47) }])
RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (5, 41, 49, 15, 6, 31, 47, 14)]
ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (6, 5, 31, 41, 47, 14, 15, 49)]
CoalesceBatchesExec: target_batch_size=8192
RepartitionExec: partitioning=Hash([ps_partkey@0], 2), input_partitions=2
ParquetExec: file_groups={ ... }, projection=[ps_partkey, ps_suppkey]

DataFusion Ray Distributed Plan
===========

Query Stage #0 (1 -> 2):
Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "s_suppkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: s_comment@1 LIKE %Customer%Complaints%, projection=[s_suppkey@0]
ParquetExec: file_groups={ ... }, projection=[s_suppkey, s_comment], predicate=s_comment@6 LIKE %Customer%Complaints%

Query Stage #1 (1 -> 2):
Query Stage #1 (2 -> 2):
ShuffleWriterExec(stage_id=1, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: p_brand@1 != Brand#14 AND p_type@2 NOT LIKE SMALL PLATED% AND Use p_size@3 IN (SET) ([Literal { value: Int32(14) }, Literal { value: Int32(6) }, Literal { value: Int32(5) }, Literal { value: Int32(31) }, Literal { value: Int32(49) }, Literal { value: Int32(15) }, Literal { value: Int32(41) }, Literal { value: Int32(47) }])
ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (5, 41, 49, 15, 6, 31, 47, 14)]
ParquetExec: file_groups={ ... }]), pruning_predicate=CASE WHEN p_brand_null_count@2 = p_brand_row_count@3 THEN false ELSE p_brand_min@0 != Brand#14 OR Brand#14 != p_brand_max@1 END AND (CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 14 AND 14 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 6 AND 6 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 5 AND 5 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 31 AND 31 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 49 AND 49 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 15 AND 15 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 41 AND 41 <= p_size_max@5 END OR CASE WHEN p_size_null_count@6 = p_size_row_count@7 THEN false ELSE p_size_min@4 <= 47 AND 47 <= p_size_max@5 END), required_guarantees=[p_brand not in (Brand#14), p_size in (6, 5, 31, 41, 47, 14, 15, 49)]

Query Stage #2 (2 -> 2):
ShuffleWriterExec(stage_id=2, output_partitioning=Hash([Column { name: "ps_partkey", index: 0 }], 2))
@@ -107,7 +107,7 @@ ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "p_brand"
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: "p_size", index: 2 }], 2))

Query Stage #7 (2 -> 1):
Query Stage #7 (1 -> 1):
SortPreservingMergeExec: [supplier_cnt@3 DESC,p_brand@0 ASC NULLS LAST,p_type@1 ASC NULLS LAST,p_size@2 ASC NULLS LAST]
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "p_brand", index: 0 }, Column { name: "p_type", index: 1 }, Column { name: "p_size", index: 2 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q17.txt
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ ProjectionExec: expr=[CAST(sum(lineitem.l_extendedprice)@0 AS Float64) / 7 as av
DataFusion Ray Distributed Plan
===========

Query Stage #0 (1 -> 2):
Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: p_brand@1 = Brand#42 AND p_container@2 = LG BAG, projection=[p_partkey@0]
2 changes: 1 addition & 1 deletion testdata/expected-plans/q18.txt
Original file line number Diff line number Diff line change
@@ -104,7 +104,7 @@ ShuffleWriterExec(stage_id=6, output_partitioning=Hash([Column { name: "c_name",
CoalesceBatchesExec: target_batch_size=8192
ShuffleReaderExec(stage_id=5, input_partitioning=Hash([Column { name: "c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: "o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { name: "o_totalprice", index: 4 }], 2))

Query Stage #7 (2 -> 1):
Query Stage #7 (1 -> 1):
SortPreservingMergeExec: [o_totalprice@4 DESC,o_orderdate@3 ASC NULLS LAST], fetch=100
ShuffleReaderExec(stage_id=6, input_partitioning=Hash([Column { name: "c_name", index: 0 }, Column { name: "c_custkey", index: 1 }, Column { name: "o_orderkey", index: 2 }, Column { name: "o_orderdate", index: 3 }, Column { name: "o_totalprice", index: 4 }], 2))

2 changes: 1 addition & 1 deletion testdata/expected-plans/q19.txt
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ ProjectionExec: expr=[sum(lineitem.l_extendedprice * Int64(1) - lineitem.l_disco
DataFusion Ray Distributed Plan
===========

Query Stage #0 (1 -> 2):
Query Stage #0 (2 -> 2):
ShuffleWriterExec(stage_id=0, output_partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2))
CoalesceBatchesExec: target_batch_size=8192
FilterExec: (p_brand@1 = Brand#21 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("SM CASE") }, Literal { value: Utf8("SM BOX") }, Literal { value: Utf8("SM PACK") }, Literal { value: Utf8("SM PKG") }]) AND p_size@2 <= 5 OR p_brand@1 = Brand#13 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("MED BAG") }, Literal { value: Utf8("MED BOX") }, Literal { value: Utf8("MED PKG") }, Literal { value: Utf8("MED PACK") }]) AND p_size@2 <= 10 OR p_brand@1 = Brand#52 AND Use p_container@3 IN (SET) ([Literal { value: Utf8("LG CASE") }, Literal { value: Utf8("LG BOX") }, Literal { value: Utf8("LG PACK") }, Literal { value: Utf8("LG PKG") }]) AND p_size@2 <= 15) AND p_size@2 >= 1
Loading