Skip to content

feat: switch to using proper Substrait types for IntervalYearMonth and IntervalDayTime #11471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 37 additions & 9 deletions datafusion/substrait/src/logical_plan/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use std::str::FromStr;
use std::sync::Arc;
use substrait::proto::exchange_rel::ExchangeKind;
use substrait::proto::expression::literal::user_defined::Val;
use substrait::proto::expression::literal::IntervalDayToSecond;
use substrait::proto::expression::literal::{IntervalDayToSecond, IntervalYearToMonth};
use substrait::proto::expression::subquery::SubqueryType;
use substrait::proto::expression::{self, FieldReference, Literal, ScalarFunction};
use substrait::proto::read_rel::local_files::file_or_files::PathType::UriFile;
Expand Down Expand Up @@ -1414,7 +1414,7 @@ fn from_substrait_type(
})?;
let field = Arc::new(Field::new_list_field(
from_substrait_type(inner_type, dfs_names, name_idx)?,
// We ignore Substrait's nullability here to match to_substrait_literal
// We ignore Substrait's nullability here to match to_substrait_literal
// which always creates nullable lists
true,
));
Expand Down Expand Up @@ -1445,12 +1445,15 @@ fn from_substrait_type(
));
match map.type_variation_reference {
DEFAULT_CONTAINER_TYPE_VARIATION_REF => {
Ok(DataType::Map(Arc::new(Field::new_struct(
"entries",
[key_field, value_field],
false, // The inner map field is always non-nullable (Arrow #1697),
)), false))
},
Ok(DataType::Map(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto-formatting

Arc::new(Field::new_struct(
"entries",
[key_field, value_field],
false, // The inner map field is always non-nullable (Arrow #1697),
)),
false,
))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
)?,
Expand All @@ -1467,14 +1470,33 @@ fn from_substrait_type(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalYear(i) => match i.type_variation_reference {
DEFAULT_TYPE_VARIATION_REF => {
Ok(DataType::Interval(IntervalUnit::YearMonth))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::IntervalDay(i) => match i.type_variation_reference {
DEFAULT_TYPE_VARIATION_REF => {
Ok(DataType::Interval(IntervalUnit::DayTime))
}
v => not_impl_err!(
"Unsupported Substrait type variation {v} of type {s_kind:?}"
),
},
r#type::Kind::UserDefined(u) => {
match u.type_reference {
// Kept for backwards compatibility, use IntervalYear instead
INTERVAL_YEAR_MONTH_TYPE_REF => {
Ok(DataType::Interval(IntervalUnit::YearMonth))
}
// Kept for backwards compatibility, use IntervalDay instead
INTERVAL_DAY_TIME_TYPE_REF => {
Ok(DataType::Interval(IntervalUnit::DayTime))
}
// Not supported yet by Substrait
INTERVAL_MONTH_DAY_NANO_TYPE_REF => {
Ok(DataType::Interval(IntervalUnit::MonthDayNano))
}
Expand All @@ -1484,7 +1506,7 @@ fn from_substrait_type(
u.type_variation_reference
),
}
},
}
r#type::Kind::Struct(s) => Ok(DataType::Struct(from_substrait_struct_type(
s, dfs_names, name_idx,
)?)),
Expand Down Expand Up @@ -1753,11 +1775,16 @@ fn from_substrait_literal(
seconds,
microseconds,
})) => {
// DF only supports millisecond precision, so we lose the micros here
ScalarValue::new_interval_dt(*days, (seconds * 1000) + (microseconds / 1000))
}
Some(LiteralType::IntervalYearToMonth(IntervalYearToMonth { years, months })) => {
ScalarValue::new_interval_ym(*years, *months)
}
Some(LiteralType::FixedChar(c)) => ScalarValue::Utf8(Some(c.clone())),
Some(LiteralType::UserDefined(user_defined)) => {
match user_defined.type_reference {
// Kept for backwards compatibility, use IntervalYearToMonth instead
INTERVAL_YEAR_MONTH_TYPE_REF => {
let Some(Val::Value(raw_val)) = user_defined.val.as_ref() else {
return substrait_err!("Interval year month value is empty");
Expand All @@ -1770,6 +1797,7 @@ fn from_substrait_literal(
})?;
ScalarValue::IntervalYearMonth(Some(i32::from_le_bytes(value_slice)))
}
// Kept for backwards compatibility, use IntervalDayToSecond instead
INTERVAL_DAY_TIME_TYPE_REF => {
let Some(Val::Value(raw_val)) = user_defined.val.as_ref() else {
return substrait_err!("Interval day time value is empty");
Expand Down
140 changes: 42 additions & 98 deletions datafusion/substrait/src/logical_plan/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ use datafusion::logical_expr::{expr, Between, JoinConstraint, LogicalPlan, Opera
use datafusion::prelude::Expr;
use pbjson_types::Any as ProtoAny;
use substrait::proto::exchange_rel::{ExchangeKind, RoundRobin, ScatterFields};
use substrait::proto::expression::literal::user_defined::Val;
use substrait::proto::expression::literal::UserDefined;
use substrait::proto::expression::literal::{List, Struct};
use substrait::proto::expression::literal::{
user_defined, IntervalDayToSecond, IntervalYearToMonth, List, Struct, UserDefined,
};
use substrait::proto::expression::subquery::InPredicate;
use substrait::proto::expression::window_function::BoundsType;
use substrait::proto::r#type::{parameter, Parameter};
use substrait::proto::read_rel::VirtualTable;
use substrait::proto::{CrossRel, ExchangeRel};
use substrait::{
Expand Down Expand Up @@ -95,9 +94,7 @@ use crate::variation_const::{
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF,
DECIMAL_128_TYPE_VARIATION_REF, DECIMAL_256_TYPE_VARIATION_REF,
DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
INTERVAL_DAY_TIME_TYPE_REF, INTERVAL_DAY_TIME_TYPE_URL,
INTERVAL_MONTH_DAY_NANO_TYPE_REF, INTERVAL_MONTH_DAY_NANO_TYPE_URL,
INTERVAL_YEAR_MONTH_TYPE_REF, INTERVAL_YEAR_MONTH_TYPE_URL,
LARGE_CONTAINER_TYPE_VARIATION_REF, TIMESTAMP_MICRO_TYPE_VARIATION_REF,
TIMESTAMP_MILLI_TYPE_VARIATION_REF, TIMESTAMP_NANO_TYPE_VARIATION_REF,
TIMESTAMP_SECOND_TYPE_VARIATION_REF, UNSIGNED_INTEGER_TYPE_VARIATION_REF,
Expand Down Expand Up @@ -1534,47 +1531,31 @@ fn to_substrait_type(dt: &DataType, nullable: bool) -> Result<substrait::proto::
})),
}),
DataType::Interval(interval_unit) => {
// define two type parameters for convenience
let i32_param = Parameter {
parameter: Some(parameter::Parameter::DataType(substrait::proto::Type {
kind: Some(r#type::Kind::I32(r#type::I32 {
match interval_unit {
IntervalUnit::YearMonth => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::IntervalYear(r#type::IntervalYear {
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability: r#type::Nullability::Unspecified as i32,
nullability,
})),
})),
};
let i64_param = Parameter {
parameter: Some(parameter::Parameter::DataType(substrait::proto::Type {
kind: Some(r#type::Kind::I64(r#type::I64 {
}),
IntervalUnit::DayTime => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::IntervalDay(r#type::IntervalDay {
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability: r#type::Nullability::Unspecified as i32,
nullability,
})),
})),
};

