Skip to content

Commit ab9e9e4

Browse files
Add db-sync config tests
1 parent b96d25f commit ab9e9e4

File tree

4 files changed

+677
-1
lines changed

4 files changed

+677
-1
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
"""Tests for basic DB-Sync configuration options."""
2+
3+
import logging
4+
import typing as tp
5+
6+
import pytest
7+
8+
from cardano_node_tests.cluster_management import cluster_management
9+
from cardano_node_tests.utils import configuration
10+
from cardano_node_tests.utils import dbsync_utils
11+
12+
LOGGER = logging.getLogger(__name__)
13+
14+
pytestmark = [
15+
pytest.mark.skipif(
16+
configuration.CLUSTERS_COUNT != 1 and not configuration.HAS_DBSYNC,
17+
reason="db-sync config tests can run on a single cluster only",
18+
),
19+
pytest.mark.dbsync_config,
20+
]
21+
22+
23+
def check_dbsync_state(expected_state: dict) -> None:
24+
"""Check the state of db-sync tables and columns against expected conditions.
25+
26+
Args:
27+
expected_state: Dictionary specifying conditions to verify where:
28+
- Key format:
29+
* "table" for table-level checks
30+
* "table.column" for column-level checks
31+
- Value format:
32+
* "empty" - verify table is empty
33+
* "not_empty" - verify table has rows
34+
* "exists" - verify table/column exists
35+
* "not_exists" - verify table/column doesn't exist
36+
* "column_condition:=0" - custom SQL condition
37+
* "column_condition:IS NULL" - NULL check condition
38+
39+
Returns:
40+
bool: True if all conditions match, False otherwise
41+
"""
42+
for key, condition in expected_state.items():
43+
if "." in key: # Column-level check
44+
table, column = key.split(".", 1)
45+
assert condition.startswith("column_condition:"), (
46+
f"Invalid column condition format: {condition}"
47+
)
48+
column_condition = condition.split(":", 1)[1]
49+
dbsync_utils.check_column_condition(table, column, column_condition)
50+
else: # Table-level check
51+
match condition:
52+
case "empty":
53+
assert dbsync_utils.table_empty(key), (
54+
f"Expected {key} to be empty, but it is not."
55+
)
56+
case "not_empty":
57+
assert not dbsync_utils.table_empty(key), (
58+
f"Expected {key} to have data, but it is empty."
59+
)
60+
case "exists":
61+
assert dbsync_utils.table_exists(key), (
62+
f"Expected {key} to exist, but it does not."
63+
)
64+
case "not_exists":
65+
assert not dbsync_utils.table_exists(key), (
66+
f"Expected {key} to NOT exist, but it does."
67+
)
68+
case _:
69+
error_msg = f"Unknown table condition '{condition}' for table '{key}'"
70+
raise ValueError(error_msg)
71+
72+
73+
@pytest.fixture
74+
def db_sync_manager(
75+
request: pytest.FixtureRequest, cluster_manager: cluster_management.ClusterManager
76+
) -> dbsync_utils.DBSyncManager:
77+
"""Provide db-sync manager on a singleton cluster.
78+
79+
Creates and returns a DBSyncManager instance with locked cluster resources
80+
to ensure exclusive access during testing.
81+
"""
82+
cluster_manager.get(lock_resources=[cluster_management.Resources.CLUSTER])
83+
return dbsync_utils.DBSyncManager(request)
84+
85+
86+
@pytest.mark.order(-1)
87+
class TestDBSyncConfig:
88+
"""Basic tests for DB-Sync Config."""
89+
90+
def test_basic_tx_out(
91+
self,
92+
db_sync_manager: dbsync_utils.DBSyncManager,
93+
):
94+
"""Test tx_out option."""
95+
db_config = db_sync_manager.get_config_builder()
96+
97+
# Test tx_out : enable
98+
db_sync_manager.restart_with_config(
99+
db_config.with_tx_out(value="enable", force_tx_in=False, use_address_table=False)
100+
)
101+
check_dbsync_state(
102+
{
103+
"address": "not_exists",
104+
"tx_in": "not_empty",
105+
"tx_out": "not_empty",
106+
"ma_tx_out": "not_empty",
107+
}
108+
)
109+
110+
# Test tx_out : disable
111+
db_sync_manager.restart_with_config(
112+
db_config.with_tx_out(value="disable", force_tx_in=True, use_address_table=True)
113+
)
114+
check_dbsync_state(
115+
{
116+
"address": "not_exists",
117+
"tx_in": "empty",
118+
"tx_out": "empty",
119+
"ma_tx_out": "empty",
120+
"tx.fee": "column_condition:=0",
121+
"redeemer.script_hash": "column_condition:IS NULL",
122+
}
123+
)
124+
125+
@pytest.mark.parametrize(
126+
("tx_cbor_value", "expected_state"), [("enable", "not_empty"), ("disable", "empty")]
127+
)
128+
def test_cbor(
129+
self,
130+
db_sync_manager: dbsync_utils.DBSyncManager,
131+
tx_cbor_value: tp.Literal["enable", "disable"],
132+
expected_state: str,
133+
):
134+
"""Test tx_cbor option with parametrization."""
135+
db_config = db_sync_manager.get_config_builder()
136+
137+
db_sync_manager.restart_with_config(db_config.with_tx_cbor(tx_cbor_value))
138+
check_dbsync_state({"tx_cbor": expected_state})
139+
140+
@pytest.mark.parametrize(
141+
("multi_asset_enable", "expected_state"), [(True, "not_empty"), (False, "empty")]
142+
)
143+
def test_multi_asset(
144+
self,
145+
db_sync_manager: dbsync_utils.DBSyncManager,
146+
multi_asset_enable: bool,
147+
expected_state: str,
148+
):
149+
"""Test multi_asset option with parametrization."""
150+
db_config = db_sync_manager.get_config_builder()
151+
152+
db_sync_manager.restart_with_config(db_config.with_multi_asset(enable=multi_asset_enable))
153+
check_dbsync_state({"multi_asset": expected_state})

