
Support zero copy hash repartitioning for Hash Join #1


Closed
wants to merge 25 commits

60 changes: 59 additions & 1 deletion .github/workflows/extended.yml
@@ -30,8 +30,26 @@ concurrency:
# in the (very rare) event of a hash failure or sqlite library query failure.
on:
push:
branches:
- main
workflow_dispatch:
inputs:
pr_number:
description: 'Pull request number'
type: string
check_run_id:
description: 'Check run ID for status updates'
type: string
pr_head_sha:
description: 'PR head SHA'
type: string

permissions:
contents: read
checks: write

jobs:

# Check crate compiles and base cargo check passes
linux-build-lib:
name: linux build test
@@ -57,7 +75,7 @@ jobs:
# Run extended tests (with feature 'extended_tests')
linux-test-extended:
name: cargo test 'extended_tests' (amd64)
needs: linux-build-lib
needs: [linux-build-lib]
runs-on: ubuntu-latest
# note: do not use amd/rust container to preserve disk space
steps:
@@ -127,4 +145,44 @@ jobs:
cargo test --features backtrace --profile release-nonlto --test sqllogictests -- --include-sqlite
cargo clean

# If the workflow was triggered by a PR comment (through the pr_comment_commands.yml action), we need to manually update the check status so it is displayed in the GitHub UI
update-check-status:
needs: [linux-build-lib, linux-test-extended, hash-collisions, sqllogictest-sqlite]
runs-on: ubuntu-latest
if: ${{ always() && github.event_name == 'workflow_dispatch' }}
steps:
- name: Determine workflow status
id: status
run: |
if [[ "${{ contains(needs.*.result, 'failure') || contains(needs.*.result, 'cancelled') }}" == "true" ]]; then
echo "workflow_status=failure" >> $GITHUB_OUTPUT
echo "conclusion=failure" >> $GITHUB_OUTPUT
else
echo "workflow_status=completed" >> $GITHUB_OUTPUT
echo "conclusion=success" >> $GITHUB_OUTPUT
fi

- name: Update check run
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
const workflowRunUrl = `https://github.com/${context.repo.owner}/${context.repo.repo}/actions/runs/${context.runId}`;

await github.rest.checks.update({
owner: context.repo.owner,
repo: context.repo.repo,
check_run_id: ${{ github.event.inputs.check_run_id }},
status: 'completed',
conclusion: '${{ steps.status.outputs.conclusion }}',
output: {
title: '${{ steps.status.outputs.conclusion == 'success' && 'Extended Tests Passed' || 'Extended Tests Failed' }}',
summary: `Extended tests have completed with status: ${{ steps.status.outputs.conclusion }}.\n\n[View workflow run](${workflowRunUrl})`
},
details_url: workflowRunUrl
});
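For reference, a hedged sketch of dispatching this workflow by hand with the GitHub CLI, passing the same `workflow_dispatch` inputs the PR comment handler supplies (the branch name, PR number, check run ID, and SHA below are placeholders):

```shell
# Placeholders throughout: substitute a real branch, PR number, check run ID, and head SHA
gh workflow run extended.yml \
  --ref my-feature-branch \
  -f pr_number=12345 \
  -f check_run_id=9876543210 \
  -f pr_head_sha=0123abcd
```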

89 changes: 89 additions & 0 deletions .github/workflows/pr_comment_commands.yml
@@ -0,0 +1,89 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: PR commands

on:
issue_comment:
types: [created]

permissions:
contents: read
pull-requests: write
actions: write
checks: write

jobs:
# Starts the extended_tests on a PR branch when someone leaves a `Run extended tests` comment
run_extended_tests:
runs-on: ubuntu-latest
if: ${{ github.event_name == 'issue_comment' && github.event.issue.pull_request && contains(github.event.comment.body, 'Run extended tests') }}
steps:
- name: Dispatch extended tests for a PR branch with comment
uses: actions/github-script@v7
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
script: |
// Get PR details to fetch the branch name
const { data: pullRequest } = await github.rest.pulls.get({
owner: context.repo.owner,
repo: context.repo.repo,
pull_number: context.payload.issue.number
});

// Extract the branch name
const branchName = pullRequest.head.ref;
const headSha = pullRequest.head.sha;
const workflowRunsUrl = `https://github.com/${context.repo.owner}/${context.repo.repo}/actions?query=workflow%3A%22Datafusion+extended+tests%22+branch%3A${branchName}`;

// Create a check run that links to the Actions tab so the run will be visible in GitHub UI
const check = await github.rest.checks.create({
owner: context.repo.owner,
repo: context.repo.repo,
name: 'Extended Tests',
head_sha: headSha,
status: 'in_progress',
output: {
title: 'Extended Tests Running',
summary: `Extended tests have been triggered for this PR.\n\n[View workflow runs](${workflowRunsUrl})`
},
details_url: workflowRunsUrl
});

// Dispatch the workflow with the PR branch name
await github.rest.actions.createWorkflowDispatch({
owner: context.repo.owner,
repo: context.repo.repo,
workflow_id: 'extended.yml',
ref: branchName,
inputs: {
pr_number: context.payload.issue.number.toString(),
check_run_id: check.data.id.toString(),
pr_head_sha: headSha
}
});

- name: Add reaction to comment
uses: actions/github-script@v7
with:
script: |
await github.rest.reactions.createForIssueComment({
owner: context.repo.owner,
repo: context.repo.repo,
comment_id: context.payload.comment.id,
content: 'rocket'
});
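As a hedged usage note: the job above fires on any PR comment containing the trigger phrase, so (with a hypothetical PR number) it can also be exercised from the GitHub CLI:

```shell
# Hypothetical PR number; the comment body must contain 'Run extended tests'
gh pr comment 12345 --body "Run extended tests"
```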
35 changes: 25 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default.

18 changes: 17 additions & 1 deletion benchmarks/README.md
@@ -46,6 +46,7 @@ script. Usage instructions can be found with:

```shell
# show usage
cd ./benchmarks/
./bench.sh
```

@@ -64,9 +65,24 @@ Create / download a specific dataset (TPCH)
```shell
./bench.sh data tpch
```

Data is placed in the `data` subdirectory.

## Running benchmarks

Run the benchmark for the TPC-H dataset:
```shell
./bench.sh run tpch
```
or for the TPC-H dataset at scale factor 10:
```shell
./bench.sh run tpch10
```

To run a specific query, for example Q21:
```shell
./bench.sh run tpch10 21
```

## Select join algorithm
The benchmark runs with `prefer_hash_join == true` by default, which enforces the HASH join algorithm.
To run TPC-H benchmarks with a join algorithm other than HASH:
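The exact instructions are truncated in this hunk; as a hedged sketch — assuming `PREFER_HASH_JOIN` can be overridden from the environment, as the `${PREFER_HASH_JOIN}` references in `bench.sh` below suggest — a non-hash run might look like:

```shell
# Assumption: bench.sh honors a PREFER_HASH_JOIN environment override
PREFER_HASH_JOIN=false ./bench.sh run tpch
```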
9 changes: 6 additions & 3 deletions benchmarks/bench.sh
Expand Up @@ -43,7 +43,7 @@ usage() {
Orchestrates running benchmarks against DataFusion checkouts

Usage:
$0 data [benchmark]
$0 data [benchmark] [query]
$0 run [benchmark]
$0 compare <branch1> <branch2>
$0 venv
@@ -410,7 +410,9 @@ run_tpch() {
RESULTS_FILE="${RESULTS_DIR}/tpch_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch benchmark..."
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}"
# Optional query filter to run a specific query
QUERY=$([ -n "$ARG3" ] && echo "--query $ARG3" || echo "")
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" --format parquet -o "${RESULTS_FILE}" $QUERY
}

# Runs the tpch in memory
@@ -425,8 +427,9 @@ run_tpch_mem() {
RESULTS_FILE="${RESULTS_DIR}/tpch_mem_sf${SCALE_FACTOR}.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running tpch_mem benchmark..."
QUERY=$([ -n "$ARG3" ] && echo "--query $ARG3" || echo "")
# -m means in memory
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}"
$CARGO_COMMAND --bin tpch -- benchmark datafusion --iterations 5 --path "${TPCH_DIR}" --prefer_hash_join "${PREFER_HASH_JOIN}" -m --format parquet -o "${RESULTS_FILE}" $QUERY
}

# Runs the cancellation benchmark
6 changes: 3 additions & 3 deletions benchmarks/queries/clickbench/queries.sql
@@ -21,10 +21,10 @@ SELECT "UserID" FROM hits WHERE "UserID" = 435090932899640449;
SELECT COUNT(*) FROM hits WHERE "URL" LIKE '%google%';
SELECT "SearchPhrase", MIN("URL"), COUNT(*) AS c FROM hits WHERE "URL" LIKE '%google%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
SELECT "SearchPhrase", MIN("URL"), MIN("Title"), COUNT(*) AS c, COUNT(DISTINCT "UserID") FROM hits WHERE "Title" LIKE '%Google%' AND "URL" NOT LIKE '%.google.%' AND "SearchPhrase" <> '' GROUP BY "SearchPhrase" ORDER BY c DESC LIMIT 10;
SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime") LIMIT 10;
SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10;
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime" LIMIT 10;
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "SearchPhrase" LIMIT 10;
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY to_timestamp_seconds("EventTime"), "SearchPhrase" LIMIT 10;
SELECT "SearchPhrase" FROM hits WHERE "SearchPhrase" <> '' ORDER BY "EventTime", "SearchPhrase" LIMIT 10;
SELECT "CounterID", AVG(length("URL")) AS l, COUNT(*) AS c FROM hits WHERE "URL" <> '' GROUP BY "CounterID" HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT REGEXP_REPLACE("Referer", '^https?://(?:www\.)?([^/]+)/.*$', '\1') AS k, AVG(length("Referer")) AS l, COUNT(*) AS c, MIN("Referer") FROM hits WHERE "Referer" <> '' GROUP BY k HAVING COUNT(*) > 100000 ORDER BY l DESC LIMIT 25;
SELECT SUM("ResolutionWidth"), SUM("ResolutionWidth" + 1), SUM("ResolutionWidth" + 2), SUM("ResolutionWidth" + 3), SUM("ResolutionWidth" + 4), SUM("ResolutionWidth" + 5), SUM("ResolutionWidth" + 6), SUM("ResolutionWidth" + 7), SUM("ResolutionWidth" + 8), SUM("ResolutionWidth" + 9), SUM("ResolutionWidth" + 10), SUM("ResolutionWidth" + 11), SUM("ResolutionWidth" + 12), SUM("ResolutionWidth" + 13), SUM("ResolutionWidth" + 14), SUM("ResolutionWidth" + 15), SUM("ResolutionWidth" + 16), SUM("ResolutionWidth" + 17), SUM("ResolutionWidth" + 18), SUM("ResolutionWidth" + 19), SUM("ResolutionWidth" + 20), SUM("ResolutionWidth" + 21), SUM("ResolutionWidth" + 22), SUM("ResolutionWidth" + 23), SUM("ResolutionWidth" + 24), SUM("ResolutionWidth" + 25), SUM("ResolutionWidth" + 26), SUM("ResolutionWidth" + 27), SUM("ResolutionWidth" + 28), SUM("ResolutionWidth" + 29), SUM("ResolutionWidth" + 30), SUM("ResolutionWidth" + 31), SUM("ResolutionWidth" + 32), SUM("ResolutionWidth" + 33), SUM("ResolutionWidth" + 34), SUM("ResolutionWidth" + 35), SUM("ResolutionWidth" + 36), SUM("ResolutionWidth" + 37), SUM("ResolutionWidth" + 38), SUM("ResolutionWidth" + 39), SUM("ResolutionWidth" + 40), SUM("ResolutionWidth" + 41), SUM("ResolutionWidth" + 42), SUM("ResolutionWidth" + 43), SUM("ResolutionWidth" + 44), SUM("ResolutionWidth" + 45), SUM("ResolutionWidth" + 46), SUM("ResolutionWidth" + 47), SUM("ResolutionWidth" + 48), SUM("ResolutionWidth" + 49), SUM("ResolutionWidth" + 50), SUM("ResolutionWidth" + 51), SUM("ResolutionWidth" + 52), SUM("ResolutionWidth" + 53), SUM("ResolutionWidth" + 54), SUM("ResolutionWidth" + 55), SUM("ResolutionWidth" + 56), SUM("ResolutionWidth" + 57), SUM("ResolutionWidth" + 58), SUM("ResolutionWidth" + 59), SUM("ResolutionWidth" + 60), SUM("ResolutionWidth" + 61), SUM("ResolutionWidth" + 62), SUM("ResolutionWidth" + 63), SUM("ResolutionWidth" + 64), SUM("ResolutionWidth" + 65), SUM("ResolutionWidth" + 66), SUM("ResolutionWidth" + 67), SUM("ResolutionWidth" + 68), SUM("ResolutionWidth" + 69), SUM("ResolutionWidth" + 70), SUM("ResolutionWidth" + 71), SUM("ResolutionWidth" + 72), SUM("ResolutionWidth" + 73), SUM("ResolutionWidth" + 74), SUM("ResolutionWidth" + 75), SUM("ResolutionWidth" + 76), SUM("ResolutionWidth" + 77), SUM("ResolutionWidth" + 78), SUM("ResolutionWidth" + 79), SUM("ResolutionWidth" + 80), SUM("ResolutionWidth" + 81), SUM("ResolutionWidth" + 82), SUM("ResolutionWidth" + 83), SUM("ResolutionWidth" + 84), SUM("ResolutionWidth" + 85), SUM("ResolutionWidth" + 86), SUM("ResolutionWidth" + 87), SUM("ResolutionWidth" + 88), SUM("ResolutionWidth" + 89) FROM hits;
2 changes: 1 addition & 1 deletion benchmarks/src/clickbench.rs
@@ -129,9 +129,9 @@ impl RunOpt {
self.register_hits(&ctx).await?;

let iterations = self.common.iterations;
let mut millis = Vec::with_capacity(iterations);
let mut benchmark_run = BenchmarkRun::new();
for query_id in query_range {
let mut millis = Vec::with_capacity(iterations);
benchmark_run.start_new_case(&format!("Query {query_id}"));
let sql = queries.get_query(query_id)?;
println!("Q{query_id}: {sql}");
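The `clickbench.rs` hunk above moves the `millis` timing vector inside the per-query loop, so one query's measurements no longer leak into the next case's report. A minimal sketch of the corrected pattern, with hypothetical names and a stubbed-out query call:

```rust
use std::time::Instant;

// Hypothetical reduction of the fix: `millis` is re-created per query,
// so each case reports only its own iteration timings.
fn run_all(queries: &[&str], iterations: usize) {
    for (query_id, _sql) in queries.iter().enumerate() {
        let mut millis: Vec<u128> = Vec::with_capacity(iterations);
        for _ in 0..iterations {
            let start = Instant::now();
            // ... execute the query here ...
            millis.push(start.elapsed().as_millis());
        }
        println!("Q{query_id}: {millis:?} ms");
    }
}
```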
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.toml
Expand Up @@ -37,7 +37,7 @@ backtrace = ["datafusion/backtrace"]
[dependencies]
arrow = { workspace = true }
async-trait = { workspace = true }
aws-config = "1.6.0"
aws-config = "1.6.1"
aws-credential-types = "1.2.0"
clap = { version = "4.5.34", features = ["derive", "cargo"] }
datafusion = { workspace = true, features = [