Skip to content

Add physical and logical plan conversion to and from protobuf #892

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
251 changes: 117 additions & 134 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pyo3 = { version = "0.22", features = ["extension-module", "abi3", "abi3-py38"]
arrow = { version = "53", features = ["pyarrow"] }
datafusion = { version = "42.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-substrait = { version = "42.0.0", optional = true }
datafusion-proto = { version = "42.0.0" }
prost = "0.13" # keep in line with `datafusion-substrait`
prost-types = "0.13" # keep in line with `datafusion-substrait`
uuid = { version = "1.9", features = ["v4"] }
Expand Down
4 changes: 3 additions & 1 deletion python/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
from .catalog import Catalog, Database, Table

# The following imports are okay to remain as opaque to the user.
from ._internal import Config, LogicalPlan, ExecutionPlan, runtime
from ._internal import Config, runtime

from .record_batch import RecordBatchStream, RecordBatch

Expand All @@ -53,6 +53,8 @@
WindowFrame,
)

from .plan import LogicalPlan, ExecutionPlan

from . import functions, object_store, substrait

__version__ = importlib_metadata.version(__name__)
Expand Down
12 changes: 7 additions & 5 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from ._internal import RuntimeConfig as RuntimeConfigInternal
from ._internal import SQLOptions as SQLOptionsInternal
from ._internal import SessionContext as SessionContextInternal
from ._internal import LogicalPlan, ExecutionPlan

from datafusion.catalog import Catalog, Table
from datafusion.dataframe import DataFrame
Expand All @@ -39,6 +38,7 @@
import pandas
import polars
import pathlib
from datafusion.plan import LogicalPlan, ExecutionPlan


class SessionConfig:
Expand Down Expand Up @@ -268,8 +268,10 @@ def with_disk_manager_specified(self, *paths: str | pathlib.Path) -> RuntimeConf
Returns:
A new :py:class:`RuntimeConfig` object with the updated setting.
"""
paths = [str(p) for p in paths]
self.config_internal = self.config_internal.with_disk_manager_specified(paths)
paths_list = [str(p) for p in paths]
self.config_internal = self.config_internal.with_disk_manager_specified(
paths_list
)
return self

def with_unbounded_memory_pool(self) -> RuntimeConfig:
Expand Down Expand Up @@ -558,7 +560,7 @@ def create_dataframe_from_logical_plan(self, plan: LogicalPlan) -> DataFrame:
Returns:
DataFrame representation of the logical plan.
"""
return DataFrame(self.ctx.create_dataframe_from_logical_plan(plan))
return DataFrame(self.ctx.create_dataframe_from_logical_plan(plan._raw_plan))

def from_pylist(
self, data: list[dict[str, Any]], name: str | None = None
Expand Down Expand Up @@ -1034,4 +1036,4 @@ def read_table(self, table: Table) -> DataFrame:

def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
"""Execute the ``plan`` and return the results."""
return RecordBatchStream(self.ctx.execute(plan, partitions))
return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))
11 changes: 4 additions & 7 deletions python/datafusion/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from typing import Any, List, TYPE_CHECKING
from datafusion.record_batch import RecordBatchStream
from typing_extensions import deprecated
from datafusion.plan import LogicalPlan, ExecutionPlan

if TYPE_CHECKING:
import pyarrow as pa
Expand All @@ -34,10 +35,6 @@

from datafusion._internal import DataFrame as DataFrameInternal
from datafusion.expr import Expr, SortExpr, sort_or_default
from datafusion._internal import (
LogicalPlan,
ExecutionPlan,
)


class DataFrame:
Expand Down Expand Up @@ -316,23 +313,23 @@ def logical_plan(self) -> LogicalPlan:
Returns:
Unoptimized logical plan.
"""
return self.df.logical_plan()
return LogicalPlan(self.df.logical_plan())

def optimized_logical_plan(self) -> LogicalPlan:
"""Return the optimized ``LogicalPlan``.

