Prototype combined Repartition/Filter + Coalesce (WIP) #11647
Closed
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Builds on
CoalesceBatchesStream
to a struct #11610BatchCoalescer
to its own module #12047Which issue does this PR close?
Related to #7957 and #11628
Rationale for this change
As described on #7957 and #11628 the current combination of filtering / repartition followed by coalesce requires copying the data twice. This PR is a prototype to:
This is based on the code in #11610 and a bunch of discussion with @XiangpengHao @edmondop @2010YOUY01 and others
Plan
The theory is there is non trivial time spent in coalesce batches and repartitioning that we could improve performance by several seconds (almost 1s of CPU in several queries query) --- see analysis below
My high level plan is to implement enough of this idea to run some ClickBench queries like Q20 Q15 and Q16 and TPCH Q8 and see. If the results are promising, I will work to scope out how to make this into real PRs
High level plan:
FilterExec
control flowRepartitionExec
take
)Supporting Anaylsis
Details
Clickbench Q16
This query has no filter in this query
CoalesceBathces takes 92ms of the time which the theory is we can totally avoid. Repartition take 178ms
$ datafusion-cli -c 'explain analyze SELECT "UserID", COUNT(*) FROM "hits.parquet" GROUP BY "UserID" ORDER BY COUNT(*) DESC LIMIT 10;' DataFusion CLI v40.0.0 +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Plan with Metrics | GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=10, elapsed_compute=13.583µs] | | | SortPreservingMergeExec: [count(*)@1 DESC], fetch=10, metrics=[output_rows=10, elapsed_compute=3.5µs] | | | SortExec: TopK(fetch=10), expr=[count(*)@1 DESC], preserve_partitioning=[true], metrics=[output_rows=160, elapsed_compute=145.487854ms, row_replacements=1570] | | | AggregateExec: mode=FinalPartitioned, gby=[UserID@0 as UserID], aggr=[count(*)], metrics=[output_rows=17630976, elapsed_compute=1.581519786s] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=21164213, elapsed_compute=92.710954ms] | | | RepartitionExec: partitioning=Hash([UserID@0], 16), input_partitions=16, metrics=[send_time=483.37239ms, repart_time=178.203635ms, fetch_time=3.187560018s] | | | AggregateExec: mode=Partial, gby=[UserID@0 as UserID], aggr=[count(*)], metrics=[output_rows=21164213, elapsed_compute=2.415851718s] | | | ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:0..923748528], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:923748528..1847497056], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:3694994112..4618742640], ...]}, projection=[UserID], metrics=[output_rows=99997497, elapsed_compute=16ns, row_groups_pruned_bloom_filter=0, num_predicate_creation_errors=0, bytes_scanned=270116234, row_groups_matched_statistics=0, row_groups_matched_bloom_filter=0, predicate_evaluation_errors=0, row_groups_pruned_statistics=0, file_open_errors=0, pushdown_rows_filtered=0, file_scan_errors=0, page_index_rows_filtered=0, time_elapsed_opening=321.212666ms, page_index_eval_time=32ns, time_elapsed_scanning_total=2.856808586s, time_elapsed_scanning_until_data=15.897751ms, pushdown_eval_time=32ns, time_elapsed_processing=581.233243ms] | | | | +-------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.412 seconds.
Under the covers repartition eventually simply calls take and the internal implementation of take uses a mutable buffer
So in theory we could make a FilteredArrayBuilder or something that was able to
take one or more arrays, and copy those rows which match into an inprogress
output array. This would avoid the need to copy the data twice.
ClickBenchmark Q15 (predicate)
Q15 should be one of the bast cases to show this improvement, as the effect of copying should be especially pronounced (filtering / repartitioning strings)
Well here elapsed compute in coalesce batches is non trivial (both for filter and repartition)
$ datafusion-cli -c "explain analyze SELECT \"SearchEngineID\", \"SearchPhrase\", COUNT(*) AS c FROM \"hits.parquet\" WHERE \"SearchPhrase\" <> '' GROUP BY \"SearchEngineID\", \"SearchPhrase\" ORDER BY c DESC LIMIT 10;" DataFusion CLI v40.0.0 +-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | Plan with Metrics | GlobalLimitExec: skip=0, fetch=10, metrics=[output_rows=10, elapsed_compute=22µs] | | | SortPreservingMergeExec: [c@2 DESC], fetch=10, metrics=[output_rows=10, elapsed_compute=5µs] | | | SortExec: TopK(fetch=10), expr=[c@2 DESC], preserve_partitioning=[true], metrics=[output_rows=160, elapsed_compute=48.584286ms, row_replacements=738] | | | ProjectionExec: expr=[SearchEngineID@0 as SearchEngineID, SearchPhrase@1 as SearchPhrase, count(*)@2 as c], metrics=[output_rows=6474212, elapsed_compute=123.825µs] | | | AggregateExec: mode=FinalPartitioned, gby=[SearchEngineID@0 as SearchEngineID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)], metrics=[output_rows=6474212, elapsed_compute=1.962629061s] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=7559975, elapsed_compute=113.930353ms] | | | RepartitionExec: partitioning=Hash([SearchEngineID@0, SearchPhrase@1], 16), input_partitions=16, metrics=[send_time=211.810042ms, fetch_time=6.817337071s, repart_time=352.1063ms] | | | AggregateExec: mode=Partial, gby=[SearchEngineID@0 as SearchEngineID, SearchPhrase@1 as SearchPhrase], aggr=[count(*)], metrics=[output_rows=7559975, elapsed_compute=2.744048082s] | | | CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=13172392, elapsed_compute=91.456825ms] | | | FilterExec: SearchPhrase@1 != , metrics=[output_rows=13172392, elapsed_compute=656.500384ms] | | | ParquetExec: file_groups={16 groups: [[Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:0..923748528], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:923748528..1847497056], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:1847497056..2771245584], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:2771245584..3694994112], [Users/andrewlamb/Software/datafusion/benchmarks/data/hits.parquet:3694994112..4618742640], ...]}, projection=[SearchEngineID, SearchPhrase], predicate=SearchPhrase@39 != , pruning_predicate=CASE WHEN SearchPhrase_null_count@2 = SearchPhrase_row_count@3 THEN false ELSE SearchPhrase_min@0 != OR != SearchPhrase_max@1 END, required_guarantees=[SearchPhrase not in ()], metrics=[output_rows=99997497, elapsed_compute=16ns, file_open_errors=0, row_groups_matched_statistics=226, file_scan_errors=0, page_index_rows_filtered=0, pushdown_rows_filtered=0, row_groups_pruned_statistics=0, bytes_scanned=391794592, predicate_evaluation_errors=0, num_predicate_creation_errors=0, row_groups_matched_bloom_filter=0, row_groups_pruned_bloom_filter=0, time_elapsed_processing=2.725566865s, page_index_eval_time=685ns, time_elapsed_scanning_total=6.044856135s, time_elapsed_opening=373.773501ms, time_elapsed_scanning_until_data=52.434752ms, pushdown_eval_time=32ns] | | | | +-------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.757 seconds.
TPCH Q1 (thanks @2010YOUY01)
Query run:
And indeed CoalesceBathesExec is consuming 332.124162ms (filter) + elapsed_compute=1.997875ms (repartition)
The types are:
So to make this one faster we would have to support several types (decomal, utf8 and date32)