-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathtest_connection_pool.py
88 lines (66 loc) · 2.24 KB
/
test_connection_pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import pytest
from psqlpy import (
Connection,
ConnectionPool,
ConnRecyclingMethod,
QueryResult,
create_pool,
)
from psqlpy.exceptions import RustPSQLDriverPyBaseError
pytestmark = pytest.mark.anyio
async def test_connect_func() -> None:
"""Test that connect function makes new connection pool."""
pg_pool = create_pool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
)
await pg_pool.execute("SELECT 1")
async def test_pool_dsn_startup() -> None:
"""Test that connection pool can startup with dsn."""
pg_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
)
await pg_pool.execute("SELECT 1")
async def test_pool_execute(
psql_pool: ConnectionPool,
table_name: str,
number_database_records: int,
) -> None:
"""Test that ConnectionPool can execute queries."""
select_result = await psql_pool.execute(
f"SELECT * FROM {table_name}",
)
assert type(select_result) == QueryResult
inner_result = select_result.result()
assert isinstance(inner_result, list)
assert len(inner_result) == number_database_records
async def test_pool_connection(
psql_pool: ConnectionPool,
) -> None:
"""Test that ConnectionPool can return single connection from the pool."""
connection = await psql_pool.connection()
assert isinstance(connection, Connection)
@pytest.mark.parametrize(
"conn_recycling_method",
[
ConnRecyclingMethod.Fast,
ConnRecyclingMethod.Verified,
ConnRecyclingMethod.Clean,
],
)
async def test_pool_conn_recycling_method(
conn_recycling_method: ConnRecyclingMethod,
) -> None:
pg_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
conn_recycling_method=conn_recycling_method,
)
await pg_pool.execute("SELECT 1")
async def test_close_connection_pool() -> None:
"""Test that `close` method closes connection pool."""
pg_pool = ConnectionPool(
dsn="postgres://postgres:postgres@localhost:5432/psqlpy_test",
)
await pg_pool.execute("SELECT 1")
pg_pool.close()
with pytest.raises(expected_exception=RustPSQLDriverPyBaseError):
await pg_pool.execute("SELECT 1")