Skip to content

Commit ed24a8c

Browse files
author
jatin
committed
merge main branch
2 parents 33fbfd4 + 94f2a11 commit ed24a8c

File tree

4 files changed

+227
-0
lines changed

4 files changed

+227
-0
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,5 +84,6 @@ SELECT min_by(x, y) FROM VALUES (1, 10), (2, 5), (3, 15), (4, 8) as tab(x, y);
8484
- [x] `mode(expression) -> scalar` - Returns the most frequent (mode) value from a column of data.
8585
- [x] `max_by(expression1, expression2) -> scalar` - Returns the value of `expression1` associated with the maximum value of `expression2`.
8686
- [x] `min_by(expression1, expression2) -> scalar` - Returns the value of `expression1` associated with the minimum value of `expression2`.
87+
- [x] `skewness(expression) -> scalar` - Computes the skewness value for `expression`.
8788
- [x] `kurtois_pop(expression) -> scalar` - Computes the excess kurtosis (Fisher’s definition) without bias correction.
8889
- [x] `kurtosis(expression) -> scalar` - Computes the excess kurtosis (Fisher’s definition) with bias correction according to the sample size.

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ pub mod kurtosis;
3030
pub mod kurtosis_pop;
3131
pub mod max_min_by;
3232
pub mod mode;
33+
pub mod skewness;
3334
pub mod expr_extra_fn {
3435
pub use super::kurtosis::kurtosis;
3536
pub use super::kurtosis_pop::kurtosis_pop;
3637
pub use super::max_min_by::max_by;
3738
pub use super::max_min_by::min_by;
3839
pub use super::mode::mode;
40+
pub use super::skewness::skewness;
3941
}
4042

4143
pub fn all_extra_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
@@ -44,6 +46,7 @@ pub fn all_extra_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
4446
max_min_by::max_by_udaf(),
4547
max_min_by::min_by_udaf(),
4648
kurtosis::kurtosis_udaf(),
49+
skewness::skewness_udaf(),
4750
kurtosis_pop::kurtosis_pop_udaf(),
4851
]
4952
}

src/skewness.rs

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow::array::{ArrayRef, AsArray};
19+
use arrow::datatypes::{Float64Type, UInt64Type};
20+
use datafusion::arrow::datatypes::{DataType, Field};
21+
use datafusion::common::ScalarValue;
22+
use datafusion::logical_expr::{function::AccumulatorArgs, function::StateFieldsArgs};
23+
use datafusion::logical_expr::{Accumulator, AggregateUDFImpl, Signature, Volatility};
24+
use std::any::Any;
25+
use std::fmt::Debug;
26+
use std::ops::{Div, Mul, Sub};
27+
28+
make_udaf_expr_and_func!(SkewnessFunc, skewness, x, "Computes the skewness value.", skewness_udaf);
29+
30+
pub struct SkewnessFunc {
31+
name: String,
32+
signature: Signature,
33+
}
34+
35+
impl Debug for SkewnessFunc {
36+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37+
f.debug_struct("SkewnessFunc")
38+
.field("signature", &self.signature)
39+
.finish()
40+
}
41+
}
42+
43+
impl Default for SkewnessFunc {
44+
fn default() -> Self {
45+
Self::new()
46+
}
47+
}
48+
49+
impl SkewnessFunc {
50+
pub fn new() -> Self {
51+
Self {
52+
name: "skewness".to_string(),
53+
signature: Signature::coercible(vec![DataType::Float64], Volatility::Immutable),
54+
}
55+
}
56+
}
57+
58+
impl AggregateUDFImpl for SkewnessFunc {
59+
fn as_any(&self) -> &dyn Any {
60+
self
61+
}
62+
fn name(&self) -> &str {
63+
&self.name
64+
}
65+
66+
fn signature(&self) -> &Signature {
67+
&self.signature
68+
}
69+
70+
fn return_type(&self, _arg_types: &[DataType]) -> datafusion::common::Result<DataType> {
71+
Ok(DataType::Float64)
72+
}
73+
74+
fn accumulator(&self, _acc_args: AccumulatorArgs) -> datafusion::common::Result<Box<dyn Accumulator>> {
75+
Ok(Box::new(SkewnessAccumulator::new()))
76+
}
77+
78+
fn state_fields(&self, _args: StateFieldsArgs) -> datafusion::common::Result<Vec<Field>> {
79+
Ok(vec![
80+
Field::new("count", DataType::UInt64, true),
81+
Field::new("sum", DataType::Float64, true),
82+
Field::new("sum_sqr", DataType::Float64, true),
83+
Field::new("sum_cub", DataType::Float64, true),
84+
])
85+
}
86+
}
87+
88+
/// Accumulator for calculating the skewness
89+
/// This implementation follows the DuckDB implementation:
90+
/// <https://github.com/duckdb/duckdb/blob/main/src/core_functions/aggregate/distributive/skew.cpp>
91+
#[derive(Debug)]
92+
pub struct SkewnessAccumulator {
93+
count: u64,
94+
sum: f64,
95+
sum_sqr: f64,
96+
sum_cub: f64,
97+
}
98+
99+
impl SkewnessAccumulator {
100+
fn new() -> Self {
101+
Self {
102+
count: 0,
103+
sum: 0f64,
104+
sum_sqr: 0f64,
105+
sum_cub: 0f64,
106+
}
107+
}
108+
}
109+
110+
impl Accumulator for SkewnessAccumulator {
111+
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion::common::Result<()> {
112+
let array = values[0].as_primitive::<Float64Type>();
113+
for val in array.iter().flatten() {
114+
self.count += 1;
115+
self.sum += val;
116+
self.sum_sqr += val.powi(2);
117+
self.sum_cub += val.powi(3);
118+
}
119+
Ok(())
120+
}
121+
fn evaluate(&mut self) -> datafusion::common::Result<ScalarValue> {
122+
if self.count <= 2 {
123+
return Ok(ScalarValue::Float64(None));
124+
}
125+
let count = self.count as f64;
126+
let t1 = 1f64 / count;
127+
let p = (t1 * (self.sum_sqr - self.sum * self.sum * t1)).powi(3).max(0f64);
128+
let div = p.sqrt();
129+
if div == 0f64 {
130+
return Ok(ScalarValue::Float64(None));
131+
}
132+
let t2 = count.mul(count.sub(1f64)).sqrt().div(count.sub(2f64));
133+
let res =
134+
t2 * t1 * (self.sum_cub - 3f64 * self.sum_sqr * self.sum * t1 + 2f64 * self.sum.powi(3) * t1 * t1) / div;
135+
Ok(ScalarValue::Float64(Some(res)))
136+
}
137+
138+
fn size(&self) -> usize {
139+
std::mem::size_of_val(self)
140+
}
141+
142+
fn state(&mut self) -> datafusion::common::Result<Vec<ScalarValue>> {
143+
Ok(vec![
144+
ScalarValue::from(self.count),
145+
ScalarValue::from(self.sum),
146+
ScalarValue::from(self.sum_sqr),
147+
ScalarValue::from(self.sum_cub),
148+
])
149+
}
150+
151+
fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion::common::Result<()> {
152+
let counts = states[0].as_primitive::<UInt64Type>();
153+
let sums = states[1].as_primitive::<Float64Type>();
154+
let sum_sqrs = states[2].as_primitive::<Float64Type>();
155+
let sum_cubs = states[3].as_primitive::<Float64Type>();
156+
157+
for i in 0..counts.len() {
158+
let c = counts.value(i);
159+
if c == 0 {
160+
continue;
161+
}
162+
self.count += c;
163+
self.sum += sums.value(i);
164+
self.sum_sqr += sum_sqrs.value(i);
165+
self.sum_cub += sum_cubs.value(i);
166+
}
167+
Ok(())
168+
}
169+
}

