3
3
import functools
4
4
import itertools
5
5
import logging
6
+ import posixpath
6
7
from typing import Any , Dict , List , Mapping , Optional , Sequence , Union
7
8
8
9
import ydb
@@ -78,6 +79,7 @@ def __init__(
78
79
session_pool : Union [ydb .SessionPool , ydb .aio .SessionPool ],
79
80
tx_mode : ydb .AbstractTransactionModeBuilder ,
80
81
tx_context : Optional [ydb .BaseTxContext ] = None ,
82
+ table_path_prefix : str = "" ,
81
83
):
82
84
self .session_pool = session_pool
83
85
self .tx_mode = tx_mode
@@ -86,33 +88,36 @@ def __init__(
86
88
self .arraysize = 1
87
89
self .rows = None
88
90
self ._rows_prefetched = None
91
+ self .root_directory = table_path_prefix
89
92
90
93
@_handle_ydb_errors
91
94
def describe_table (self , abs_table_path : str ) -> ydb .TableDescription :
92
95
return self ._retry_operation_in_pool (self ._describe_table , abs_table_path )
93
96
94
- def check_exists (self , table_path : str ) -> bool :
97
+ def check_exists (self , abs_table_path : str ) -> bool :
95
98
try :
96
- self ._retry_operation_in_pool (self ._describe_path , table_path )
99
+ self ._retry_operation_in_pool (self ._describe_path , abs_table_path )
97
100
return True
98
101
except ydb .SchemeError :
99
102
return False
100
103
101
104
@_handle_ydb_errors
102
- def get_table_names (self ) -> List [str ]:
103
- directory : ydb .Directory = self ._retry_operation_in_pool (self ._list_directory )
104
- return [child .name for child in directory .children if child .is_table ()]
105
+ def get_table_names (self , abs_dir_path : str ) -> List [str ]:
106
+ directory : ydb .Directory = self ._retry_operation_in_pool (self ._list_directory , abs_dir_path )
107
+ result = []
108
+ for child in directory .children :
109
+ child_abs_path = posixpath .join (abs_dir_path , child .name )
110
+ if child .is_table ():
111
+ result .append (child_abs_path )
112
+ elif child .is_directory () and not child .name .startswith ("." ):
113
+ result .extend (self .get_table_names (child_abs_path ))
114
+ return result
105
115
106
116
def execute (self , operation : YdbQuery , parameters : Optional [Mapping [str , Any ]] = None ):
107
- if operation .is_ddl or not operation .parameters_types :
108
- query = operation .yql_text
109
- is_ddl = operation .is_ddl
110
- else :
111
- query = ydb .DataQuery (operation .yql_text , operation .parameters_types )
112
- is_ddl = operation .is_ddl
117
+ query = self ._get_ydb_query (operation )
113
118
114
119
logger .info ("execute sql: %s, params: %s" , query , parameters )
115
- if is_ddl :
120
+ if operation . is_ddl :
116
121
chunks = self ._execute_ddl (query )
117
122
else :
118
123
chunks = self ._execute_dml (query , parameters )
@@ -130,6 +135,15 @@ def execute(self, operation: YdbQuery, parameters: Optional[Mapping[str, Any]] =
130
135
131
136
self .rows = rows
132
137
138
+ def _get_ydb_query (self , operation : YdbQuery ) -> Union [ydb .DataQuery , str ]:
139
+ pragma = ""
140
+ if self .root_directory :
141
+ pragma = f'PRAGMA TablePathPrefix = "{ self .root_directory } ";\n '
142
+ if operation .is_ddl or not operation .parameters_types :
143
+ return pragma + operation .yql_text
144
+
145
+ return ydb .DataQuery (pragma + operation .yql_text , operation .parameters_types )
146
+
133
147
@_handle_ydb_errors
134
148
def _execute_dml (
135
149
self , query : Union [ydb .DataQuery , str ], parameters : Optional [Mapping [str , Any ]] = None
@@ -163,8 +177,8 @@ def _describe_path(session: ydb.Session, table_path: str) -> ydb.SchemeEntry:
163
177
return session ._driver .scheme_client .describe_path (table_path )
164
178
165
179
@staticmethod
166
- def _list_directory (session : ydb .Session ) -> ydb .Directory :
167
- return session ._driver .scheme_client .list_directory (session . _driver . _driver_config . database )
180
+ def _list_directory (session : ydb .Session , abs_dir_path : str ) -> ydb .Directory :
181
+ return session ._driver .scheme_client .list_directory (abs_dir_path )
168
182
169
183
@staticmethod
170
184
def _prepare (session : ydb .Session , query : str ) -> ydb .DataQuery :
@@ -264,12 +278,12 @@ async def _describe_table(session: ydb.aio.table.Session, abs_table_path: str) -
264
278
return await session .describe_table (abs_table_path )
265
279
266
280
@staticmethod
267
- async def _describe_path (session : ydb .aio .table .Session , table_path : str ) -> ydb .SchemeEntry :
268
- return await session ._driver .scheme_client .describe_path (table_path )
281
+ async def _describe_path (session : ydb .aio .table .Session , abs_table_path : str ) -> ydb .SchemeEntry :
282
+ return await session ._driver .scheme_client .describe_path (abs_table_path )
269
283
270
284
@staticmethod
271
- async def _list_directory (session : ydb .aio .table .Session ) -> ydb .Directory :
272
- return await session ._driver .scheme_client .list_directory (session . _driver . _driver_config . database )
285
+ async def _list_directory (session : ydb .aio .table .Session , abs_dir_path : str ) -> ydb .Directory :
286
+ return await session ._driver .scheme_client .list_directory (abs_dir_path )
273
287
274
288
@staticmethod
275
289
async def _execute_scheme (session : ydb .aio .table .Session , query : str ) -> ydb .convert .ResultSets :
0 commit comments