Skip to content
133 changes: 112 additions & 21 deletions datafusion/functions/src/datetime/date_bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
// under the License.

use std::any::Any;
use std::ops::Sub;
use std::sync::Arc;

use arrow::array::temporal_conversions::NANOSECONDS;
use arrow::array::timezone::Tz;
use arrow::array::types::{
ArrowTimestampType, IntervalDayTimeType, IntervalMonthDayNanoType,
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
Expand All @@ -38,7 +40,7 @@ use datafusion_expr::{
ColumnarValue, ScalarUDFImpl, Signature, Volatility, TIMEZONE_WILDCARD,
};

use chrono::{DateTime, Datelike, Duration, Months, TimeDelta, Utc};
use chrono::{DateTime, Datelike, Duration, Months, Offset, TimeDelta, TimeZone, Utc};

#[derive(Debug)]
pub struct DateBinFunc {
Expand Down Expand Up @@ -341,39 +343,67 @@ fn date_bin_impl(
Millisecond => NANOSECONDS / 1_000,
Second => NANOSECONDS,
};

move |x: i64| stride_fn(stride, x * scale, origin) / scale
}

Ok(match array {
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampNanosecondType>(origin, stride, stride_fn);

// TODO chunchun: maybe simplify this part
let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};

ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampMicrosecondType>(origin, stride, stride_fn);

let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};

ColumnarValue::Scalar(ScalarValue::TimestampMicrosecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampMillisecondType>(origin, stride, stride_fn);
let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};
ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
ColumnarValue::Scalar(ScalarValue::TimestampSecond(v, tz_opt)) => {
let apply_stride_fn =
stride_map_fn::<TimestampSecondType>(origin, stride, stride_fn);

let date_binned_ts = v.map(apply_stride_fn);
let adjusted_ts = match (date_binned_ts, tz_opt) {
(Some(ts), Some(tz)) => Some(adjust_to_local_time(ts, tz)),
(_, _) => date_binned_ts,
};

ColumnarValue::Scalar(ScalarValue::TimestampSecond(
v.map(apply_stride_fn),
adjusted_ts,
tz_opt.clone(),
))
}
Expand All @@ -392,7 +422,13 @@ fn date_bin_impl(
let array = as_primitive_array::<T>(array)?;
let apply_stride_fn = stride_map_fn::<T>(origin, stride, stride_fn);
let array: PrimitiveArray<T> = array
.unary(apply_stride_fn)
.unary(|ts| {
let date_binned_ts = apply_stride_fn(ts);
match tz_opt {
Some(tz) => adjust_to_local_time(date_binned_ts, tz),
None => date_binned_ts,
}
})
.with_timezone_opt(tz_opt.clone());

Ok(ColumnarValue::Array(Arc::new(array)))
Expand Down Expand Up @@ -435,6 +471,23 @@ fn date_bin_impl(
})
}

