Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ENH: Allow JIT compilation with an internal API #61032

Merged
merged 10 commits into from
Mar 14, 2025
1 change: 1 addition & 0 deletions doc/source/whatsnew/v3.0.0.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ Other enhancements
- :class:`Rolling` and :class:`Expanding` now support aggregations ``first`` and ``last`` (:issue:`33155`)
- :func:`read_parquet` accepts ``to_pandas_kwargs`` which are forwarded to :meth:`pyarrow.Table.to_pandas` which enables passing additional keywords to customize the conversion to pandas, such as ``maps_as_pydicts`` to read the Parquet map data type as python dictionaries (:issue:`56842`)
- :meth:`.DataFrameGroupBy.transform`, :meth:`.SeriesGroupBy.transform`, :meth:`.DataFrameGroupBy.agg`, :meth:`.SeriesGroupBy.agg`, :meth:`.SeriesGroupBy.apply`, :meth:`.DataFrameGroupBy.apply` now support ``kurt`` (:issue:`40139`)
- :meth:`DataFrame.apply` supports using third-party execution engines like the Bodo.ai JIT compiler (:issue:`60668`)
- :meth:`DataFrameGroupBy.transform`, :meth:`SeriesGroupBy.transform`, :meth:`DataFrameGroupBy.agg`, :meth:`SeriesGroupBy.agg`, :meth:`RollingGroupby.apply`, :meth:`ExpandingGroupby.apply`, :meth:`Rolling.apply`, :meth:`Expanding.apply`, :meth:`DataFrame.apply` with ``engine="numba"`` now supports positional arguments passed as kwargs (:issue:`58995`)
- :meth:`Rolling.agg`, :meth:`Expanding.agg` and :meth:`ExponentialMovingWindow.agg` now accept :class:`NamedAgg` aggregations through ``**kwargs`` (:issue:`28333`)
- :meth:`Series.map` can now accept kwargs to pass on to func (:issue:`59814`)
Expand Down
2 changes: 2 additions & 0 deletions pandas/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""public toolkit API"""

from pandas.api import (
executors,
extensions,
indexers,
interchange,
Expand All @@ -9,6 +10,7 @@
)

__all__ = [
"executors",
"extensions",
"indexers",
"interchange",
Expand Down
7 changes: 7 additions & 0 deletions pandas/api/executors/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""
Public API for function executor engines to be used with ``map`` and ``apply``.
"""

from pandas.core.apply import BaseExecutionEngine

__all__ = ["BaseExecutionEngine"]
104 changes: 104 additions & 0 deletions pandas/core/apply.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,110 @@
ResType = dict[int, Any]