Returns:
Optimized logical plan.
"""
return self.df.optimized_logical_plan()
return LogicalPlan(self.df.optimized_logical_plan())

def execution_plan(self) -> ExecutionPlan:
"""Return the execution/physical plan.

Returns:
Execution plan.
"""
return self.df.execution_plan()
return ExecutionPlan(self.df.execution_plan())

def repartition(self, num: int) -> DataFrame:
"""Repartition a DataFrame into ``num`` partitions.
Expand Down
8 changes: 5 additions & 3 deletions python/datafusion/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,18 @@

from __future__ import annotations

from typing import Any, Optional, Type
from typing import Any, Optional, Type, TYPE_CHECKING

import pyarrow as pa
from datafusion.common import DataTypeMap, NullTreatment, RexType
from typing_extensions import deprecated

from ._internal import LogicalPlan
from ._internal import expr as expr_internal
from ._internal import functions as functions_internal

if TYPE_CHECKING:
from datafusion.plan import LogicalPlan

# The following are imported from the internal representation. We may choose to
# give these all proper wrappers, or to simply leave as is. These were added
# in order to support passing the `test_imports` unit test.
Expand Down Expand Up @@ -485,7 +487,7 @@ def rex_call_operator(self) -> str:

def column_name(self, plan: LogicalPlan) -> str:
"""Compute the output column name based on the provided logical plan."""
return self.expr.column_name(plan)
return self.expr.column_name(plan._raw_plan)

def order_by(self, *exprs: Expr | SortExpr) -> ExprFuncBuilder:
"""Set the ordering for a window or aggregate function.
Expand Down
147 changes: 147 additions & 0 deletions python/datafusion/plan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""This module supports physical and logical plans in DataFusion."""

from __future__ import annotations

import datafusion._internal as df_internal

from typing import List, Any, TYPE_CHECKING

if TYPE_CHECKING:
from datafusion.context import SessionContext

__all__ = [
"LogicalPlan",
"ExecutionPlan",
]


class LogicalPlan:
"""Logical Plan.

A `LogicalPlan` is a node in a tree of relational operators (such as
Projection or Filter).

Represents transforming an input relation (table) to an output relation
(table) with a potentially different schema. Plans form a dataflow tree
where data flows from leaves up to the root to produce the query result.

`LogicalPlan`s can be created by the SQL query planner, the DataFrame API,
or programmatically (for example custom query languages).
"""

def __init__(self, plan: df_internal.LogicalPlan) -> None:
"""This constructor should not be called by the end user."""
self._raw_plan = plan

def to_variant(self) -> Any:
"""Convert the logical plan into its specific variant."""
return self._raw_plan.to_variant()

def inputs(self) -> List[LogicalPlan]:
"""Returns the list of inputs to the logical plan."""
return [LogicalPlan(p) for p in self._raw_plan.inputs()]

def __repr__(self) -> str:
"""Generate a printable representation of the plan."""
return self._raw_plan.__repr__()

def display(self) -> str:
"""Print the logical plan."""
return self._raw_plan.display()

def display_indent(self) -> str:
"""Print an indented form of the logical plan."""
return self._raw_plan.display_indent()

def display_indent_schema(self) -> str:
"""Print an indented form of the schema for the logical plan."""
return self._raw_plan.display_indent_schema()

def display_graphviz(self) -> str:
"""Print the graph visualization of the logical plan.

Returns a `format`able structure that produces lines meant for graphical display
using the `DOT` language. This format can be visualized using software from
[`graphviz`](https://graphviz.org/)
"""
return self._raw_plan.display_graphviz()

@staticmethod
def from_proto(ctx: SessionContext, data: bytes) -> LogicalPlan:
"""Create a LogicalPlan from protobuf bytes.

Tables created in memory from record batches are currently not supported.
"""
return LogicalPlan(df_internal.LogicalPlan.from_proto(ctx.ctx, data))