// TODO chunchun: add description
fn adjust_to_local_time(ts: i64, timezone: &str) -> i64 {
let tz: Tz = timezone.parse().unwrap();

let date_time = DateTime::from_timestamp_nanos(ts).naive_utc();

let offset_seconds: i64 = tz
.offset_from_utc_datetime(&date_time)
.fix()
.local_minus_utc() as i64;

let adjusted_date_time =
date_time.sub(TimeDelta::try_seconds(offset_seconds).unwrap());

adjusted_date_time.and_utc().timestamp_nanos_opt().unwrap()
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down Expand Up @@ -673,6 +726,42 @@ mod tests {
"2020-09-08T00:00:00Z",
],
),
(
vec![
"2024-03-31T00:30:00Z",
"2024-03-31T01:00:00Z",
"2024-03-31T04:20:00Z",
"2024-04-01T04:20:00Z",
"2024-05-01T00:30:00Z",
],
Some("Europe/Brussels".into()),
"1970-01-01T00:00:00Z",
vec![
"2024-03-31T00:00:00+01:00",
"2024-03-31T00:00:00+01:00",
"2024-03-31T00:00:00+01:00",
"2024-04-01T00:00:00+02:00",
"2024-05-01T00:00:00+02:00",
],
),
(
vec![
"2024-03-31T00:30:00",
"2024-03-31T01:00:00",
"2024-03-31T04:20:00",
"2024-04-01T04:20:00",
"2024-05-01T00:30:00",
],
Some("Europe/Brussels".into()),
"1970-01-01T00:00:00Z",
vec![
"2024-03-31T00:00:00+01:00",
"2024-03-31T00:00:00+01:00",
"2024-03-31T00:00:00+01:00",
"2024-04-01T00:00:00+02:00",
"2024-05-01T00:00:00+02:00",
],
),
(
vec![
"2020-09-08T00:00:00Z",
Expand All @@ -684,11 +773,11 @@ mod tests {
Some("-02".into()),
"1970-01-01T00:00:00Z",
vec![
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00Z",
"2020-09-08T00:00:00-02:00",
"2020-09-08T00:00:00-02:00",
"2020-09-08T00:00:00-02:00",
"2020-09-08T00:00:00-02:00",
"2020-09-08T00:00:00-02:00",
],
),
(
Expand All @@ -702,11 +791,11 @@ mod tests {
Some("+05".into()),
"1970-01-01T00:00:00+05",
vec![
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-08T00:00:00+05",
"2020-09-07T19:00:00+05:00",
"2020-09-07T19:00:00+05:00",
"2020-09-07T19:00:00+05:00",
"2020-09-07T19:00:00+05:00",
"2020-09-07T19:00:00+05:00",
Comment on lines -705 to +798
Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not right.. this is a change in behavior

],
),
(
Expand All @@ -720,11 +809,11 @@ mod tests {
Some("+08".into()),
"1970-01-01T00:00:00+08",
vec![
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-08T00:00:00+08",
"2020-09-07T16:00:00+08:00",
"2020-09-07T16:00:00+08:00",
"2020-09-07T16:00:00+08:00",
"2020-09-07T16:00:00+08:00",
"2020-09-07T16:00:00+08:00",
],
),
];
Expand Down Expand Up @@ -767,6 +856,8 @@ mod tests {
});
}

// TODO chunchun: may need to add test for single: nano, milli, macro, sec, ...

#[test]
fn test_date_bin_single() {
let cases = vec![
Expand Down
12 changes: 11 additions & 1 deletion datafusion/functions/src/datetime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub mod make_date;
pub mod now;
pub mod to_char;
pub mod to_date;
pub mod to_local_time;
pub mod to_timestamp;
pub mod to_unixtime;

Expand All @@ -50,6 +51,7 @@ make_udf_function!(
make_udf_function!(now::NowFunc, NOW, now);
make_udf_function!(to_char::ToCharFunc, TO_CHAR, to_char);
make_udf_function!(to_date::ToDateFunc, TO_DATE, to_date);
make_udf_function!(to_local_time::ToLocalTimeFunc, TO_LOCAL_TIME, to_local_time);
make_udf_function!(to_unixtime::ToUnixtimeFunc, TO_UNIXTIME, to_unixtime);
make_udf_function!(to_timestamp::ToTimestampFunc, TO_TIMESTAMP, to_timestamp);
make_udf_function!(
Expand Down Expand Up @@ -108,7 +110,14 @@ pub mod expr_fn {
),(
now,
"returns the current timestamp in nanoseconds, using the same value for all instances of now() in same statement",
),(
),
// TODO chunchun: add more doc examples
(
to_local_time,
"converts a timestamp with a timezone to a local time, returns a timestamp without timezone",
args,
),
(
to_unixtime,
"converts a string and optional formats to a Unixtime",
args,
Expand Down Expand Up @@ -277,6 +286,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
now(),
to_char(),
to_date(),
to_local_time(),
to_unixtime(),
to_timestamp(),
to_timestamp_seconds(),
Expand Down
Loading