Skip to content

[WIP] SQL Support for Gap Filling #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed

Conversation

wolffcm
Copy link
Owner

@wolffcm wolffcm commented Jan 11, 2023

This PR is still a work in progress but follows the design described in this document:
https://docs.google.com/document/d/1vIcs9uhlCX_AkD9bemcDx-YhBOVe_TW5sBbXtKCHIfk/edit#

Feature request in DataFusion:
apache#4809

Discussion is still ongoing; this work may end up in the IOx repo rather than DataFusion.

I tried to break this PR up into logical commits:

  • The first adds two new functions, DATE_BIN_GAPFILL and LOCF. They are just stubs that return errors. This work touched several files but is mostly boilerplate.
  • I added a logical plan node type GapFill.
  • I created the start of an optimizer rule to transform the logical plan as described above. Mostly it just
    • Detects the conditions where it can do gap-filling, and checks for errors
    • Changes to plan to mutate the Aggregate node and add Sort and GapFill nodes.

Still to do:

  • Detect the time range of the query by looking for a filter node
  • Physical implementation of GapFill
  • Support for LOCF().

Copy link

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is looking very nice @wolffcm 👍

With comments like the following, I share your assesment that this code may end up in IOx rather than DataFusion

apache#4809 (comment)

#[tokio::test]
async fn date_bin_gapfill() {
let ctx = SessionContext::new();
let sql = "SELECT DATE_BIN(INTERVAL '15 minutes', TIMESTAMP '2022-08-03 14:38:50Z', TIMESTAMP '1970-01-01T00:00:00Z') AS res";
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be possible to use the https://github.com/apache/arrow-datafusion/tree/master/datafusion/core/tests/sqllogictests way of writing tests -- I like them as the iteration time is much faster

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that looks very nice!

I wonder if expect_test when writing the unit tests that verify plans, since it can automatically update the expected output.

let result = try_execute_to_batches(&ctx, sql).await;
assert_eq!(
result.err().unwrap().to_string(),
"Arrow error: External error: This feature is not implemented: DATE_BIN_GAPFILL is not yet supported"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error looks strange to me as the query has DATE_BIN in it but this error refers to DATE_BIN_GAPFILL

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That must have been a copy/paste error, but I would have expected the test to fail.

@@ -268,6 +272,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::CurrentTime => Volatility::Stable,

// Volatile builtin functions
BuiltinScalarFunction::DateBinGapfill => Volatility::Volatile,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 this is the right classification I think as we don't want the datafusion optimier to remove / optimize out these functions

"handle_gap_fill"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Ok(res)
}

fn handle_aggregate(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you passed in the aggregate struct here above, you might be able to avoid the second match / unreachable here

Suggested change
fn handle_aggregate(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
fn handle_aggregate(aggregate: &Aggregate) -> Result<Option<LogicalPlan>> {

.build()?;

let expected = "GapFill: groupBy=[[datebin(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(0, None))]], aggr=[[AVG(temps.temp)]]\
\n Sort: datebin(IntervalDayTime(\"60000\"),temps.time,TimestampNanosecond(0, None))\
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is pretty neat

}

fn check_node(node: &LogicalPlan) -> Result<()> {
println!("checking node");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably left over

Comment on lines +242 to +243
})?;
Ok(())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can just return the result directly here if you wanted:

Suggested change
})?;
Ok(())
})

Comment on lines +262 to +266
let input = Aggregate::try_from_plan(&self.inner)
.expect("this should always be an Aggregate node")
.input
.as_ref();
vec![input]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let input = Aggregate::try_from_plan(&self.inner)
.expect("this should always be an Aggregate node")
.input
.as_ref();
vec![input]
vec![&self.inner]

I may be missing something but that it seems like this should just return a reference to inner.self?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inner Aggregate node isn't a child of GapFill, rather GapFill is using the Aggregate node as a container for its own fields. But looking this over now I could have written it more concisely like self.inner.inputs().

Comment on lines +270 to +272
&Aggregate::try_from_plan(&self.inner)
.expect("this should aways be an Aggregate node")
.schema
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can also just call schema on the input I think

Suggested change
&Aggregate::try_from_plan(&self.inner)
.expect("this should aways be an Aggregate node")
.schema
self.inner
.schema()

@wolffcm
Copy link
Owner Author

wolffcm commented Apr 28, 2023

This work (and lots more) is now merged downstream in IOx.

@wolffcm wolffcm closed this Apr 28, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants