[SPARK-47836][SQL] Use doubles sketch to replace the GK algorithm for approximate quantile computation, significantly improving merge performance #52701
What changes were proposed in this pull request?
Use the Apache DataSketches quantiles (doubles) sketch in place of Spark's default GK algorithm to speed up ApproximatePercentile.
https://datasketches.apache.org/
https://github.com/apache/datasketches-java
I found that Spark already uses DataSketches elsewhere, so why not also back approximate quantiles with DataSketches?
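
To show why sketch merges are cheap, here is a minimal, self-contained example (runnable in spark-shell or any Scala REPL with datasketches-java on the classpath). It uses the KLL doubles sketch purely for illustration, because it exposes merge directly on the sketch object; the PR itself wires the classic quantiles doubles sketch into ApproximatePercentile, and the exact method names should be checked against the datasketches-java version in use.

import org.apache.datasketches.kll.KllDoublesSketch

// Each "partition" builds its own fixed-size sketch independently.
val left = KllDoublesSketch.newHeapInstance()
(1 to 500000).foreach(i => left.update(i.toDouble))

val right = KllDoublesSketch.newHeapInstance()
(500001 to 1000000).foreach(i => right.update(i.toDouble))

// Merging combines two small summaries; the cost depends on the sketch size,
// not on how many values were fed into either side.
left.merge(right)

println(left.getQuantile(0.5))   // roughly 500000
println(left.getQuantile(0.99))  // roughly 990000
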
Why are the changes needed?
https://issues.apache.org/jira/browse/SPARK-47836

https://issues.apache.org/jira/browse/SPARK-46706
https://issues.apache.org/jira/browse/SPARK-40499
Multiple issues have reported ApproximatePercentile performance problems in Spark 3.x, which were introduced by this bug fix: https://issues.apache.org/jira/browse/SPARK-29336
The performance problem exists because the GK algorithm was not designed for distributed systems: its merge performance is poor, so higher upstream stage parallelism leads to worse performance.
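
To make the merge cost concrete, a rough single-threaded micro-benchmark like the following can be run in spark-shell against Spark's internal GK implementation. Note that org.apache.spark.sql.catalyst.util.QuantileSummaries is an internal API; the constructor and method names below are taken from the Spark 3.x source, so treat this as an illustration of where the time goes, not a supported interface.

import org.apache.spark.sql.catalyst.util.QuantileSummaries

// Build one compressed GK summary per simulated upstream task.
val partitions = 500
val perPartition = 10000
val summaries = (0 until partitions).map { p =>
  var s = new QuantileSummaries(QuantileSummaries.defaultCompressThreshold, 0.001)
  (0 until perPartition).foreach(i => s = s.insert((p * perPartition + i).toDouble))
  s.compress()
}

// The final aggregation stage merges one partial state per shuffle partition;
// this loop mimics that step, and its cost grows quickly with `partitions`.
val start = System.nanoTime()
val merged = summaries.reduce((a, b) => a.merge(b))
println(s"merged $partitions summaries in ${(System.nanoTime() - start) / 1e9} s, p50 = ${merged.query(0.5)}")
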
Take one of our production Spark jobs as an example: it reads 60 billion records as source input, samples with ratio 0.06, groups by a key (the key has 4 distinct values), and then computes the 1st to 100th percentiles with accuracy 999 for 40 columns, with spark.sql.shuffle.partitions=2000 and executors of 28g memory and 6 cores.

Run with spark-2.4.3, the final merge stage takes 5 min.
Run with spark-3.5.2, the final merge stage takes 2.8 h.
After adjusting spark.sql.shuffle.partitions to 500:

Run with spark-3.5.2, the final merge stage takes 11 min, but because the data volume is large, the upstream stage takes much longer and more data spills to disk.
When using the DataSketches quantiles sketch:

Run with spark-3.5.2, the final merge stage takes less than 1 min with spark.sql.shuffle.partitions=2000.
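
For reference, the aggregation in that job has roughly the following shape (the table and column names here are invented for illustration; the real job computes 100 percentiles for each of 40 columns per group):

// Hypothetical reconstruction of the job's aggregation query.
val quantiles = (1 to 100).map(_ / 100.0).mkString("ARRAY(", ",", ")")
val percentileExprs = (1 to 40)
  .map(c => s"PERCENTILE_APPROX(CAST(col_$c AS DOUBLE), $quantiles, 999) AS col_${c}_pcts")
  .mkString(",\n  ")
val query = s"SELECT group_key,\n  $percentileExprs\nFROM sampled_source\nGROUP BY group_key"
// Each upstream task builds partial percentile state per group and column; the
// final stage then merges those partial states across all shuffle partitions,
// which is where the 2.8 h vs. <1 min difference shows up.
// spark.sql(query)
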
Does this PR introduce any user-facing change?
No
How was this patch tested?
// Run in spark-shell. Accuracy check: 100 quantiles over the values 1..100 with accuracy 90.
val values = (1 to 100).toArray
val percents = (1 to 100).toArray
val all_quantiles = percents.indices.map(i => (i + 1).toDouble / percents.length).toArray
val all_quantiles_str = s"ARRAY(${all_quantiles.mkString(",")})"
for (n <- 0 until 5) {
  val df = spark.sparkContext.makeRDD(values).toDF("value").repartition(5)
  df.createOrReplaceTempView("data_table")
  val sql = s"SELECT PERCENTILE_APPROX(CAST(value AS DOUBLE), $all_quantiles_str, 90) AS values FROM data_table"
  // The query returns a single row holding the array of approximate percentiles.
  val all_answers = spark.sql(sql).collect().head.getSeq[Double](0)
  // Rank of each returned percentile within the sorted input (0-based).
  val all_answered_ranks = all_answers.map(ans => values.indexOf(ans.toInt)).toArray
  val error = all_answered_ranks.zipWithIndex.map { case (answer, expected) => Math.abs(expected - answer) }
  val max_error = error.max
  println(max_error)
}
With the test code above, the max_error is always 1, which is better than expected.
// Accuracy check: 100 quantiles over the values 1..10000 with accuracy 9999.
val values = (1 to 10000).toArray
val percents = (1 to 100).toArray
val all_quantiles = percents.indices.map(i => (i + 1).toDouble / percents.length).toArray
val all_quantiles_str = s"ARRAY(${all_quantiles.mkString(",")})"
for (n <- 0 until 5) {
  val df = spark.sparkContext.makeRDD(values).toDF("value").repartition(5)
  df.createOrReplaceTempView("data_table")
  val sql = s"SELECT PERCENTILE_APPROX(CAST(value AS DOUBLE), $all_quantiles_str, 9999) AS values FROM data_table"
  val all_answers = spark.sql(sql).collect().head.getSeq[Double](0)
  val all_answered_ranks = all_answers.map(ans => values.indexOf(ans.toInt)).toArray
  // The exact value of the (i+1)-th percentile of 1..10000 sits at rank (i+1)*100;
  // `answer` is a 0-based index, so an exact result shows up as error 1.
  val error = all_answered_ranks.zipWithIndex.map { case (answer, expected) => Math.abs((expected + 1) * 100 - answer) }
  val max_error = error.max
  println(max_error)
}
With the test code above, the max_error is always 1, which is as expected.
Also tested with the user's production job described above as a performance check.
Was this patch authored or co-authored using generative AI tooling?
No