def to_proto(self) -> bytes:
"""Convert a LogicalPlan to protobuf bytes.

Tables created in memory from record batches are currently not supported.
"""
return self._raw_plan.to_proto()


class ExecutionPlan:
"""Represent nodes in the DataFusion Physical Plan."""

def __init__(self, plan: df_internal.ExecutionPlan) -> None:
"""This constructor should not be called by the end user."""
self._raw_plan = plan

def children(self) -> List[ExecutionPlan]:
"""Get a list of children `ExecutionPlan`s that act as inputs to this plan.

The returned list will be empty for leaf nodes such as scans, will contain a
single value for unary nodes, or two values for binary nodes (such as joins).
"""
return [ExecutionPlan(e) for e in self._raw_plan.children()]

def display(self) -> str:
"""Print the physical plan."""
return self._raw_plan.display()

def display_indent(self) -> str:
"""Print an indented form of the physical plan."""
return self._raw_plan.display_indent()

def __repr__(self) -> str:
"""Print a string representation of the physical plan."""
return self._raw_plan.__repr__()

@property
def partition_count(self) -> int:
"""Returns the number of partitions in the physical plan."""
return self._raw_plan.partition_count

@staticmethod
def from_proto(ctx: SessionContext, data: bytes) -> ExecutionPlan:
"""Create an ExecutionPlan from protobuf bytes.

Tables created in memory from record batches are currently not supported.
"""
return ExecutionPlan(df_internal.ExecutionPlan.from_proto(ctx.ctx, data))

def to_proto(self) -> bytes:
"""Convert an ExecutionPlan into protobuf bytes.

Tables created in memory from record batches are currently not supported.
"""
return self._raw_plan.to_proto()
10 changes: 6 additions & 4 deletions python/datafusion/substrait.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
from typing import TYPE_CHECKING
from typing_extensions import deprecated
import pathlib
from datafusion.plan import LogicalPlan

if TYPE_CHECKING:
from datafusion.context import SessionContext
from datafusion._internal import LogicalPlan

__all__ = [
"Plan",
Expand Down Expand Up @@ -156,7 +156,9 @@ def to_substrait_plan(logical_plan: LogicalPlan, ctx: SessionContext) -> Plan:
Substrait plan.
"""
return Plan(
substrait_internal.Producer.to_substrait_plan(logical_plan, ctx.ctx)
substrait_internal.Producer.to_substrait_plan(
logical_plan._raw_plan, ctx.ctx
)
)


Expand All @@ -181,8 +183,8 @@ def from_substrait_plan(ctx: SessionContext, plan: Plan) -> LogicalPlan:
Returns:
LogicalPlan.
"""
return substrait_internal.Consumer.from_substrait_plan(
ctx.ctx, plan.plan_internal
return LogicalPlan(
substrait_internal.Consumer.from_substrait_plan(ctx.ctx, plan.plan_internal)
)


Expand Down
42 changes: 42 additions & 0 deletions python/datafusion/tests/test_plans.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from datafusion import SessionContext, LogicalPlan, ExecutionPlan
import pytest


# Note: We must use CSV because memory tables are currently not supported for
# conversion to/from protobuf.
@pytest.fixture
def df():
ctx = SessionContext()
return ctx.read_csv(path="testing/data/csv/aggregate_test_100.csv").select("c1")


def test_logical_plan_to_proto(ctx, df) -> None:
logical_plan_bytes = df.logical_plan().to_proto()
logical_plan = LogicalPlan.from_proto(ctx, logical_plan_bytes)

df_round_trip = ctx.create_dataframe_from_logical_plan(logical_plan)

assert df.collect() == df_round_trip.collect()

original_execution_plan = df.execution_plan()
execution_plan_bytes = original_execution_plan.to_proto()
execution_plan = ExecutionPlan.from_proto(ctx, execution_plan_bytes)

assert str(original_execution_plan) == str(execution_plan)
Loading
Loading