Skip to content

Commit 23ffedd

Browse files
author
jatin
committed
merge main branch
2 parents 1342ef9 + caac6fb commit 23ffedd

File tree

4 files changed

+253
-2
lines changed

4 files changed

+253
-2
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,6 @@ SELECT kurtosis(col) FROM VALUES (1.0), (10.0), (100.0), (10.0), (1.0) as tab(co
9393
- [x] `mode(expression) -> scalar` - Returns the most frequent (mode) value from a column of data.
9494
- [x] `max_by(expression1, expression2) -> scalar` - Returns the value of `expression1` associated with the maximum value of `expression2`.
9595
- [x] `min_by(expression1, expression2) -> scalar` - Returns the value of `expression1` associated with the minimum value of `expression2`.
96+
- [x] `kurtosis_pop(expression) -> scalar` - Computes the excess kurtosis (Fisher’s definition) without bias correction.
97+
9698
- [x] `kurtosis(expression) -> scalar` - Returns the kurtosis, a measure of the `tailedness` of the distribution, for the given numeric values in `expression`.

src/kurtosis_pop.rs

Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
// Copied from `datafusion/functions-aggregate/src/kurtosis_pop.rs`
19+
// Originally authored by goldmedal
20+
21+
use arrow::array::{Array, ArrayRef, Float64Array, UInt64Array};
22+
use datafusion::arrow::datatypes::{DataType, Field};
23+
use datafusion::common::cast::as_float64_array;
24+
use datafusion::common::{downcast_value, DataFusionError, Result, ScalarValue};
25+
use datafusion::logical_expr::function::{AccumulatorArgs, StateFieldsArgs};
26+
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
27+
use std::any::Any;
28+
use std::fmt::Debug;
29+
30+
make_udaf_expr_and_func!(
31+
KurtosisPopFunction,
32+
kurtosis_pop,
33+
x,
34+
"Calculates the excess kurtosis (Fisher’s definition) without bias correction.",
35+
kurtosis_pop_udaf
36+
);
37+
38+
pub struct KurtosisPopFunction {
39+
signature: Signature,
40+
}
41+
42+
impl Debug for KurtosisPopFunction {
43+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44+
f.debug_struct("KurtosisPopFunction")
45+
.field("signature", &self.signature)
46+
.finish()
47+
}
48+
}
49+
50+
impl Default for KurtosisPopFunction {
51+
fn default() -> Self {
52+
Self::new()
53+
}
54+
}
55+
56+
impl KurtosisPopFunction {
57+
pub fn new() -> Self {
58+
Self {
59+
signature: Signature::coercible(vec![DataType::Float64], Volatility::Immutable),
60+
}
61+
}
62+
}
63+
64+
impl AggregateUDFImpl for KurtosisPopFunction {
65+
fn as_any(&self) -> &dyn Any {
66+
self
67+
}
68+
69+
fn name(&self) -> &str {
70+
"kurtosis_pop"
71+
}
72+
73+
fn signature(&self) -> &Signature {
74+
&self.signature
75+
}
76+
77+
fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
78+
Ok(DataType::Float64)
79+
}
80+
81+
fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<Field>> {
82+
Ok(vec![
83+
Field::new("count", DataType::UInt64, true),
84+
Field::new("sum", DataType::Float64, true),
85+
Field::new("sum_sqr", DataType::Float64, true),
86+
Field::new("sum_cub", DataType::Float64, true),
87+
Field::new("sum_four", DataType::Float64, true),
88+
])
89+
}
90+
91+
fn accumulator(&self, _acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
92+
Ok(Box::new(KurtosisPopAccumulator::new()))
93+
}
94+
}
95+
96+
/// Accumulator for calculating the excess kurtosis (Fisher’s definition) without bias correction.
97+
/// This implementation follows the [DuckDB implementation]:
98+
/// <https://github.com/duckdb/duckdb/blob/main/src/core_functions/aggregate/distributive/kurtosis.cpp>
99+
#[derive(Debug, Default)]
100+
pub struct KurtosisPopAccumulator {
101+
count: u64,
102+
sum: f64,
103+
sum_sqr: f64,
104+
sum_cub: f64,
105+
sum_four: f64,
106+
}
107+
108+
impl KurtosisPopAccumulator {
109+
pub fn new() -> Self {
110+
Self {
111+
count: 0,
112+
sum: 0.0,
113+
sum_sqr: 0.0,
114+
sum_cub: 0.0,
115+
sum_four: 0.0,
116+
}
117+
}
118+
}
119+
120+
impl Accumulator for KurtosisPopAccumulator {
121+
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
122+
let array = as_float64_array(&values[0])?;
123+
for value in array.iter().flatten() {
124+
self.count += 1;
125+
self.sum += value;
126+
self.sum_sqr += value.powi(2);
127+
self.sum_cub += value.powi(3);
128+
self.sum_four += value.powi(4);
129+
}
130+
Ok(())
131+
}
132+
133+
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
134+
let counts = downcast_value!(states[0], UInt64Array);
135+
let sums = downcast_value!(states[1], Float64Array);
136+
let sum_sqrs = downcast_value!(states[2], Float64Array);
137+
let sum_cubs = downcast_value!(states[3], Float64Array);
138+
let sum_fours = downcast_value!(states[4], Float64Array);
139+
140+
for i in 0..counts.len() {
141+
let c = counts.value(i);
142+
if c == 0 {
143+
continue;
144+
}
145+
self.count += c;
146+
self.sum += sums.value(i);
147+
self.sum_sqr += sum_sqrs.value(i);
148+
self.sum_cub += sum_cubs.value(i);
149+
self.sum_four += sum_fours.value(i);
150+
}
151+
152+
Ok(())
153+
}
154+
155+
fn evaluate(&mut self) -> Result<ScalarValue> {
156+
if self.count < 1 {
157+
return Ok(ScalarValue::Float64(None));
158+
}
159+
160+
let count_64 = 1_f64 / self.count as f64;
161+
let m4 = count_64
162+
* (self.sum_four - 4.0 * self.sum_cub * self.sum * count_64
163+
+ 6.0 * self.sum_sqr * self.sum.powi(2) * count_64.powi(2)
164+
- 3.0 * self.sum.powi(4) * count_64.powi(3));
165+
166+
let m2 = (self.sum_sqr - self.sum.powi(2) * count_64) * count_64;
167+
if m2 <= 0.0 {
168+
return Ok(ScalarValue::Float64(None));
169+
}
170+
171+
let target = m4 / (m2.powi(2)) - 3.0;
172+
Ok(ScalarValue::Float64(Some(target)))
173+
}
174+
175+
fn size(&self) -> usize {
176+
std::mem::size_of_val(self)
177+
}
178+
179+
fn state(&mut self) -> Result<Vec<ScalarValue>> {
180+
Ok(vec![
181+
ScalarValue::from(self.count),
182+
ScalarValue::from(self.sum),
183+
ScalarValue::from(self.sum_sqr),
184+
ScalarValue::from(self.sum_cub),
185+
ScalarValue::from(self.sum_four),
186+
])
187+
}
188+
}