let (type_parameters, type_reference) = match interval_unit {
IntervalUnit::YearMonth => {
let type_parameters = vec![i32_param];
(type_parameters, INTERVAL_YEAR_MONTH_TYPE_REF)
}
IntervalUnit::DayTime => {
let type_parameters = vec![i64_param];
(type_parameters, INTERVAL_DAY_TIME_TYPE_REF)
}
}),
IntervalUnit::MonthDayNano => {
// use 2 `i64` as `i128`
let type_parameters = vec![i64_param.clone(), i64_param];
(type_parameters, INTERVAL_MONTH_DAY_NANO_TYPE_REF)
// Substrait doesn't currently support this type, so we represent it as a UDT
Ok(substrait::proto::Type {
kind: Some(r#type::Kind::UserDefined(r#type::UserDefined {
type_reference: INTERVAL_MONTH_DAY_NANO_TYPE_REF,
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability,
type_parameters: vec![],
})),
})
}
};
Ok(substrait::proto::Type {
kind: Some(r#type::Kind::UserDefined(r#type::UserDefined {
type_reference,
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability,
type_parameters,
})),
})
}
}
DataType::Binary => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::Binary(r#type::Binary {
Expand Down Expand Up @@ -1954,75 +1935,38 @@ fn to_substrait_literal(value: &ScalarValue) -> Result<Literal> {
(LiteralType::Date(*d), DATE_32_TYPE_VARIATION_REF)
}
// Date64 literal is not supported in Substrait
ScalarValue::IntervalYearMonth(Some(i)) => {
let bytes = i.to_le_bytes();
(
LiteralType::UserDefined(UserDefined {
type_reference: INTERVAL_YEAR_MONTH_TYPE_REF,
type_parameters: vec![Parameter {
parameter: Some(parameter::Parameter::DataType(
substrait::proto::Type {
kind: Some(r#type::Kind::I32(r#type::I32 {
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability: r#type::Nullability::Required as i32,
})),
},
)),
}],
val: Some(Val::Value(ProtoAny {
type_url: INTERVAL_YEAR_MONTH_TYPE_URL.to_string(),
value: bytes.to_vec().into(),
})),
}),
INTERVAL_YEAR_MONTH_TYPE_REF,
)
}
ScalarValue::IntervalYearMonth(Some(i)) => (
LiteralType::IntervalYearToMonth(IntervalYearToMonth {
// DF only tracks total months, but there should always be 12 months in a year
years: *i / 12,
months: *i % 12,
}),
DEFAULT_TYPE_VARIATION_REF,
),
ScalarValue::IntervalMonthDayNano(Some(i)) => {
// treat `i128` as two contiguous `i64`
// IntervalMonthDayNano is internally represented as a 128-bit integer, containing
// months (32bit), days (32bit), and nanoseconds (64bit)
let bytes = i.to_byte_slice();
let i64_param = Parameter {
parameter: Some(parameter::Parameter::DataType(substrait::proto::Type {
kind: Some(r#type::Kind::I64(r#type::I64 {
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability: r#type::Nullability::Required as i32,
})),
})),
};
(
LiteralType::UserDefined(UserDefined {
type_reference: INTERVAL_MONTH_DAY_NANO_TYPE_REF,
type_parameters: vec![i64_param.clone(), i64_param],
val: Some(Val::Value(ProtoAny {
type_parameters: vec![],
val: Some(user_defined::Val::Value(ProtoAny {
type_url: INTERVAL_MONTH_DAY_NANO_TYPE_URL.to_string(),
value: bytes.to_vec().into(),
})),
}),
INTERVAL_MONTH_DAY_NANO_TYPE_REF,
)
}
ScalarValue::IntervalDayTime(Some(i)) => {
let bytes = i.to_byte_slice();
(
LiteralType::UserDefined(UserDefined {
type_reference: INTERVAL_DAY_TIME_TYPE_REF,
type_parameters: vec![Parameter {
parameter: Some(parameter::Parameter::DataType(
substrait::proto::Type {
kind: Some(r#type::Kind::I64(r#type::I64 {
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability: r#type::Nullability::Required as i32,
})),
},
)),
}],
val: Some(Val::Value(ProtoAny {
type_url: INTERVAL_DAY_TIME_TYPE_URL.to_string(),
value: bytes.to_vec().into(),
})),
}),
INTERVAL_DAY_TIME_TYPE_REF,
)
}
ScalarValue::IntervalDayTime(Some(i)) => (
LiteralType::IntervalDayToSecond(IntervalDayToSecond {
days: i.days,
seconds: i.milliseconds / 1000,
microseconds: (i.milliseconds % 1000) * 1000,
}),
DEFAULT_TYPE_VARIATION_REF,
),
ScalarValue::Binary(Some(b)) => (
LiteralType::Binary(b.clone()),
DEFAULT_CONTAINER_TYPE_VARIATION_REF,
Expand Down
Loading