Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(postgres): use psycopg rather than psycopg2 #10659

Merged
merged 8 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/renovate.json
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@
},
{
"addLabels": ["postgres"],
"matchPackageNames": ["/psycopg2/", "/postgres/"]
"matchPackageNames": ["/psycopg/", "/postgres/"]
},
{
"addLabels": ["druid"],
Expand All @@ -89,7 +89,7 @@
},
{
"addLabels": ["risingwave"],
"matchPackageNames": ["/risingwave/"]
"matchPackageNames": ["/psycopg2/", "/risingwave/"]
},
{
"addLabels": ["snowflake"],
Expand Down
3 changes: 2 additions & 1 deletion conda/environment-arm64-flink.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ dependencies:
- pins >=0.8.2
- uv>=0.4.29
- polars >=1,<2
- psycopg2 >=2.8.4
- psycopg2 >= 2.8.4
- psycopg >= 3.2.0
- pyarrow =11.0.0
- pyarrow-tests
- pyarrow-hotfix >=0.4
Expand Down
3 changes: 2 additions & 1 deletion conda/environment-arm64.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ dependencies:
- pins >=0.8.2
- uv>=0.4.29
- polars >=1,<2
- psycopg2 >=2.8.4
- psycopg2 >= 2.8.4
- psycopg >= 3.2.0
- pyarrow >=10.0.1
- pyarrow-tests
- pyarrow-hotfix >=0.4
Expand Down
3 changes: 2 additions & 1 deletion conda/environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ dependencies:
- pip
- uv>=0.4.29
- polars >=1,<2
- psycopg2 >=2.8.4
- psycopg2 >= 2.8.4
- psycopg >= 3.2.0
- pyarrow >=10.0.1
- pyarrow-hotfix >=0.4
- pydata-google-auth
Expand Down
47 changes: 24 additions & 23 deletions ibis/backends/postgres/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

import pandas as pd
import polars as pl
import psycopg2
import psycopg
import pyarrow as pa


Expand Down Expand Up @@ -90,8 +90,6 @@ def _from_url(self, url: ParseResult, **kwargs):
return self.connect(**kwargs)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
from psycopg2.extras import execute_batch

schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
raise exc.IbisTypeError(
Expand Down Expand Up @@ -129,7 +127,7 @@ def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:

with self.begin() as cur:
cur.execute(create_stmt_sql)
execute_batch(cur, sql, data, 128)
cur.executemany(sql, data)

@contextlib.contextmanager
def begin(self):
Expand All @@ -145,14 +143,16 @@ def begin(self):
finally:
cursor.close()

def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:
def _fetch_from_cursor(
self, cursor: psycopg.Cursor, schema: sch.Schema
) -> pd.DataFrame:
import pandas as pd

from ibis.backends.postgres.converter import PostgresPandasData

try:
df = pd.DataFrame.from_records(
cursor, columns=schema.names, coerce_float=True
cursor.fetchall(), columns=schema.names, coerce_float=True
)
except Exception:
# clean up the cursor if we fail to create the DataFrame
Expand All @@ -166,7 +166,7 @@ def _fetch_from_cursor(self, cursor, schema: sch.Schema) -> pd.DataFrame:

@property
def version(self):
version = f"{self.con.server_version:0>6}"
version = f"{self.con.info.server_version:0>6}"
major = int(version[:2])
minor = int(version[2:4])
patch = int(version[4:])
Expand Down Expand Up @@ -233,17 +233,17 @@ def do_connect(
year int32
month int32
"""
import psycopg2
import psycopg2.extras
import psycopg
import psycopg.types.json

psycopg2.extras.register_default_json(loads=lambda x: x)
psycopg.types.json.set_json_loads(loads=lambda x: x)

self.con = psycopg2.connect(
self.con = psycopg.connect(
host=host,
port=port,
user=user,
password=password,
database=database,
dbname=database,
options=(f"-csearch_path={schema}" * (schema is not None)) or None,
**kwargs,
)
Expand All @@ -252,7 +252,7 @@ def do_connect(

@util.experimental
@classmethod
def from_connection(cls, con: psycopg2.extensions.connection) -> Backend:
def from_connection(cls, con: psycopg.Connection) -> Backend:
"""Create an Ibis client from an existing connection to a PostgreSQL database.

Parameters
Expand Down Expand Up @@ -701,8 +701,9 @@ def _safe_raw_sql(self, *args, **kwargs):
yield result

def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
import psycopg2
import psycopg2.extras
import psycopg
import psycopg.types
import psycopg.types.hstore

with contextlib.suppress(AttributeError):
query = query.sql(dialect=self.dialect)
Expand All @@ -711,16 +712,16 @@ def raw_sql(self, query: str | sg.Expression, **kwargs: Any) -> Any:
cursor = con.cursor()

try:
# try to load hstore, uuid and ipaddress extensions
with contextlib.suppress(psycopg2.ProgrammingError):
psycopg2.extras.register_hstore(cursor)
with contextlib.suppress(psycopg2.ProgrammingError):
psycopg2.extras.register_uuid(conn_or_curs=cursor)
with contextlib.suppress(psycopg2.ProgrammingError):
psycopg2.extras.register_ipaddress(cursor)
except Exception:
# try to load hstore
psycopg.types.hstore.register_hstore(
psycopg.types.TypeInfo.fetch(con, "hstore"),
cursor,
)
except (psycopg.InternalError, psycopg.ProgrammingError):
cursor.close()
raise
except TypeError:
pass

try:
cursor.execute(query, **kwargs)
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class TestConf(ServiceBackendTest):
supports_structs = False
rounding_method = "half_to_even"
service_name = "postgres"
deps = ("psycopg2",)
deps = ("psycopg",)

driver_supports_multiple_statements = True

Expand Down
8 changes: 4 additions & 4 deletions ibis/backends/postgres/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import ibis.common.exceptions as com
import ibis.expr.datatypes as dt
import ibis.expr.types as ir
from ibis.backends.tests.errors import PsycoPg2OperationalError
from ibis.backends.tests.errors import PsycoPgOperationalError
from ibis.util import gen_name

pytest.importorskip("psycopg2")
pytest.importorskip("psycopg")

POSTGRES_TEST_DB = os.environ.get("IBIS_TEST_POSTGRES_DATABASE", "ibis_testing")
IBIS_POSTGRES_HOST = os.environ.get("IBIS_TEST_POSTGRES_HOST", "localhost")
Expand Down Expand Up @@ -260,7 +260,7 @@ def test_kwargs_passthrough_in_connect():

def test_port():
# check that we parse and use the port (and then, of course, fail because it's bogus)
with pytest.raises(PsycoPg2OperationalError):
with pytest.raises(PsycoPgOperationalError):
ibis.connect("postgresql://postgres:postgres@localhost:1337/ibis_testing")


Expand Down Expand Up @@ -388,7 +388,7 @@ def test_password_with_bracket():
quoted_pass = quote_plus(password)
url = f"postgres://{IBIS_POSTGRES_USER}:{quoted_pass}@{IBIS_POSTGRES_HOST}:{IBIS_POSTGRES_PORT}/{POSTGRES_TEST_DB}"
with pytest.raises(
PsycoPg2OperationalError,
PsycoPgOperationalError,
match=f'password authentication failed for user "{IBIS_POSTGRES_USER}"',
):
ibis.connect(url)
Expand Down
6 changes: 3 additions & 3 deletions ibis/backends/postgres/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import ibis.expr.types as ir
from ibis import literal as L

pytest.importorskip("psycopg2")
pytest.importorskip("psycopg")


@pytest.mark.parametrize(
Expand Down Expand Up @@ -1195,7 +1195,7 @@ def test_string_to_binary_cast(con):
)
with con.begin() as c:
c.execute(sql_string)
raw_data = [row[0][0] for row in c.fetchall()]
raw_data = [row[0] for row in c.fetchall()]
expected = pd.Series(raw_data, name=name)
tm.assert_series_equal(result, expected)

Expand All @@ -1212,7 +1212,7 @@ def test_string_to_binary_round_trip(con):
)
with con.begin() as c:
c.execute(sql_string)
rows = [row[0] for (row,) in c.fetchall()]
rows = [row[0] for row in c.fetchall()]
expected = pd.Series(rows, name=name)
tm.assert_series_equal(result, expected)

Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/tests/test_postgis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
from numpy import testing

pytest.importorskip("psycopg2")
pytest.importorskip("psycopg")
gpd = pytest.importorskip("geopandas")
pytest.importorskip("shapely")

Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/postgres/tests/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from ibis import udf
from ibis.util import guid

pytest.importorskip("psycopg2")
pytest.importorskip("psycopg")


@pytest.fixture(scope="session")
Expand Down
43 changes: 0 additions & 43 deletions ibis/backends/risingwave/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import psycopg2
import sqlglot as sg
import sqlglot.expressions as sge
from pandas.api.types import is_float_dtype
from psycopg2 import extras

import ibis
Expand Down Expand Up @@ -106,48 +105,6 @@ def _from_url(self, url: ParseResult, **kwargs):

return self.connect(**kwargs)

def _register_in_memory_table(self, op: ops.InMemoryTable) -> None:
Copy link
Member

@cpcloud cpcloud Jan 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was a duplicate definition, not caught by ruff, that I introduced in #10669.

from psycopg2.extras import execute_batch

schema = op.schema
if null_columns := [col for col, dtype in schema.items() if dtype.is_null()]:
raise exc.IbisTypeError(
f"{self.name} cannot yet reliably handle `null` typed columns; "
f"got null typed columns: {null_columns}"
)

name = op.name
quoted = self.compiler.quoted
create_stmt = sg.exp.Create(
kind="TABLE",
this=sg.exp.Schema(
this=sg.to_identifier(name, quoted=quoted),
expressions=schema.to_sqlglot(self.dialect),
),
properties=sg.exp.Properties(expressions=[sge.TemporaryProperty()]),
)
create_stmt_sql = create_stmt.sql(self.dialect)

df = op.data.to_frame()
# nan gets compiled into 'NaN'::float which throws errors in non-float columns
# In order to hold NaN values, pandas automatically converts integer columns
# to float columns if there are NaN values in them. Therefore, we need to convert
# them to their original dtypes (that support pd.NA) to figure out which columns
# are actually non-float, then fill the NaN values in those columns with None.
convert_df = df.convert_dtypes()
for col in convert_df.columns:
if not is_float_dtype(convert_df[col]):
df[col] = df[col].replace(float("nan"), None)

data = df.itertuples(index=False)
sql = self._build_insert_template(
name, schema=schema, columns=True, placeholder="%s"
)

with self.begin() as cur:
cur.execute(create_stmt_sql)
execute_batch(cur, sql, data, 128)

@contextlib.contextmanager
def begin(self):
con = self.con
Expand Down
19 changes: 19 additions & 0 deletions ibis/backends/tests/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,25 @@
PsycoPg2OperationalError
) = PsycoPg2UndefinedObject = PsycoPg2ArraySubscriptError = None

try:
from psycopg.errors import ArraySubscriptError as PsycoPgArraySubscriptError
from psycopg.errors import DivisionByZero as PsycoPgDivisionByZero
from psycopg.errors import IndeterminateDatatype as PsycoPgIndeterminateDatatype
from psycopg.errors import InternalError_ as PsycoPgInternalError
from psycopg.errors import (
InvalidTextRepresentation as PsycoPgInvalidTextRepresentation,
)
from psycopg.errors import OperationalError as PsycoPgOperationalError
from psycopg.errors import ProgrammingError as PsycoPgProgrammingError
from psycopg.errors import SyntaxError as PsycoPgSyntaxError
from psycopg.errors import UndefinedObject as PsycoPgUndefinedObject
except ImportError:
PsycoPgSyntaxError = PsycoPgIndeterminateDatatype = (
PsycoPgInvalidTextRepresentation
) = PsycoPgDivisionByZero = PsycoPgInternalError = PsycoPgProgrammingError = (
PsycoPgOperationalError
) = PsycoPgUndefinedObject = PsycoPgArraySubscriptError = None

try:
from MySQLdb import NotSupportedError as MySQLNotSupportedError
from MySQLdb import OperationalError as MySQLOperationalError
Expand Down
3 changes: 2 additions & 1 deletion ibis/backends/tests/test_array.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
PsycoPg2InternalError,
PsycoPg2ProgrammingError,
PsycoPg2SyntaxError,
PsycoPgSyntaxError,
Py4JJavaError,
PyAthenaDatabaseError,
PyAthenaOperationalError,
Expand Down Expand Up @@ -1094,7 +1095,7 @@ def test_array_intersect(con, data):


@builtin_array
@pytest.mark.notimpl(["postgres"], raises=PsycoPg2SyntaxError)
@pytest.mark.notimpl(["postgres"], raises=PsycoPgSyntaxError)
@pytest.mark.notimpl(["risingwave"], raises=PsycoPg2InternalError)
@pytest.mark.notimpl(
["trino"], reason="inserting maps into structs doesn't work", raises=TrinoUserError
Expand Down
4 changes: 2 additions & 2 deletions ibis/backends/tests/test_numeric.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
ImpalaHiveServer2Error,
MySQLOperationalError,
OracleDatabaseError,
PsycoPg2DivisionByZero,
PsycoPg2InternalError,
PsycoPgDivisionByZero,
Py4JError,
Py4JJavaError,
PyAthenaOperationalError,
Expand Down Expand Up @@ -1323,7 +1323,7 @@ def test_floating_mod(backend, alltypes, df):
)
@pytest.mark.notyet(["mssql"], raises=PyODBCDataError)
@pytest.mark.notyet(["snowflake"], raises=SnowflakeProgrammingError)
@pytest.mark.notyet(["postgres"], raises=PsycoPg2DivisionByZero)
@pytest.mark.notyet(["postgres"], raises=PsycoPgDivisionByZero)
@pytest.mark.notimpl(["exasol"], raises=ExaQueryError)
@pytest.mark.xfail_version(duckdb=["duckdb<1.1"])
def test_divide_by_zero(backend, alltypes, df, column, denominator):
Expand Down
Loading
Loading