Spark expr replace strict #2254

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
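
For context: this PR implements Expr.replace_strict for the Spark-like backend (PySpark and SQLFrame) and marks it not_implemented for the Dask and DuckDB backends. A minimal usage sketch, assuming the public narwhals API that the tests below exercise (the session setup is illustrative, not from the PR):

import narwhals as nw
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
native = spark.createDataFrame([(1,), (2,), (3,)], ["a"])

# PySpark frames enter narwhals as lazy frames; replace_strict maps each
# value of "a" through old -> new and fails if any non-null value is unmapped.
lf = nw.from_native(native)
result = lf.select(
    nw.col("a").replace_strict([1, 2, 3], ["one", "two", "three"], return_dtype=nw.String)
)
print(result.to_native().toPandas())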
narwhals/_compliant/expr.py (0 additions & 1 deletion)

@@ -886,7 +886,6 @@ class LazyExpr(  # type: ignore[misc]
     map_batches: not_implemented = not_implemented()
     ewm_mean: not_implemented = not_implemented()
     gather_every: not_implemented = not_implemented()
-    replace_strict: not_implemented = not_implemented()
     cat: not_implemented = not_implemented()  # pyright: ignore[reportAssignmentType]

     @classmethod
narwhals/_dask/expr.py (1 addition & 0 deletions)

@@ -688,3 +688,4 @@ def dt(self) -> DaskExprDateTimeNamespace:
     list = not_implemented()  # pyright: ignore[reportAssignmentType]
     struct = not_implemented()  # pyright: ignore[reportAssignmentType]
     rank = not_implemented()  # pyright: ignore[reportAssignmentType]
+    replace_strict = not_implemented()
narwhals/_duckdb/expr.py (1 addition & 0 deletions)

@@ -757,3 +757,4 @@ def struct(self) -> DuckDBExprStructNamespace:

     drop_nulls = not_implemented()
     unique = not_implemented()
+    replace_strict = not_implemented()
narwhals/_spark_like/expr.py (61 additions & 0 deletions)

@@ -1,10 +1,12 @@
 from __future__ import annotations

 import operator
+from itertools import chain
 from typing import TYPE_CHECKING
 from typing import Any
 from typing import Callable
 from typing import Literal
+from typing import Mapping
 from typing import Sequence
 from typing import cast

@@ -560,6 +562,65 @@ def _len(_input: Column) -> Column:

         return self._with_callable(_len)

+    def replace_strict(
+        self,
+        old: Sequence[Any] | Mapping[Any, Any],
+        new: Sequence[Any],
+        *,
+        return_dtype: DType | type[DType] | None,
+    ) -> Self:
+        mapping = old if isinstance(old, Mapping) else dict(zip(old, new))
+
+        mapping_keys = list(mapping.keys())
+
+        # Create an array of all valid keys for our IN check
+        # Note: None/null handling is special in Spark - we'll handle it separately
+        non_null_keys = [k for k in mapping_keys if k is not None]
+        has_null_key = None in mapping_keys
+
+        mapping_expr = self._F.create_map(
+            [self._F.lit(x) for x in chain(*mapping.items())]
+        )
+
+        def _replace_strict(_input: Column) -> Column:
+            validation_expr = (
+                self._F.when(
+                    _input.isNull() & self._F.lit(has_null_key),
+                    self._F.lit(True),  # noqa: FBT003
+                )
+                .when(_input.isNull() & ~self._F.lit(has_null_key), self._F.lit(False))  # noqa: FBT003
+                .otherwise(
+                    self._F.array_contains(
+                        self._F.array([self._F.lit(k) for k in non_null_keys]), _input
+                    )
+                )
+            )

Comment on lines +586 to +597 (Member):
I might need an extra look at this
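
A quick sketch of the null subtlety the validation handles (standalone PySpark example, not the PR's code): array_contains yields NULL rather than False when the probed value is NULL, so null inputs need the explicit has_null_key branches above.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (None,)], ["a"])

# a=1 -> true, a=NULL -> NULL (not false): a bare membership check would
# leave null rows in limbo instead of marking them valid or invalid.
df.select(
    F.array_contains(F.array(F.lit(1), F.lit(2)), F.col("a")).alias("is_valid_key")
).show()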


+
+            mapped_col = (
+                mapping_expr[_input]
+                if self._implementation.is_pyspark()
+                else mapping_expr.getItem(_input)
+            )

Comment on lines +599 to +603 (Member):
PySpark raises a warning for mapping_expr.getItem(_input), while SQLFrame raises an exception for mapping_expr[_input] (Column is not callable)
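
A hedged illustration of the two spellings this branch switches between (recent PySpark assumed; both look up _input as a key in the MAP column built by create_map):

from pyspark.sql import functions as F

mapping_expr = F.create_map(F.lit(1), F.lit("one"), F.lit(2), F.lit("two"))
key = F.col("a")

via_index = mapping_expr[key]            # preferred on PySpark
via_getitem = mapping_expr.getItem(key)  # needed on SQLFrame; warns on PySpark

Either spelling yields NULL for keys absent from the map, which is why strictness is enforced by the validation expression above rather than by the lookup itself.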


+
+            try:
+                result = self._F.when(validation_expr, mapped_col).otherwise(
+                    self._F.assert_true(self._F.lit(False))  # noqa: FBT003
+                )
+            except Exception as exc:
+                msg = "replace_strict did not replace all non-null values."
+                raise ValueError(msg) from exc

Comment on the assert_true call (Member):
SQLFrame does not have assert_true for the duckdb backend, that's why I am still xfailing it in tests
Comment on lines +609 to +611 (Member):
Also, I am not really able to capture any Spark exception at this level
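
That matches Spark's execution model; a standalone sketch (names assumed, not the PR's code) of why the try/except above only catches expression-build-time errors such as SQLFrame's missing assert_true, never the assertion itself:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(9,)], ["a"])

# Constructing the failing expression raises nothing in Python...
expr = F.when(F.lit(False), F.col("a")).otherwise(F.assert_true(F.lit(False)))

try:
    out = df.select(expr)  # ...and select() is lazy, so no error here either
    out.collect()          # the assertion only fires once the plan executes
except Exception as exc:
    # Surfaces as a Spark/Py4J runtime error at collect() time, far from the
    # expression-building try/except in the diff above.
    print(type(exc).__name__)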


+            if return_dtype is not None:
+                result = result.cast(
+                    narwhals_to_native_dtype(
+                        return_dtype, self._version, self._native_dtypes
+                    )
+                )
+
+            return result
+
+        return self._with_callable(_replace_strict)

     def round(self, decimals: int) -> Self:
         def _round(_input: Column) -> Column:
             return self._F.round(_input, decimals)
tests/expr_and_series/replace_strict_test.py (3 additions & 3 deletions)

@@ -24,7 +24,7 @@ def test_replace_strict(
 ) -> None:
     if "dask" in str(constructor):
         request.applymarker(pytest.mark.xfail)
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
+    if ("sqlframe" in str(constructor)) or "duckdb" in str(constructor):
         request.applymarker(pytest.mark.xfail)
     df = nw.from_native(constructor({"a": [1, 2, 3]}))
     result = df.select(
@@ -59,7 +59,7 @@ def test_replace_non_full(
 ) -> None:
     if "dask" in str(constructor):
         request.applymarker(pytest.mark.xfail)
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
+    if "duckdb" in str(constructor):
         request.applymarker(pytest.mark.xfail)
     df = nw.from_native(constructor({"a": [1, 2, 3]}))
     if isinstance(df, nw.LazyFrame):
@@ -80,7 +80,7 @@ def test_replace_strict_mapping(
 ) -> None:
     if "dask" in str(constructor):
         request.applymarker(pytest.mark.xfail)
-    if ("pyspark" in str(constructor)) or "duckdb" in str(constructor):
+    if "duckdb" in str(constructor):
         request.applymarker(pytest.mark.xfail)

     df = nw.from_native(constructor({"a": [1, 2, 3]}))
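
For reference, a hedged sketch of the strictness contract these tests exercise (narwhals public API assumed; the exact exception type differs per backend, per the review comments above). A pandas frame stands in for any supported backend:

import narwhals as nw
import pandas as pd

df = nw.from_native(pd.DataFrame({"a": [1, 2, 3]}))

# Exhaustive mapping: every non-null value of "a" is covered, so this succeeds.
df.select(
    nw.col("a").replace_strict({1: "one", 2: "two", 3: "three"}, return_dtype=nw.String)
)

# Non-exhaustive: 3 has no replacement, so replace_strict is expected to raise.
try:
    df.select(nw.col("a").replace_strict([1, 2], ["one", "two"], return_dtype=nw.String))
except Exception as exc:
    print(exc)  # unreplaced non-null value -> error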