Commit 449dd2f
Introduce table definition (#1450)
This pull request introduces significant enhancements to the table discovery module by adding new data structures and services for retrieving table definitions. The changes include the creation of data classes for table and field information, the introduction of an abstract service for fetching table definitions, and a concrete implementation for T-SQL table definitions.

---------

Co-authored-by: sundar.shankar <[email protected]>
Co-authored-by: SundarShankar89 <[email protected]>
1 parent 1578443 commit 449dd2f
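
For orientation, a minimal sketch of how the pieces introduced here are meant to compose. The abstract TableDefinitionService wraps a connection, and the T-SQL subclass turns each catalog reachable through that connection into TableDefinition records. The helper name discover_all_tables and the way the connection is obtained are illustrative assumptions, not part of this commit; only the service class and its two public methods come from the diffs below.

from databricks.labs.remorph.discovery.table import TableDefinition
from databricks.labs.remorph.discovery.tsql_table_definition import TsqlTableDefinitionService


def discover_all_tables(connection) -> dict[str, list[TableDefinition]]:
    # `connection` is assumed to be the DatabaseManager-style object the integration
    # test below builds via get_db_manager("remorph", "mssql"); any object with the
    # same execute_query surface would do.
    service = TsqlTableDefinitionService(connection)
    return {
        catalog: list(service.get_table_definition(catalog))
        for catalog in service.get_all_catalog()
    }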

File tree: 5 files changed, +285 −0 lines changed
New file: databricks/labs/remorph/discovery/table_definition.py (path inferred from imports elsewhere in this commit)
@@ -0,0 +1,23 @@
from abc import ABC, abstractmethod
from collections.abc import Iterable
from typing import Any

from databricks.labs.remorph.discovery.table import TableDefinition


class TableDefinitionService(ABC):

    def __init__(self, connection: Any):
        self.connection = connection

    @abstractmethod
    def get_table_definition(self, catalog_name: str) -> Iterable[TableDefinition]:
        pass

    @abstractmethod
    def _get_table_definition_query(self, catalog_name: str) -> str:
        pass

    @abstractmethod
    def get_all_catalog(self) -> Iterable[str]:
        pass
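
TableDefinition above is imported from databricks.labs.remorph.discovery.table, whose diff is not shown on this page. A plausible shape for those data classes, inferred only from the keyword arguments used in the T-SQL implementation below; the real definitions and their exact types may differ.

from dataclasses import dataclass


@dataclass
class TableFQN:  # inferred: constructed with catalog=, schema=, name=
    catalog: str
    schema: str
    name: str


@dataclass
class FieldInfo:  # inferred: constructed with name=, data_type=, nullable=, comment=
    name: str
    data_type: str
    nullable: str  # the T-SQL query emits the strings 'true' / 'false'
    comment: str | None


@dataclass
class TableDefinition:  # inferred from the TableDefinition(...) call in get_table_definition
    fqn: TableFQN
    location: str | None
    table_format: str | None
    view_text: str | None
    columns: list[FieldInfo] | None
    size_gb: float | None
    comment: str | None
    primary_keys: list[str] | None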
New file: databricks/labs/remorph/discovery/tsql_table_definition.py (path inferred from the test imports below)
@@ -0,0 +1,185 @@
from collections.abc import Iterable

from databricks.labs.remorph.connections.database_manager import DatabaseManager
from databricks.labs.remorph.discovery.table import TableDefinition, TableFQN, FieldInfo
from databricks.labs.remorph.discovery.table_definition import TableDefinitionService


class TsqlTableDefinitionService(TableDefinitionService):

    # Hexadecimal value of § is U+00A7. Hexadecimal value of ‡ (double dagger) is U+2021
    def _get_table_definition_query(self, catalog_name: str) -> str:
        query = f"""
        WITH column_info AS (
            SELECT
                TABLE_CATALOG,
                TABLE_SCHEMA,
                TABLE_NAME,
                STRING_AGG(
                    CONCAT(
                        column_name,
                        '§',
                        CASE
                            WHEN numeric_precision IS NOT NULL AND numeric_scale IS NOT NULL THEN CONCAT(data_type, '(', numeric_precision, ',', numeric_scale, ')')
                            WHEN LOWER(data_type) = 'text' THEN CONCAT('varchar', '(', CHARACTER_MAXIMUM_LENGTH, ')')
                            ELSE data_type
                        END,
                        '§',
                        CASE
                            WHEN cis.IS_NULLABLE = 'YES' THEN 'true'
                            ELSE 'false'
                        END,
                        '§',
                        ISNULL(CAST(ep_col.value AS NVARCHAR(MAX)), '')
                    ),
                    '‡'
                ) WITHIN GROUP (ORDER BY ordinal_position) AS DERIVED_SCHEMA
            FROM
                {catalog_name}.sys.tables t
                INNER JOIN {catalog_name}.sys.columns c ON t.object_id = c.object_id
                INNER JOIN {catalog_name}.INFORMATION_SCHEMA.COLUMNS cis ON t.name = cis.TABLE_NAME AND c.name = cis.COLUMN_NAME
                OUTER APPLY (
                    SELECT TOP 1 value
                    FROM {catalog_name}.sys.extended_properties
                    WHERE major_id = t.object_id AND minor_id = 0
                    ORDER BY name DESC
                ) ep_tbl
                OUTER APPLY (
                    SELECT TOP 1 value
                    FROM {catalog_name}.sys.extended_properties
                    WHERE major_id = c.object_id AND minor_id = c.column_id
                    ORDER BY name DESC
                ) ep_col
            GROUP BY
                TABLE_CATALOG,
                TABLE_SCHEMA,
                TABLE_NAME
        ),
        table_file_info AS (
            SELECT
                s.name AS TABLE_SCHEMA,
                t.name AS TABLE_NAME,
                f.physical_name AS location,
                f.type_desc AS TABLE_FORMAT,
                CAST(ROUND(SUM(a.used_pages) * 8.0 / 1024, 2) AS DECIMAL(18, 2)) AS SIZE_GB
            FROM
                {catalog_name}.sys.tables t
                INNER JOIN {catalog_name}.sys.indexes i ON t.object_id = i.object_id
                INNER JOIN {catalog_name}.sys.partitions p ON i.object_id = p.object_id AND i.index_id = p.index_id
                INNER JOIN {catalog_name}.sys.allocation_units a ON p.partition_id = a.container_id
                INNER JOIN {catalog_name}.sys.schemas s ON t.schema_id = s.schema_id
                INNER JOIN {catalog_name}.sys.database_files f ON a.data_space_id = f.data_space_id
                LEFT JOIN {catalog_name}.sys.extended_properties ep ON ep.major_id = t.object_id AND ep.minor_id = 0
            GROUP BY
                s.name,
                t.name,
                f.name,
                f.physical_name,
                f.type_desc
        ),
        table_comment_info AS (
            SELECT
                s.name AS TABLE_SCHEMA,
                t.name AS TABLE_NAME,
                CAST(ep.value AS NVARCHAR(MAX)) AS TABLE_COMMENT
            FROM
                {catalog_name}.sys.tables t
                INNER JOIN {catalog_name}.sys.schemas s ON t.schema_id = s.schema_id
                OUTER APPLY (
                    SELECT TOP 1 value
                    FROM {catalog_name}.sys.extended_properties
                    WHERE major_id = t.object_id AND minor_id = 0
                    ORDER BY name DESC
                ) ep
        ),
        table_pk_info AS (
            SELECT
                TC.TABLE_CATALOG,
                TC.TABLE_SCHEMA,
                TC.TABLE_NAME,
                STRING_AGG(KU.COLUMN_NAME, ':') AS PK_COLUMN_NAME
            FROM {catalog_name}.INFORMATION_SCHEMA.TABLE_CONSTRAINTS AS TC
            JOIN {catalog_name}.INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS KU
                ON TC.CONSTRAINT_NAME = KU.CONSTRAINT_NAME
                AND TC.TABLE_NAME = KU.TABLE_NAME
            WHERE TC.CONSTRAINT_TYPE = 'PRIMARY KEY' GROUP BY TC.TABLE_CATALOG, TC.TABLE_SCHEMA, TC.TABLE_NAME)
        SELECT
            sft.TABLE_CATALOG,
            sft.TABLE_SCHEMA,
            sft.TABLE_NAME,
            tfi.location,
            tfi.TABLE_FORMAT,
            '' AS view_definition,
            column_info.DERIVED_SCHEMA,
            tfi.SIZE_GB,
            tci.TABLE_COMMENT,
            tpK.PK_COLUMN_NAME
        FROM
            column_info
            JOIN {catalog_name}.INFORMATION_SCHEMA.TABLES sft ON column_info.TABLE_CATALOG = sft.TABLE_CATALOG AND column_info.TABLE_SCHEMA = sft.TABLE_SCHEMA AND column_info.TABLE_NAME = sft.TABLE_NAME
            LEFT JOIN table_file_info tfi ON column_info.TABLE_SCHEMA = tfi.TABLE_SCHEMA AND column_info.TABLE_NAME = tfi.TABLE_NAME
            LEFT JOIN table_comment_info tci ON column_info.TABLE_SCHEMA = tci.TABLE_SCHEMA AND column_info.TABLE_NAME = tci.TABLE_NAME
            LEFT JOIN table_pk_info tpK ON column_info.TABLE_SCHEMA = tpK.TABLE_SCHEMA AND column_info.TABLE_NAME = tpK.TABLE_NAME

        UNION ALL
        SELECT
            sfv.TABLE_CATALOG,
            sfv.TABLE_SCHEMA,
            sfv.TABLE_NAME,
            '' AS location,
            '' AS TABLE_FORMAT,
            sfv.view_definition,
            '' AS DERIVED_SCHEMA,
            0 AS SIZE_GB,
            '' AS TABLE_COMMENT,
            '' AS PK_COLUMN_NAME
        FROM {catalog_name}.INFORMATION_SCHEMA.VIEWS sfv
        """
        return query

    def get_table_definition(self, catalog_name: str) -> Iterable[TableDefinition]:
        sql = self._get_table_definition_query(catalog_name)
        tsql_connection = self.connection
        result = tsql_connection.execute_query(sql)

        column_names = list(result.keys())
        table_definitions = []

        for row in result:
            result = dict(zip(column_names, row))
            table_fqn = TableFQN(
                catalog=result["TABLE_CATALOG"], schema=result["TABLE_SCHEMA"], name=result["TABLE_NAME"]
            )
            columns = result["DERIVED_SCHEMA"].split("‡") if result["DERIVED_SCHEMA"] else None
            field_info = []
            if columns is not None:
                for column in columns:
                    column_info = column.split("§")
                    field = FieldInfo(
                        name=column_info[0],
                        data_type=column_info[1],
                        nullable=column_info[2],
                        comment=column_info[3],
                    )
                    field_info.append(field)

            pks = result["PK_COLUMN_NAME"].split(":") if result["PK_COLUMN_NAME"] else None
            table_definition = TableDefinition(
                fqn=table_fqn,
                location=result["location"],
                table_format=result["TABLE_FORMAT"],
                view_text=result["view_definition"],
                columns=field_info,
                size_gb=result["SIZE_GB"],
                comment=result["TABLE_COMMENT"],
                primary_keys=pks,
            )
            table_definitions.append(table_definition)
        return table_definitions

    def get_all_catalog(self) -> Iterable[str]:
        cursor: DatabaseManager = self.connection
        result = cursor.connector.execute_query("""select name from sys.databases""")
        catalogs = [row[0] for row in result]
        print(catalogs)
        return catalogs
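
To make the §/‡ encoding concrete: each DERIVED_SCHEMA cell produced by the STRING_AGG above packs one record per column, with ‡ between columns and § between the four fields that get_table_definition unpacks into FieldInfo. A small standalone illustration; the sample value is invented but shaped like the query output.

# Sample DERIVED_SCHEMA cell (illustrative values only).
derived_schema = "id§int(10,0)§false§surrogate key‡name§varchar(50)§true§"

for column in derived_schema.split("‡"):                    # '‡' separates columns
    name, data_type, nullable, comment = column.split("§")  # '§' separates the four fields
    print(name, data_type, nullable, repr(comment))
# id int(10,0) false 'surrogate key'
# name varchar(50) true ''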

tests/integration/discovery/__init__.py

Whitespace-only changes.
New integration test under tests/integration/discovery/ (runs against a live MSSQL connection)
@@ -0,0 +1,22 @@
import pytest

from databricks.labs.remorph.discovery.tsql_table_definition import TsqlTableDefinitionService
from ..connections.helpers import get_db_manager


@pytest.fixture()
def extractor(mock_credentials):
    return get_db_manager("remorph", "mssql")


def test_tsql_get_catalog(extractor):
    tss = TsqlTableDefinitionService(extractor)
    catalogs = list(tss.get_all_catalog())
    assert catalogs is not None
    assert len(catalogs) > 0


def test_tsql_get_table_definition(extractor):
    tss = TsqlTableDefinitionService(extractor)
    table_def = tss.get_table_definition("labs_azure_sandbox_remorph")
    assert table_def is not None
New unit test for TsqlTableDefinitionService (connection mocked with MagicMock)
@@ -0,0 +1,55 @@
from unittest.mock import MagicMock
from databricks.labs.remorph.discovery.table import TableDefinition
from databricks.labs.remorph.discovery.tsql_table_definition import TsqlTableDefinitionService


def test_get_table_definition_with_data():
    db_manager = MagicMock()
    mock_result = [
        (
            "catalog1",
            "schema1",
            "table1",
            "col1§int§YES§Primary Column‡col2§string§NO§Description",
            "col1:col2",
            "/path/to/table",
            "parquet",
            "",
            10.5,
            "Table Comment",
        ),
    ]

    mock_column_names = [
        "TABLE_CATALOG",
        "TABLE_SCHEMA",
        "TABLE_NAME",
        "DERIVED_SCHEMA",
        "PK_COLUMN_NAME",
        "location",
        "TABLE_FORMAT",
        "view_definition",
        "SIZE_GB",
        "TABLE_COMMENT",
    ]

    mock_query_result = MagicMock()
    mock_query_result.keys.return_value = mock_column_names
    mock_query_result.__iter__.return_value = iter(mock_result)
    db_manager.execute_query.return_value = mock_query_result

    tss = TsqlTableDefinitionService(db_manager)
    result = list(tss.get_table_definition("test_catalog"))
    assert result[0].primary_keys == ['col1', 'col2']
    assert isinstance(result[0], TableDefinition)
    assert result[0].fqn.catalog == 'catalog1'
    assert result[0].fqn.schema == 'schema1'
    assert result[0].fqn.name == 'table1'


def test_get_catalogs():
    db_manager = MagicMock()
    db_manager.connector.execute_query.return_value = [('db1',), ('db2',)]
    tss = TsqlTableDefinitionService(db_manager)
    result = list(tss.get_all_catalog())
    assert result == ['db1', 'db2']
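
The mocks above also pin down the only surface the service actually needs from its connection: execute_query(sql) returning a result that exposes keys() and row iteration, plus a connector attribute with its own execute_query used by get_all_catalog. A documentation-only sketch of that implied interface as a typing.Protocol; the real DatabaseManager is assumed to offer more than this.

from collections.abc import Iterator, Sequence
from typing import Any, Protocol


class QueryResult(Protocol):
    def keys(self) -> Sequence[str]: ...
    def __iter__(self) -> Iterator[Sequence[Any]]: ...


class ConnectionLike(Protocol):
    connector: Any  # get_all_catalog calls self.connection.connector.execute_query(...)

    def execute_query(self, sql: str) -> QueryResult: ...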