cardano_node_tests/utils/dbsync_queries.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -620,6 +620,27 @@ def execute(query: str, vars: tp.Sequence = ()) -> tp.Iterator[psycopg2.extensio
620620
cur.close()
621621

622622

623+
@contextlib.contextmanager
624+
def db_transaction() -> tp.Iterator[None]:
625+
"""Transaction manager that works with connection pool."""
626+
conn = None
627+
try:
628+
conn = dbsync_conn.conn()
629+
# Only modify autocommit if not already in a transaction
630+
if conn.get_transaction_status() == psycopg2.extensions.TRANSACTION_STATUS_IDLE:
631+
conn.autocommit = False
632+
yield
633+
conn.commit()
634+
except Exception:
635+
if conn and not conn.closed:
636+
conn.rollback()
637+
raise
638+
finally:
639+
if conn and not conn.closed:
640+
conn.autocommit = True # Reset to default psycopg2 behavior
641+
# Don't close the connection - let execute() manage it
642+
643+
623644
class SchemaVersion:
624645
"""Query and cache db-sync schema version."""
625646

@@ -1546,9 +1567,77 @@ def query_drep_distr(
15461567

15471568
def delete_reserved_pool_tickers() -> int:
15481569
"""Delete all records from reserved_pool_ticker and return the number of affected rows."""
1549-
query = "DELETE FROM reserved_pool_ticker"
1570+
query = "DELETE FROM reserved_pool_ticker;"
15501571

15511572
with execute(query=query) as cur:
15521573
affected_rows = cur.rowcount or 0
15531574
dbsync_conn.conn().commit()
15541575
return affected_rows
1576+
1577+
1578+
def query_db_sync_progress() -> float:
1579+
"""Calculate blockchain sync percentage (0-100).
1580+
1581+
Returns:
1582+
float: Sync percentage (0-100) if blocks exist
1583+
None: If no blocks in database
1584+
"""
1585+
query = (
1586+
"SELECT"
1587+
" 100 * (EXTRACT(EPOCH FROM (MAX(time) AT TIME ZONE 'UTC')) -"
1588+
" EXTRACT(EPOCH FROM (MIN(time) AT TIME ZONE 'UTC')))"
1589+
" / (EXTRACT(EPOCH FROM (NOW() AT TIME ZONE 'UTC')) -"
1590+
" EXTRACT(EPOCH FROM (MIN(time) AT TIME ZONE 'UTC')))"
1591+
" AS sync_percent "
1592+
" FROM block;"
1593+
)
1594+
1595+
with execute(query=query) as cur:
1596+
result = cur.fetchone()
1597+
return min(100.0, float(result[0])) if result else 0.0
1598+
1599+
1600+
def query_rows_count(
1601+
table: str,
1602+
column: str | None = None,
1603+
condition: str | None = None,
1604+
lock: bool = False,
1605+
lock_timeout: str = "5s",
1606+
) -> int:
1607+
"""Query row count with optional table locking for atomic snapshots.
1608+
1609+
Args:
1610+
table: Table name
1611+
column: Column name (optional)
1612+
condition: SQL condition (required if column is provided)
1613+
lock: Whether to lock table during query
1614+
lock_timeout: PostgreSQL timeout string (e.g., '5s', '1min')
1615+
1616+
Returns:
1617+
Count of matching rows (0 if none match or table is empty)
1618+
1619+
Raises:
1620+
ValueError: For invalid inputs
1621+
RuntimeError: If lock times out
1622+
"""
1623+
# Validation
1624+
if column is not None and condition is None:
1625+
error_msg = "Condition required when column is specified."
1626+
raise ValueError(error_msg)
1627+
1628+
# Query construction
1629+
lock_clause = (
1630+
f"SET LOCAL lock_timeout = '{lock_timeout}'; LOCK TABLE {table} IN SHARE MODE;"
1631+
if lock
1632+
else ""
1633+
)
1634+
where_clause = f" WHERE {column} {condition}" if (column and condition) else ""
1635+
query = f"{lock_clause} SELECT COUNT(*) FROM {table}{where_clause}"
1636+
1637+
# Execution
1638+
with execute(query=query) as cur:
1639+
try:
1640+
return cur.fetchone()[0] or 0
1641+
except psycopg2.errors.LockNotAvailable as e:
1642+
error_msg = f"Could not acquire lock on {table} within {lock_timeout}"
1643+
raise RuntimeError(error_msg) from e

0 commit comments

Comments
 (0)