class BaseExecutionEngine(abc.ABC):
    """
    Base class for execution engines for map and apply methods.

    An execution engine receives all the parameters of a call to
    ``apply`` or ``map``, such as the data container, the function,
    etc. and takes care of running the execution.

    Supporting different engines allows functions to be JIT compiled,
    run in parallel, and others. Besides the default executor which
    simply runs the code with the Python interpreter and pandas.
    """

    @staticmethod
    @abc.abstractmethod
    def map(
        data: Series | DataFrame | np.ndarray,
        func: AggFuncType,
        args: tuple,
        kwargs: dict[str, Any],
        decorator: Callable | None,
        skip_na: bool,
    ):
        """
        Executor method to run functions elementwise.

        In general, pandas uses ``map`` for running functions elementwise,
        but ``Series.apply`` with the default ``by_row='compat'`` will also
        call this executor function.

        Parameters
        ----------
        data : Series, DataFrame or NumPy ndarray
            The object to use for the data. Some methods implement a ``raw``
            parameter which will convert the original pandas object to a
            NumPy array, which will then be passed here to the executor.
        func : function or NumPy ufunc
            The function to execute.
        args : tuple
            Positional arguments to be passed to ``func``.
        kwargs : dict
            Keyword arguments to be passed to ``func``.
        decorator : function, optional
            For JIT compilers and other engines that need to decorate the
            function ``func``, this is the decorator to use. While the
            executor may already know which is the decorator to use, this
            is useful as for a single executor the user can specify for
            example ``numba.jit`` or ``numba.njit(nogil=True)``, and this
            decorator parameter will contain the exact decorator from the
            executor the user wants to use.
        skip_na : bool
            Whether the function should be called for missing values or not.
            This is specified by the pandas user as ``map(na_action=None)``
            or ``map(na_action='ignore')``.
        """

    @staticmethod
    @abc.abstractmethod
    def apply(
        data: Series | DataFrame | np.ndarray,
        func: AggFuncType,
        args: tuple,
        kwargs: dict[str, Any],
        # NOTE(review): annotated ``Callable | None`` to match ``map`` and the
        # "optional" wording below; callers may pass None when no decorator
        # applies.
        decorator: Callable | None,
        axis: Axis,
    ):
        """
        Executor method to run functions by an axis.

        While we can see ``map`` as executing the function for each cell
        in a ``DataFrame`` (or ``Series``), ``apply`` will execute the
        function for each column (or row).

        Parameters
        ----------
        data : Series, DataFrame or NumPy ndarray
            The object to use for the data. Some methods implement a ``raw``
            parameter which will convert the original pandas object to a
            NumPy array, which will then be passed here to the executor.
        func : function or NumPy ufunc
            The function to execute.
        args : tuple
            Positional arguments to be passed to ``func``.
        kwargs : dict
            Keyword arguments to be passed to ``func``.
        decorator : function, optional
            For JIT compilers and other engines that need to decorate the
            function ``func``, this is the decorator to use. While the
            executor may already know which is the decorator to use, this
            is useful as for a single executor the user can specify for
            example ``numba.jit`` or ``numba.njit(nogil=True)``, and this
            decorator parameter will contain the exact decorator from the
            executor the user wants to use.
        axis : {0 or 'index', 1 or 'columns'}
            0 or 'index' should execute the function passing each column as
            parameter. 1 or 'columns' should execute the function passing
            each row as parameter. The default executor engine passes rows
            as pandas ``Series``. Other executor engines should probably
            expect functions to be implemented this way for compatibility.
            But passing rows as other data structures is technically possible
            as far as the function ``func`` is implemented accordingly.
        """


