|
| 1 | +"""complete implementation for polars.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +from typing import Any |
| 6 | + |
| 7 | +from janitor.utils import check, import_message |
| 8 | + |
| 9 | +try: |
| 10 | + import polars as pl |
| 11 | + import polars.selectors as cs |
| 12 | + from polars.type_aliases import ColumnNameOrSelector |
| 13 | +except ImportError: |
| 14 | + import_message( |
| 15 | + submodule="polars", |
| 16 | + package="polars", |
| 17 | + conda_channel="conda-forge", |
| 18 | + pip_install=True, |
| 19 | + ) |
| 20 | + |
| 21 | + |
def _complete(
    df: pl.DataFrame | pl.LazyFrame,
    columns: tuple[ColumnNameOrSelector],
    fill_value: dict | Any | pl.Expr,
    explicit: bool,
    sort: bool,
    by: ColumnNameOrSelector,
) -> pl.DataFrame | pl.LazyFrame:
    """
    Compute the final output for the `complete` function.

    The combinations of the values selected by `columns` are built
    (per group when `by` is given) and full-joined back onto `df`, so that
    combinations absent from `df` show up as rows of missing values.

    Args:
        df: The frame to complete.
        columns: Column names, column selectors or polars expressions whose
            values are combined. Names and selectors are reduced to their
            unique values (sorted when `sort` is True); expressions are
            used exactly as passed.
        fill_value: Replacement for nulls in the columns that were not used
            to build the combinations. Either a single value/expression
            applied to every such column that contains nulls, or a dict
            mapping column names to their replacement. `None` disables
            filling.
        explicit: If True, every null in the affected columns is filled.
            If False, only nulls in the rows introduced by the completion
            are filled; nulls already present in `df` are left alone.
        sort: Whether to sort the unique values; also forwarded as
            `maintain_order` to `group_by` when `by` is given.
        by: Column(s) to group by before building the combinations,
            or `None` for no grouping.

    Returns:
        A DataFrame/LazyFrame, with rows of missing values, if any.
        `df` itself is returned untouched when `columns` is empty.

    Raises:
        TypeError: If an entry of `columns` is not a string, a column
            selector or a polars expression.
    """
    if not columns:
        return df

    check("sort", sort, [bool])
    check("explicit", explicit, [bool])

    expressions = []
    for column in columns:
        # The order of the checks matters: strings first, then selectors,
        # and only then generic expressions.
        if isinstance(column, str):
            expression = pl.col(column).unique()
        elif cs.is_selector(column):
            expression = column.as_expr().unique()
        elif isinstance(column, pl.Expr):
            # user supplied expressions are taken as-is (no unique/sort)
            expressions.append(column)
            continue
        else:
            raise TypeError(
                "The argument passed to the columns parameter "
                "should either be a string, a column selector, "
                "or a polars expression, instead got - "
                f"{type(column)}."
            )
        if sort:
            expression = expression.sort()
        expressions.append(expression)

    if by is None:
        # one row in which every column holds the list of its values
        uniques = df.select(
            [expression.implode() for expression in expressions]
        )
        combination_columns = uniques.columns
    else:
        # one row per group, holding the lists of values for that group
        uniques = df.group_by(by, maintain_order=sort).agg(expressions)
        group_columns = uniques.select(by).columns
        combination_columns = uniques.select(
            pl.exclude(group_columns)
        ).columns
    # exploding the list columns one after the other
    # builds all the combinations of their values
    for column in combination_columns:
        uniques = uniques.explode(column)

    existing_columns = df.columns
    struct_columns = [
        column
        for column, dtype in zip(
            combination_columns, uniques.select(combination_columns).dtypes
        )
        # this way we ensure there is no tampering with existing struct columns
        if (dtype == pl.Struct) and (column not in existing_columns)
    ]
    for column in struct_columns:
        uniques = uniques.unnest(columns=column)

    if fill_value is None:
        return uniques.join(df, on=uniques.columns, how="full", coalesce=True)

    columns_to_select = existing_columns
    idx = None
    if not explicit:
        # Temporary row index: it is null only in the rows introduced by
        # the join. The suffix makes the name strictly longer than every
        # existing column name, so it cannot clash with one of them
        # (the bare concatenation clashes for a single column frame).
        idx = "".join(existing_columns) + "_row_index"
        df = df.with_row_index(name=idx)
    df = uniques.join(df, on=uniques.columns, how="full", coalesce=True)

    # exclude columns that were not used
    # to generate the combinations
    exclude_columns = list(uniques.columns)
    if idx is not None:
        exclude_columns.append(idx)
    booleans = df.select(pl.exclude(exclude_columns).is_null().any())
    if isinstance(booleans, pl.LazyFrame):
        booleans = booleans.collect()
    # only the columns that actually contain nulls need filling
    null_columns = [
        column
        for column in booleans.columns
        if booleans.get_column(column).item()
    ]

    # Keep every fill expression keyed by the column it targets, so that
    # the `explicit=False` wrapping below always pairs an expression with
    # its own column, whatever the order or coverage of a `fill_value` dict.
    if isinstance(fill_value, dict):
        fill_expressions = {
            column_name: pl.col(column_name).fill_null(value=value)
            for column_name, value in fill_value.items()
            if column_name in null_columns
        }
    else:
        fill_expressions = {
            column_name: pl.col(column_name).fill_null(value=fill_value)
            for column_name in null_columns
        }

    if fill_expressions and not explicit:
        # restrict the fill to the rows created by the completion
        is_new_row = pl.col(idx).is_null()
        fill_expressions = {
            column_name: pl.when(is_new_row)
            .then(expression)
            .otherwise(pl.col(column_name))
            for column_name, expression in fill_expressions.items()
        }

    if fill_expressions:
        df = df.with_columns(list(fill_expressions.values()))

    # drops the temporary row index and restores the original column order
    return df.select(columns_to_select)
0 commit comments