Skip to content

Vectorized hash grouping #6904

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 32 commits into from
Jul 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
54a5c95
Vectorized hash grouping
alamb Jul 10, 2023
1991a76
Prepare for merge to main
alamb Jul 10, 2023
8464816
Improve comments and update size calculations
alamb Jul 10, 2023
4bd3066
Implement test for accumulate_boolean
alamb Jul 10, 2023
7c97b24
Use resize instead of resize_with
alamb Jul 10, 2023
3ca27ac
fix avg size calculation
alamb Jul 10, 2023
e4a52f9
Simplify sum accumulator
alamb Jul 10, 2023
f9eaa68
Add comments explaining i64 as counts
alamb Jul 11, 2023
fc96b13
Clarify `aggreate_arguments`
alamb Jul 11, 2023
9db6f4b
Apply suggestions from code review
alamb Jul 11, 2023
edc8c43
Merge remote-tracking branch 'apache/main' into alamb/fast_gby_hash
alamb Jul 11, 2023
19b8981
Merge branch 'alamb/fast_gby_hash' of github.com:alamb/arrow-datafusi…
alamb Jul 11, 2023
90f8730
Clarify rationale for ScratchSpace being a field
alamb Jul 11, 2023
3369ec1
use slice syntax
alamb Jul 12, 2023
4124bfa
Merge remote-tracking branch 'apache/main' into alamb/fast_gby_hash
alamb Jul 12, 2023
58e3b6d
Update datafusion/physical-expr/src/aggregate/average.rs
alamb Jul 12, 2023
47135ba
Update datafusion/physical-expr/src/aggregate/count.rs
alamb Jul 12, 2023
744b4aa
Update datafusion/physical-expr/src/aggregate/groups_accumulator/adap…
alamb Jul 12, 2023
c3d5ff2
fix diagram
alamb Jul 12, 2023
92f6234
Update datafusion/physical-expr/src/aggregate/groups_accumulator/adap…
alamb Jul 12, 2023
f35f2ae
Merge branch 'alamb/fast_gby_hash' of github.com:alamb/arrow-datafusi…
alamb Jul 12, 2023
d19c41e
simplify the supported logic
alamb Jul 12, 2023
da911a3
Add a log message when using slow adapter
alamb Jul 12, 2023
de7b250
fmt
alamb Jul 12, 2023
b313278
Revert "chore(deps): update bigdecimal requirement from 0.3.0 to 0.4.…
alamb Jul 12, 2023
2bff155
Make FileScanConfig::project pub (#6931)
Dandandan Jul 12, 2023
453b71e
feat: add round trip test of physical plan in tpch unit tests (#6918)
r4ntix Jul 12, 2023
32ff16e
Use thiserror to implement the From trait for DFSqlLogicTestError (#6…
jonahgao Jul 12, 2023
54f96e6
parallel csv scan (#6801)
2010YOUY01 Jul 12, 2023
d96dfa2
Add additional test coverage for aggregates using dates/times/timestam…
alamb Jul 12, 2023
a98b6a0
Support timestamp types for min/max
alamb Jul 13, 2023
5ab75b1
Fix aggregate nullability calculation
alamb Jul 13, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 8 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,40 +17,26 @@

[workspace]
exclude = ["datafusion-cli"]
members = [
"datafusion/common",
"datafusion/core",
"datafusion/expr",
"datafusion/execution",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/proto",
"datafusion/proto/gen",
"datafusion/row",
"datafusion/sql",
"datafusion/substrait",
"datafusion-examples",
"test-utils",
"benchmarks",
members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/execution", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/proto/gen", "datafusion/row", "datafusion/sql", "datafusion/substrait", "datafusion-examples", "test-utils", "benchmarks",
]
resolver = "2"

[workspace.package]
version = "27.0.0"
edition = "2021"
readme = "README.md"
authors = ["Apache Arrow <[email protected]>"]
license = "Apache-2.0"
edition = "2021"
homepage = "https://github.com/apache/arrow-datafusion"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/apache/arrow-datafusion"
rust-version = "1.64"
version = "27.0.0"

[workspace.dependencies]
arrow = { version = "43.0.0", features = ["prettyprint", "dyn_cmp_dict"] }
arrow-flight = { version = "43.0.0", features = ["flight-sql-experimental"] }
arrow-array = { version = "43.0.0", default-features = false, features = ["chrono-tz"] }
arrow-buffer = { version = "43.0.0", default-features = false }
arrow-flight = { version = "43.0.0", features = ["flight-sql-experimental"] }
arrow-schema = { version = "43.0.0", default-features = false }
arrow-array = { version = "43.0.0", default-features = false, features = ["chrono-tz"] }
parquet = { version = "43.0.0", features = ["arrow", "async", "object_store"] }
sqlparser = { version = "0.35", features = ["visitor"] }

Expand All @@ -70,4 +56,4 @@ lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
rpath = false
223 changes: 106 additions & 117 deletions benchmarks/src/bin/tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,9 +394,24 @@ mod tests {
use std::path::Path;

use super::*;
use datafusion_proto::bytes::{logical_plan_from_bytes, logical_plan_to_bytes};
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
physical_plan_to_bytes,
};

/// Locates the directory that holds the TPC-H benchmark data.
///
/// The directory is read from the `TPCH_DATA` environment variable; when the
/// variable cannot be read, `benchmarks/data` is used instead.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` when the chosen path does not exist.
fn get_tpch_data_path() -> Result<String> {
    let data_dir = match std::env::var("TPCH_DATA") {
        Ok(dir) => dir,
        Err(_) => "benchmarks/data".to_string(),
    };
    if Path::new(&data_dir).exists() {
        return Ok(data_dir);
    }
    Err(DataFusionError::Execution(format!(
        "Benchmark data not found (set TPCH_DATA env var to override): {}",
        data_dir
    )))
}

async fn serde_round_trip(query: usize) -> Result<()> {
async fn round_trip_logical_plan(query: usize) -> Result<()> {
let ctx = SessionContext::default();
let path = get_tpch_data_path()?;
let opt = DataFusionBenchmarkOpt {
Expand Down Expand Up @@ -425,125 +440,99 @@ mod tests {
Ok(())
}

// Serde round-trip tests: each `serde_qN` test below calls
// `serde_round_trip` with the query number N.
#[tokio::test]
async fn serde_q1() -> Result<()> {
serde_round_trip(1).await
}

#[tokio::test]
async fn serde_q2() -> Result<()> {
serde_round_trip(2).await
}

#[tokio::test]
async fn serde_q3() -> Result<()> {
serde_round_trip(3).await
}

#[tokio::test]
async fn serde_q4() -> Result<()> {
serde_round_trip(4).await
}

#[tokio::test]
async fn serde_q5() -> Result<()> {
serde_round_trip(5).await
}

#[tokio::test]
async fn serde_q6() -> Result<()> {
serde_round_trip(6).await
}

#[tokio::test]
async fn serde_q7() -> Result<()> {
serde_round_trip(7).await
}

#[tokio::test]
async fn serde_q8() -> Result<()> {
serde_round_trip(8).await
}

#[tokio::test]
async fn serde_q9() -> Result<()> {
serde_round_trip(9).await
}

#[tokio::test]
async fn serde_q10() -> Result<()> {
serde_round_trip(10).await
}

#[tokio::test]
async fn serde_q11() -> Result<()> {
serde_round_trip(11).await
}

#[tokio::test]
async fn serde_q12() -> Result<()> {
serde_round_trip(12).await
}

#[tokio::test]
async fn serde_q13() -> Result<()> {
serde_round_trip(13).await
}

#[tokio::test]
async fn serde_q14() -> Result<()> {
serde_round_trip(14).await
}

#[tokio::test]
async fn serde_q15() -> Result<()> {
serde_round_trip(15).await
}

#[tokio::test]
async fn serde_q16() -> Result<()> {
serde_round_trip(16).await
}

#[tokio::test]
async fn serde_q17() -> Result<()> {
serde_round_trip(17).await
}

#[tokio::test]
async fn serde_q18() -> Result<()> {
serde_round_trip(18).await
}

#[tokio::test]
async fn serde_q19() -> Result<()> {
serde_round_trip(19).await
}

#[tokio::test]
async fn serde_q20() -> Result<()> {
serde_round_trip(20).await
/// Round-trips the physical plan(s) of query number `query` through their
/// byte serialization and checks that nothing changed.
///
/// Tables are registered from the directory returned by
/// `get_tpch_data_path`. Every SQL statement returned by
/// `get_query_sql(query)` is planned, serialized with
/// `physical_plan_to_bytes`, read back with `physical_plan_from_bytes`, and
/// the two plans are compared through their
/// `displayable(..).indent(false)` rendering.
///
/// # Errors
///
/// Propagates any error from locating the data directory, registering the
/// tables, fetching the query SQL, planning, or (de)serializing a plan.
///
/// # Panics
///
/// Panics (via `assert_eq!`) when the rendered plans differ.
async fn round_trip_physical_plan(query: usize) -> Result<()> {
    let ctx = SessionContext::default();
    let path = get_tpch_data_path()?;
    let opt = DataFusionBenchmarkOpt {
        query: Some(query),
        debug: false,
        iterations: 1,
        partitions: 2,
        batch_size: 8192,
        // `path` is already an owned `String`; move it rather than cloning.
        path: PathBuf::from(path),
        file_format: "tbl".to_string(),
        mem_table: false,
        output_path: None,
        disable_statistics: false,
    };
    register_tables(&opt, &ctx).await?;

    // `get_query_sql` yields a collection of SQL statements; check each one.
    // The loop variable is named `sql` so it no longer shadows `query`.
    for sql in get_query_sql(query)? {
        let plan = ctx.sql(&sql).await?.create_physical_plan().await?;
        // Clone the plan handle: the original is still needed for comparison.
        let bytes = physical_plan_to_bytes(plan.clone())?;
        let plan2 = physical_plan_from_bytes(&bytes, &ctx)?;

        let plan_formatted = displayable(plan.as_ref()).indent(false).to_string();
        let plan2_formatted = displayable(plan2.as_ref()).indent(false).to_string();
        assert_eq!(plan_formatted, plan2_formatted);
    }
    Ok(())
}

#[tokio::test]
async fn serde_q21() -> Result<()> {
serde_round_trip(21).await
/// Generates an async `#[tokio::test]` function named `$test_name` that
/// awaits `round_trip_logical_plan` for the given query number.
macro_rules! test_round_trip_logical {
    ($test_name:ident, $query_number:expr) => {
        #[tokio::test]
        async fn $test_name() -> Result<()> {
            round_trip_logical_plan($query_number).await
        }
    };
}

#[tokio::test]
async fn serde_q22() -> Result<()> {
serde_round_trip(22).await
/// Generates an async `#[tokio::test]` function named `$test_name` that
/// awaits `round_trip_physical_plan` for the given query number.
macro_rules! test_round_trip_physical {
    ($test_name:ident, $query_number:expr) => {
        #[tokio::test]
        async fn $test_name() -> Result<()> {
            round_trip_physical_plan($query_number).await
        }
    };
}

/// Resolves the TPC-H benchmark data directory.
///
/// Uses the `TPCH_DATA` environment variable when it can be read and falls
/// back to `benchmarks/data` otherwise.
///
/// # Errors
///
/// Returns `DataFusionError::Execution` if the resolved path does not exist.
fn get_tpch_data_path() -> Result<String> {
    let fallback = |_| String::from("benchmarks/data");
    let dir = std::env::var("TPCH_DATA").unwrap_or_else(fallback);
    if Path::new(&dir).exists() {
        Ok(dir)
    } else {
        Err(DataFusionError::Execution(format!(
            "Benchmark data not found (set TPCH_DATA env var to override): {}",
            dir
        )))
    }
}
// Logical plan round-trip tests: one generated `#[tokio::test]` per query
// number 1 through 22, each delegating to `round_trip_logical_plan`.
test_round_trip_logical!(round_trip_logical_plan_q1, 1);
test_round_trip_logical!(round_trip_logical_plan_q2, 2);
test_round_trip_logical!(round_trip_logical_plan_q3, 3);
test_round_trip_logical!(round_trip_logical_plan_q4, 4);
test_round_trip_logical!(round_trip_logical_plan_q5, 5);
test_round_trip_logical!(round_trip_logical_plan_q6, 6);
test_round_trip_logical!(round_trip_logical_plan_q7, 7);
test_round_trip_logical!(round_trip_logical_plan_q8, 8);
test_round_trip_logical!(round_trip_logical_plan_q9, 9);
test_round_trip_logical!(round_trip_logical_plan_q10, 10);
test_round_trip_logical!(round_trip_logical_plan_q11, 11);
test_round_trip_logical!(round_trip_logical_plan_q12, 12);
test_round_trip_logical!(round_trip_logical_plan_q13, 13);
test_round_trip_logical!(round_trip_logical_plan_q14, 14);
test_round_trip_logical!(round_trip_logical_plan_q15, 15);
test_round_trip_logical!(round_trip_logical_plan_q16, 16);
test_round_trip_logical!(round_trip_logical_plan_q17, 17);
test_round_trip_logical!(round_trip_logical_plan_q18, 18);
test_round_trip_logical!(round_trip_logical_plan_q19, 19);
test_round_trip_logical!(round_trip_logical_plan_q20, 20);
test_round_trip_logical!(round_trip_logical_plan_q21, 21);
test_round_trip_logical!(round_trip_logical_plan_q22, 22);

// Physical plan round-trip tests: one generated `#[tokio::test]` per query
// number 1 through 22, each delegating to `round_trip_physical_plan`.
test_round_trip_physical!(round_trip_physical_plan_q1, 1);
test_round_trip_physical!(round_trip_physical_plan_q2, 2);
test_round_trip_physical!(round_trip_physical_plan_q3, 3);
test_round_trip_physical!(round_trip_physical_plan_q4, 4);
test_round_trip_physical!(round_trip_physical_plan_q5, 5);
test_round_trip_physical!(round_trip_physical_plan_q6, 6);
test_round_trip_physical!(round_trip_physical_plan_q7, 7);
test_round_trip_physical!(round_trip_physical_plan_q8, 8);
test_round_trip_physical!(round_trip_physical_plan_q9, 9);
test_round_trip_physical!(round_trip_physical_plan_q10, 10);
test_round_trip_physical!(round_trip_physical_plan_q11, 11);
test_round_trip_physical!(round_trip_physical_plan_q12, 12);
test_round_trip_physical!(round_trip_physical_plan_q13, 13);
test_round_trip_physical!(round_trip_physical_plan_q14, 14);
test_round_trip_physical!(round_trip_physical_plan_q15, 15);
test_round_trip_physical!(round_trip_physical_plan_q16, 16);
test_round_trip_physical!(round_trip_physical_plan_q17, 17);
test_round_trip_physical!(round_trip_physical_plan_q18, 18);
test_round_trip_physical!(round_trip_physical_plan_q19, 19);
test_round_trip_physical!(round_trip_physical_plan_q20, 20);
test_round_trip_physical!(round_trip_physical_plan_q21, 21);
test_round_trip_physical!(round_trip_physical_plan_q22, 22);
}
Loading