tests/main.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,60 @@ async fn test_kurtosis_pop() {
311311
"###);
312312
}
313313

314+
#[tokio::test]
315+
async fn test_skewness() {
316+
let mut execution = TestExecution::new().await.unwrap().with_setup(TEST_TABLE).await;
317+
318+
// Test with int64
319+
let actual = execution
320+
.run_and_format("SELECT skewness(int64_col) FROM test_table")
321+
.await;
322+
323+
insta::assert_yaml_snapshot!(actual, @r###"
324+
- +--------------------------------+
325+
- "| skewness(test_table.int64_col) |"
326+
- +--------------------------------+
327+
- "| -0.8573214099741201 |"
328+
- +--------------------------------+
329+
"###);
330+
331+
// Test with float64
332+
let actual = execution
333+
.run_and_format("SELECT skewness(float64_col) FROM test_table")
334+
.await;
335+
336+
insta::assert_yaml_snapshot!(actual, @r###"
337+
- +----------------------------------+
338+
- "| skewness(test_table.float64_col) |"
339+
- +----------------------------------+
340+
- "| -0.8573214099741201 |"
341+
- +----------------------------------+
342+
"###);
343+
344+
// Test with single value
345+
let actual = execution.run_and_format("SELECT skewness(1.0)").await;
346+
347+
insta::assert_yaml_snapshot!(actual, @r###"
348+
- +----------------------+
349+
- "| skewness(Float64(1)) |"
350+
- +----------------------+
351+
- "| |"
352+
- +----------------------+
353+
"###);
354+
355+
let actual = execution
356+
.run_and_format("SELECT skewness(col) FROM VALUES (1.0), (2.0) as tab(col)")
357+
.await;
358+
359+
insta::assert_yaml_snapshot!(actual, @r###"
360+
- +-------------------+
361+
- "| skewness(tab.col) |"
362+
- +-------------------+
363+
- "| |"
364+
- +-------------------+
365+
"###);
366+
}
367+
314368
#[tokio::test]
315369
async fn test_kurtosis() {
316370
let mut execution = TestExecution::new().await.unwrap();

0 commit comments

Comments
 (0)