src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,12 @@ use datafusion::logical_expr::AggregateUDF;
2727
pub mod macros;
2828
pub mod common;
2929
pub mod kurtosis;
30+
pub mod kurtosis_pop;
3031
pub mod max_min_by;
3132
pub mod mode;
32-
3333
pub mod expr_extra_fn {
3434
pub use super::kurtosis::kurtosis;
35+
pub use super::kurtosis_pop::kurtosis_pop;
3536
pub use super::max_min_by::max_by;
3637
pub use super::max_min_by::min_by;
3738
pub use super::mode::mode;
@@ -43,6 +44,7 @@ pub fn all_extra_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
4344
max_min_by::max_by_udaf(),
4445
max_min_by::min_by_udaf(),
4546
kurtosis::kurtosis_udaf(),
47+
kurtosis_pop::kurtosis_pop_udaf(),
4648
]
4749
}
4850

tests/main.rs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ async fn test_mode_utf8() {
5050
- +---------------------------+
5151
"###);
5252
}
53-
5453
#[tokio::test]
5554
async fn test_mode_int64() {
5655
let mut execution = TestExecution::new().await.unwrap().with_setup(TEST_TABLE).await;
@@ -252,6 +251,66 @@ async fn test_max_by_and_min_by() {
252251
"###);
253252
}
254253

254+
#[tokio::test]
255+
async fn test_kurtosis_pop() {
256+
let mut execution = TestExecution::new().await.unwrap().with_setup(TEST_TABLE).await;
257+
258+
// Test with int64
259+
let actual = execution
260+
.run_and_format("SELECT kurtosis_pop(int64_col) FROM test_table")
261+
.await;
262+
263+
insta::assert_yaml_snapshot!(actual, @r###"
264+
- +------------------------------------+
265+
- "| kurtosis_pop(test_table.int64_col) |"
266+
- +------------------------------------+
267+
- "| -0.9599999999999755 |"
268+
- +------------------------------------+
269+
"###);
270+
271+
// Test with float64
272+
let actual = execution
273+
.run_and_format("SELECT kurtosis_pop(float64_col) FROM test_table")
274+
.await;
275+
276+
insta::assert_yaml_snapshot!(actual, @r###"
277+
- +--------------------------------------+
278+
- "| kurtosis_pop(test_table.float64_col) |"
279+
- +--------------------------------------+
280+
- "| -0.9599999999999755 |"
281+
- +--------------------------------------+
282+
"###);
283+
284+
let actual = execution
285+
.run_and_format("SELECT kurtosis_pop(col) FROM VALUES (1.0) as tab(col)")
286+
.await;
287+
insta::assert_yaml_snapshot!(actual, @r###"
288+
- +-----------------------+
289+
- "| kurtosis_pop(tab.col) |"
290+
- +-----------------------+
291+
- "| |"
292+
- +-----------------------+
293+
"###);
294+
295+
let actual = execution.run_and_format("SELECT kurtosis_pop(1.0)").await;
296+
insta::assert_yaml_snapshot!(actual, @r###"
297+
- +--------------------------+
298+
- "| kurtosis_pop(Float64(1)) |"
299+
- +--------------------------+
300+
- "| |"
301+
- +--------------------------+
302+
"###);
303+
304+
let actual = execution.run_and_format("SELECT kurtosis_pop(null)").await;
305+
insta::assert_yaml_snapshot!(actual, @r###"
306+
- +--------------------+
307+
- "| kurtosis_pop(NULL) |"
308+
- +--------------------+
309+
- "| |"
310+
- +--------------------+
311+
"###);
312+
}
313+
255314
#[tokio::test]
256315
async fn test_kurtosis() {
257316
let mut execution = TestExecution::new().await.unwrap();

0 commit comments

Comments
 (0)