def frame_apply(
obj: DataFrame,
func: AggFuncType,
Expand Down
155 changes: 119 additions & 36 deletions pandas/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -10275,7 +10275,7 @@ def apply(
result_type: Literal["expand", "reduce", "broadcast"] | None = None,
args=(),
by_row: Literal[False, "compat"] = "compat",
engine: Literal["python", "numba"] = "python",
engine: Callable | None | Literal["python", "numba"] = None,
engine_kwargs: dict[str, bool] | None = None,
**kwargs,
):
Expand Down Expand Up @@ -10339,35 +10339,32 @@ def apply(

.. versionadded:: 2.1.0

engine : {'python', 'numba'}, default 'python'
Choose between the python (default) engine or the numba engine in apply.
engine : decorator or {'python', 'numba'}, optional
Choose the execution engine to use. If not provided the function
will be executed by the regular Python interpreter.

The numba engine will attempt to JIT compile the passed function,
which may result in speedups for large DataFrames.
It also supports the following engine_kwargs :
Other options include JIT compilers such as Numba and Bodo, which in some
cases can speed up the execution. To use an executor you can provide
the decorators ``numba.jit``, ``numba.njit`` or ``bodo.jit``. You can
also provide the decorator with parameters, like ``numba.jit(nogil=True)``.

- nopython (compile the function in nopython mode)
- nogil (release the GIL inside the JIT compiled function)
- parallel (try to apply the function in parallel over the DataFrame)
Not all functions can be executed with all execution engines. In general,
JIT compilers will require type stability in the function (no variable
should change data type during the execution). And not all pandas and
NumPy APIs are supported. Check the engine documentation [1]_ and [2]_
for limitations.

Note: Due to limitations within numba/how pandas interfaces with numba,
you should only use this if raw=True

Note: The numba compiler only supports a subset of
valid Python/numpy operations.
.. warning::

Please read more about the `supported python features
<https://numba.pydata.org/numba-doc/dev/reference/pysupported.html>`_
and `supported numpy features
<https://numba.pydata.org/numba-doc/dev/reference/numpysupported.html>`_
in numba to learn what you can or cannot use in the passed function.
String parameters will stop being supported in a future pandas version.

.. versionadded:: 2.2.0

engine_kwargs : dict
Pass keyword arguments to the engine.
This is currently only used by the numba engine,
see the documentation for the engine argument for more information.

**kwargs
Additional keyword arguments to pass as keywords arguments to
`func`.
Expand All @@ -10390,6 +10387,13 @@ def apply(
behavior or errors and are not supported. See :ref:`gotchas.udf-mutation`
for more details.

References
----------
.. [1] `Numba documentation
<https://numba.readthedocs.io/en/stable/index.html>`_
.. [2] `Bodo documentation
<https://docs.bodo.ai/latest/>`_

Examples
--------
>>> df = pd.DataFrame([[4, 9]] * 3, columns=["A", "B"])
Expand Down Expand Up @@ -10458,22 +10462,99 @@ def apply(
0 1 2
1 1 2
2 1 2

Advanced users can speed up their code by using a Just-in-time (JIT) compiler
with ``apply``. The main JIT compilers available for pandas are Numba and Bodo.
In general, JIT compilation is only possible when the function passed to
``apply`` has type stability (variables in the function do not change their
type during the execution).

>>> import bodo
>>> df.apply(lambda x: x.A + x.B, axis=1, engine=bodo.jit)

Note that JIT compilation is only recommended for functions that take a
significant amount of time to run. Fast functions are unlikely to run faster
with JIT compilation.
"""
from pandas.core.apply import frame_apply
if engine is None or isinstance(engine, str):
from pandas.core.apply import frame_apply

op = frame_apply(
self,
func=func,
axis=axis,
raw=raw,
result_type=result_type,
by_row=by_row,
engine=engine,
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
)
return op.apply().__finalize__(self, method="apply")
if engine is None:
engine = "python"

if engine not in ["python", "numba"]:
raise ValueError(f"Unknown engine '{engine}'")

op = frame_apply(
self,
func=func,
axis=axis,
raw=raw,
result_type=result_type,
by_row=by_row,
engine=engine,
engine_kwargs=engine_kwargs,
args=args,
kwargs=kwargs,
)
return op.apply().__finalize__(self, method="apply")
elif hasattr(engine, "__pandas_udf__"):
if result_type is not None:
raise NotImplementedError(
f"{result_type=} only implemented for the default engine"
)

agg_axis = self._get_agg_axis(self._get_axis_number(axis))

# one axis is empty
if not all(self.shape):
func = cast(Callable, func)
try:
if axis == 0:
r = func(Series([], dtype=np.float64), *args, **kwargs)
else:
r = func(
Series(index=self.columns, dtype=np.float64),
*args,
**kwargs,
)
except Exception:
pass
else:
if not isinstance(r, Series):
if len(agg_axis):
r = func(Series([], dtype=np.float64), *args, **kwargs)
else:
r = np.nan

return self._constructor_sliced(r, index=agg_axis)
return self.copy()

data: DataFrame | np.ndarray = self
if raw:
# This will upcast the whole DataFrame to the same type,
# and likely result in an object 2D array.
# We should probably pass a list of 1D arrays instead, at
# least for ``axis=0``
data = self.values

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Numba doesn't support heterogeneous lists but does support heterogeneous tuples. I would say that it is so important to avoid 2D object arrays here that it should go in the first revision.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, how is the mapping from column name in "func" to index in the tuple supposed to happen?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the comments, and for catching the typos.

Fully agree with your first comment. I don't want to change the existing behavior in the same PR I'm implementing a new interface, but converting the whole dataframe to a single type doesn't seem a great option. The raw parameter and this behavior is as old as pandas, I don't think we would design it this way as of today. It won't still be trivial, as passing Series is easy, but the internal data can be different things, in the simple case a single NumPy array, but also other structured such as an Arrow column, two NumPy arrays... I don't think it's trivial to decide how to pass the data in those cases.

Also, how is the mapping from column name in "func" to index in the tuple supposed to happen?

If I understand you correctly, this is great question, but also a bit tricky. For the simple case, the function is called for every column. In the case the same function needs to be applied to every column, I don't think there is an issue. If the function receives Series (raw=False), then this can be done:

def func(series):
    if series.name == "column_1":
        return series.str.upper()
    return series.str.lower()

But when raw=True, in general the input of the function will be a numpy array, and it's not known which was the name of the column or the index. Nothing in this PR is preventing the engines from passing extra parameters to the function, but I don't think we should encourage that behavior, since users will in general expect that changing the value of engine doesn't require changing the function, and in particular their signature.

This is when a column at a time is passed. When axis=1, a row at a time is passed. When raw=False, then a Series is passed, which again is not ideal, since an upcast to probably object will happen. But this is how it works now. For raw=True and Numpy objects, the case it's exactly the same as before, just the axis changes.

I think historical reasons made the signatures or .map() and .apply() functions inconsistent, and in some cases too complex and with strange behaviors. There have been several improvements, like removing applymap for example. But several other things can be improved. I personally think the internal API here makes things simpler, and public APIs should follow the same idea. But this will take several iterations, and I think we don't want to make this PR too complex with any of the many possible improvements. I don't think you are proposing it, I really appreciate your feedback and question. Just commenting how I think we should move forward, given the amount of technical debt and complexity.

result = engine.__pandas_udf__.apply(
data=data,
func=func,
args=args,
kwargs=kwargs,
decorator=engine,
axis=axis,
)
if raw:
if result.ndim == 2:
return self._constructor(
result, index=self.index, columns=self.columns
)
else:
return self._constructor_sliced(result, index=agg_axis)
return result
else:
raise ValueError(f"Unknown engine {engine}")

def map(
self, func: PythonFuncType, na_action: Literal["ignore"] | None = None, **kwargs
Expand Down Expand Up @@ -10590,9 +10671,11 @@ def _append(

index = Index(
[other.name],
name=self.index.names
if isinstance(self.index, MultiIndex)
else self.index.name,
name=(
self.index.names
if isinstance(self.index, MultiIndex)
else self.index.name
),
)
row_df = other.to_frame().T
# infer_objects is needed for
Expand Down
6 changes: 6 additions & 0 deletions pandas/tests/api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pandas import api
import pandas._testing as tm
from pandas.api import (
executors as api_executors,
extensions as api_extensions,
indexers as api_indexers,
interchange as api_interchange,
Expand Down Expand Up @@ -243,6 +244,7 @@ def test_depr(self):

class TestApi(Base):
allowed_api_dirs = [
"executors",
"types",
"extensions",
"indexers",
Expand Down Expand Up @@ -338,6 +340,7 @@ class TestApi(Base):
"ExtensionArray",
"ExtensionScalarOpsMixin",
]
allowed_api_executors = ["BaseExecutionEngine"]

def test_api(self):
    # Verify the ``pandas.api`` namespace against the ``allowed_api_dirs``
    # allow-list (comparison semantics live in ``Base.check`` — not visible
    # here).
    self.check(api, self.allowed_api_dirs)
Expand All @@ -357,6 +360,9 @@ def test_api_indexers(self):
def test_api_extensions(self):
    # Verify ``pandas.api.extensions`` against its allow-list of public
    # names (``allowed_api_extensions``); ``Base.check`` does the comparison.
    self.check(api_extensions, self.allowed_api_extensions)

def test_api_executors(self):
    # Verify the new ``pandas.api.executors`` namespace exposes only the
    # names in ``allowed_api_executors`` (currently ``BaseExecutionEngine``).
    self.check(api_executors, self.allowed_api_executors)


class TestErrors(Base):
def test_errors(self):
Expand Down
Loading
Loading