Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(duckdb): return null typed pyarrow arrays and disable creating tables with all null columns in duckdb #9810

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,12 @@ def _normalize_external_tables(self, external_tables=None) -> ExternalData | Non
n += 1
if not (schema := obj.schema):
raise TypeError(f"Schema is empty for external table {name}")
if null_fields := schema.null_fields:
raise com.IbisTypeError(
"ClickHouse doesn't support NULL-typed fields. "
"Consider assigning a type through casting or on construction. "
f"Got null typed fields: {null_fields}"
)

structure = [
f"{name} {type_mapper.to_string(typ.copy(nullable=not typ.is_nested()))}"
Expand Down
25 changes: 16 additions & 9 deletions ibis/backends/duckdb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import ibis.expr.types as ir
from ibis import util
from ibis.backends import CanCreateDatabase, UrlFromPath
from ibis.backends.duckdb.converter import DuckDBPandasData
from ibis.backends.duckdb.converter import DuckDBPandasData, DuckDBPyArrowData
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers.base import STAR, AlterTable, C, RenameTable
from ibis.common.dispatch import lazy_singledispatch
Expand Down Expand Up @@ -148,8 +148,6 @@ def create_table(

if obj is None and schema is None:
raise ValueError("Either `obj` or `schema` must be specified")
if schema is not None:
schema = ibis.schema(schema)

quoted = self.compiler.quoted
dialect = self.dialect
Expand All @@ -172,16 +170,25 @@ def create_table(
else:
query = None

if schema is None:
schema = table.schema()
else:
schema = ibis.schema(schema)

if null_fields := schema.null_fields:
raise exc.IbisTypeError(
"DuckDB does not support creating tables with NULL typed columns. "
"Ensure that every column has non-NULL type. "
f"NULL columns: {null_fields}"
)

if overwrite:
temp_name = util.gen_name("duckdb_table")
else:
temp_name = name

initial_table = sg.table(temp_name, catalog=catalog, db=database, quoted=quoted)
target = sge.Schema(
this=initial_table,
expressions=(schema or table.schema()).to_sqlglot(dialect),
)
target = sge.Schema(this=initial_table, expressions=schema.to_sqlglot(dialect))

create_stmt = sge.Create(
kind="TABLE",
Expand Down Expand Up @@ -252,7 +259,7 @@ def table(self, name: str, database: str | None = None) -> ir.Table:

table_schema = self.get_schema(name, catalog=catalog, database=database)
# load geospatial only if geo columns
if any(typ.is_geospatial() for typ in table_schema.types):
if table_schema.geospatial:
self.load_extension("spatial")
return ops.DatabaseTable(
name,
Expand Down Expand Up @@ -1302,7 +1309,7 @@ def to_pyarrow(
**_: Any,
) -> pa.Table:
table = self._to_duckdb_relation(expr, params=params, limit=limit).arrow()
return expr.__pyarrow_result__(table)
return expr.__pyarrow_result__(table, data_mapper=DuckDBPyArrowData)

def execute(
self,
Expand Down
22 changes: 22 additions & 0 deletions ibis/backends/duckdb/converter.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,31 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import pyarrow as pa

from ibis.formats.pandas import PandasData
from ibis.formats.pyarrow import PyArrowData

if TYPE_CHECKING:
import ibis.expr.datatypes as dt


class DuckDBPandasData(PandasData):
@staticmethod
def convert_Array(s, dtype, pandas_type):
return s.replace(float("nan"), None)


class DuckDBPyArrowData(PyArrowData):
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
@classmethod
def convert_scalar(cls, scalar: pa.Scalar, dtype: dt.DataType) -> pa.Scalar:
if dtype.is_null():
return pa.scalar(None)
return super().convert_scalar(scalar, dtype)

@classmethod
def convert_column(cls, column: pa.Array, dtype: dt.DataType) -> pa.Array:
if dtype.is_null():
return pa.nulls(len(column))
return super().convert_column(column, dtype)
23 changes: 23 additions & 0 deletions ibis/backends/duckdb/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pytest import param

import ibis
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
from ibis.conftest import LINUX, SANDBOXED, not_windows
from ibis.util import gen_name
Expand Down Expand Up @@ -442,3 +443,25 @@ def test_pyarrow_batches_chunk_size(con): # 10443
batches = con.to_pyarrow_batches(t, chunk_size=-1)
with pytest.raises(TypeError):
next(batches)


@pytest.mark.parametrize(
"kwargs",
[
dict(obj=ibis.memtable({"a": [None]})),
dict(obj=ibis.memtable({"a": [None]}), schema=ibis.schema({"a": "null"})),
dict(schema=ibis.schema({"a": "null"})),
],
ids=["obj", "obj-schema", "schema"],
)
def test_create_table_with_nulls(con, kwargs):
t = ibis.memtable({"a": [None]})
schema = t.schema()

assert schema == ibis.schema({"a": "null"})
assert schema.null_fields == ("a",)

name = gen_name("duckdb_all_nulls")

with pytest.raises(com.IbisTypeError, match="NULL typed columns"):
con.create_table(name, **kwargs)
2 changes: 1 addition & 1 deletion ibis/backends/exasol/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"Exasol cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
17 changes: 13 additions & 4 deletions ibis/backends/flink/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,13 +375,22 @@ def compile(
expr, params=params, pretty=pretty
) # Discard `limit` and other kwargs.

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
if null_columns := op.schema.null_fields:
raise exc.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
)
self.create_view(op.name, op.data.to_frame(), schema=op.schema, temp=True)

def _finalize_memtable(self, name: str) -> None:
self.drop_view(name, temp=True, force=True)

def execute(self, expr: ir.Expr, **kwargs: Any) -> Any:
"""Execute an expression."""
self._verify_in_memory_tables_are_unique(expr)
self._register_udfs(expr)
self._run_pre_execute_hooks(expr)

table_expr = expr.as_table()
sql = self.compile(table_expr, **kwargs)
sql = self.compile(expr.as_table(), **kwargs)
df = self._table_env.sql_query(sql).to_pandas()

return expr.__pandas_result__(df)
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/impala/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ def explain(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"Impala cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/mssql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ def create_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"MS SQL cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/mysql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ def create_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"MySQL cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
5 changes: 5 additions & 0 deletions ibis/backends/oracle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,11 @@ def drop_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := schema.null_fields:
raise exc.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
)

name = op.name
quoted = self.compiler.quoted
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def _from_url(self, url: ParseResult, **kwargs):

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise exc.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/risingwave/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def create_table(

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
18 changes: 10 additions & 8 deletions ibis/backends/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,9 @@ def test_array_intersect(con, data):
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.notyet(["athena"], raises=PyAthenaDatabaseError)
@pytest.mark.notyet(
["flink"], raises=ValueError, reason="array of struct is not supported"
)
def test_unnest_struct(con):
data = {"value": [[{"a": 1}, {"a": 2}], [{"a": 3}, {"a": 4}]]}
t = ibis.memtable(data, schema=ibis.schema({"value": "!array<!struct<a: !int>>"}))
Expand All @@ -1120,8 +1123,8 @@ def test_unnest_struct(con):
@pytest.mark.notimpl(
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.notimpl(
["flink"], reason="flink unnests a and b as separate columns", raises=Py4JJavaError
@pytest.mark.notyet(
["flink"], raises=ValueError, reason="array of struct is not supported"
)
@pytest.mark.notyet(["athena"], raises=PyAthenaDatabaseError)
def test_unnest_struct_with_multiple_fields(con):
Expand Down Expand Up @@ -1229,9 +1232,7 @@ def test_zip_null(con, fn):
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
)
@pytest.mark.notyet(
["flink"],
raises=Py4JJavaError,
reason="does not seem to support field selection on unnest",
["flink"], raises=ValueError, reason="array of struct is not supported"
)
@pytest.mark.notyet(["athena"], raises=PyAthenaOperationalError)
def test_array_of_struct_unnest(con):
Expand Down Expand Up @@ -1765,16 +1766,17 @@ def test_table_unnest_column_expr(backend):
assert set(result.values) == set(expected.replace({np.nan: None}).values)


@pytest.mark.notimpl(
["datafusion", "polars", "flink"], raises=com.OperationNotDefinedError
)
@pytest.mark.notimpl(["datafusion", "polars"], raises=com.OperationNotDefinedError)
@pytest.mark.notimpl(["trino"], raises=TrinoUserError)
@pytest.mark.notimpl(["athena"], raises=PyAthenaOperationalError)
@pytest.mark.notimpl(["postgres"], raises=PsycoPg2SyntaxError)
@pytest.mark.notimpl(["risingwave"], raises=PsycoPg2ProgrammingError)
@pytest.mark.notyet(
["risingwave"], raises=PsycoPg2InternalError, reason="not supported in risingwave"
)
@pytest.mark.notyet(
["flink"], raises=ValueError, reason="array of struct is not supported"
)
def test_table_unnest_array_of_struct_of_array(con):
t = ibis.memtable(
{
Expand Down
6 changes: 2 additions & 4 deletions ibis/backends/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1773,9 +1773,7 @@ def test_insert_into_table_missing_columns(con, temp_table):

@pytest.mark.notyet(["druid"], raises=AssertionError, reason="can't drop tables")
@pytest.mark.notyet(
["clickhouse", "flink"],
raises=AssertionError,
reason="memtables are assembled every time",
["clickhouse"], raises=AssertionError, reason="memtables are assembled every time"
)
@pytest.mark.notyet(
["bigquery"], raises=AssertionError, reason="test is flaky", strict=False
Expand Down Expand Up @@ -1821,7 +1819,7 @@ def test_same_name_memtable_is_overwritten(con):


@pytest.mark.notimpl(
["clickhouse", "flink"],
["clickhouse"],
raises=AssertionError,
reason="backend doesn't use _register_in_memory_table",
)
Expand Down
43 changes: 43 additions & 0 deletions ibis/backends/tests/test_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from pytest import param

import ibis
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
from ibis import util
from ibis.backends.tests.errors import (
Expand All @@ -17,6 +18,7 @@
ExaQueryError,
MySQLOperationalError,
OracleDatabaseError,
Py4JJavaError,
PyAthenaOperationalError,
PyDeltaTableError,
PyDruidProgrammingError,
Expand All @@ -30,6 +32,7 @@

pd = pytest.importorskip("pandas")
pa = pytest.importorskip("pyarrow")
pat = pytest.importorskip("pyarrow.types")
cpcloud marked this conversation as resolved.
Show resolved Hide resolved

limit = [param(42, id="limit")]

Expand Down Expand Up @@ -661,4 +664,44 @@ def test_scalar_to_memory(limit, awards_players, output_format, converter):

expr = awards_players.filter(awards_players.awardID == "DEADBEEF").yearID.min()
res = method(expr)

assert converter(res) is None


mark_notyet_nulls = pytest.mark.notyet(
[
"clickhouse",
"exasol",
"flink",
"impala",
"mssql",
"mysql",
"oracle",
"postgres",
"risingwave",
"trino",
],
raises=com.IbisTypeError,
reason="unable to handle null types as input",
)


@mark_notyet_nulls
def test_all_null_table(con):
t = ibis.memtable({"a": [None]})
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
result = con.to_pyarrow(t)
assert pat.is_null(result["a"].type)


@mark_notyet_nulls
def test_all_null_column(con):
t = ibis.memtable({"a": [None]})
result = con.to_pyarrow(t.a)
assert pat.is_null(result.type)


@pytest.mark.notyet(["flink"], raises=Py4JJavaError)
def test_all_null_scalar(con):
e = ibis.literal(None)
result = con.to_pyarrow(e)
assert pat.is_null(result.type)
1 change: 1 addition & 0 deletions ibis/backends/tests/test_numeric.py
Original file line number Diff line number Diff line change
Expand Up @@ -1629,6 +1629,7 @@ def test_scalar_round_is_integer(con):
],
)
@pytest.mark.notyet(["exasol"], raises=ExaQueryError)
@pytest.mark.notimpl(["flink"], raises=NotImplementedError)
def test_memtable_decimal(con, numbers):
schema = ibis.schema(dict(numbers=dt.Decimal(38, 9)))
t = ibis.memtable({"numbers": numbers}, schema=schema)
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/trino/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
if null_columns := schema.null_fields:
raise com.IbisTypeError(
"Trino cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
Expand Down
Loading
Loading