-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.py
274 lines (223 loc) · 8.02 KB
/
pool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import contextlib
import logging
import warnings
from base_db import Connection, MySQLdb
from clean import observer
# Module-level logger, pinned to INFO.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Silence warnings of category MySQLdb.Warning.
warnings.filterwarnings(action="ignore", category=MySQLdb.Warning)
class DatabaseError(Exception):
    """Base class for the errors defined by this module."""
class TransactionAbortError(DatabaseError):
    """Raised when a transaction is aborted or is not open."""
class ConnectionClosedError(DatabaseError):
    """Raised when a closed connection is used or cannot be revived."""
class _Connector(object):
    """Proxy around a connection that was checked out of a pool.

    Unknown attributes are forwarded to the wrapped connection.  Releasing
    the proxy either puts the connection back on the pool's idle list (when
    the pool tracks it) or closes it for real (temporary connection).
    """

    def __init__(self, conn: "Connection", pool, is_transaction=False):
        """Wrap *conn*, which was obtained from *pool*.

        Args:
            conn: The underlying connection object.
            pool: The owning pool; its ``_connections`` and
                ``_idle_connections`` lists are used on release.
            is_transaction: When true the connector starts at transaction
                nesting level 1.
        """
        self.conn = conn
        self._pool = pool
        self._closed = False
        self.is_transaction = is_transaction
        self.transaction_level = 1 if is_transaction else 0

    def __getattr__(self, name):
        """Forward attribute access to the wrapped connection.

        Raises:
            AttributeError: If the proxy itself is not fully initialised.
            ConnectionClosedError: If the proxy has already been released.
        """
        # Read our own state straight from __dict__: going through normal
        # attribute lookup would re-enter __getattr__ and recurse forever
        # when the instance is only partially initialised.
        state = self.__dict__
        if "_closed" not in state or "conn" not in state:
            raise AttributeError(name)
        if state["_closed"]:
            raise ConnectionClosedError("Connection has closed")
        conn = state["conn"]
        if conn:
            return getattr(conn, name)
        # Kept from the original behaviour: with no usable connection every
        # forwarded attribute resolves to None.
        return None

    @property
    def in_transaction(self):
        """Whether at least one transaction level is still open."""
        return self.transaction_level > 0

    def close(self):
        """Release the connection.

        Returns:
            True if the underlying connection was really closed, False if it
            was handed back to the pool (or the proxy was already released),
            and None if nothing was done because a transaction is still open.
        """
        # While a transaction is open the connection must stay checked out.
        if self.in_transaction:
            return None
        # Releasing twice must not put the same connection on the idle list
        # twice or close it a second time.
        if self._closed:
            return False
        self._closed = True
        if self.conn not in self._pool._connections:
            # Not tracked by the pool: temporary connection, close for real.
            self.conn.close()
            return True
        self._pool._idle_connections.append(self.conn)
        return False

    def __del__(self):
        # Treat a partially initialised instance as already closed.
        if not self.__dict__.get("_closed", True):
            self.close()

    def __enter__(self):
        """Return the wrapped connection (not the proxy)."""
        return self.conn

    def __exit__(self, exc_type, exc_value, traceback):
        """Roll back an open transaction on error, then release."""
        try:
            if self.in_transaction and exc_type is not None:
                # The pool needs to know which connector to roll back.
                self._pool.rollback_transaction(self)
        finally:
            self.close()
class ConnectionPool:
    """Database connection pool.

    While fewer than ``pool_size`` connections are tracked, newly created
    connections are registered with the pool and are put on the idle list
    when released.  Once the pool is full, additional connections are
    temporary: they are not tracked and get closed when released.
    """

    def __init__(
        self,
        database,
        pool_size=10,
        host="127.0.0.1",
        port=3306,
        user="root",
        password=None,
        *args,
        **kwargs,
    ):
        """Store the connection settings and register with ``observer``.

        Args:
            database: Passed as first argument to ``Connection``.
            pool_size: Maximum number of connections tracked by the pool.
            host, port, user, password: Merged into the keyword arguments
                that are passed to ``Connection``.
            *args, **kwargs: Extra arguments forwarded to ``Connection``.
        """
        self.pool_size = pool_size
        self._connections = []  # every connection tracked by the pool
        self._idle_connections = []  # tracked connections not in use
        self.pool_close = False
        self.database = database
        kwargs.update({"host": host, "port": port, "user": user, "password": password})
        self._args, self._kwargs = args, kwargs
        observer.add(self)

    def _create(self) -> "Connection":
        """Open a brand-new connection with the stored settings."""
        return Connection(self.database, *self._args, **self._kwargs)

    def _is_alive(self, conn):
        """Return True if *conn* answers a liveness probe, False otherwise."""
        if conn.allow_ping:
            try:
                conn.db.ping()
            except Exception:
                logger.warning("当前数据库连接ping失败", exc_info=True)
                return False
            return True
        try:
            # Backends that cannot ping are verified through verify_ping().
            conn.db.verify_ping()
        except Exception:
            try:
                conn.db.close()
            except Exception:
                # Best effort: the connection is being discarded anyway.
                logger.debug("closing dead connection failed", exc_info=True)
            return False
        return True

    def _ping(self, conn, reconnect=True):
        """Check *conn* and optionally replace it when it is dead.

        Args:
            conn: Connection to verify.
            reconnect: When true a dead connection is replaced by a newly
                created one (also inside the pool's bookkeeping if the dead
                one was tracked).

        Returns:
            *conn* itself when alive, otherwise the replacement connection.

        Raises:
            ConnectionClosedError: If the connection is dead and
                ``reconnect`` is false.
            Exception: Whatever ``_create`` raises when reconnecting fails.
        """
        if self._is_alive(conn):
            return conn
        if not reconnect:
            raise ConnectionClosedError("Connection is closed and reconnect is disabled")
        try:
            replacement = self._create()
        except Exception:
            logger.error("数据库链接创建失败", exc_info=True)
            raise
        if conn in self._connections:
            # Swap the dead connection for the live one in the pool.
            self._connections.remove(conn)
            self._connections.append(replacement)
        return replacement

    def get_idle_connect(self):
        """Return a usable idle connection, or None when there is none.

        Idle connections are tried oldest first.  A dead one is reconnected
        by ``_ping``; if even that fails it is dropped from the pool and the
        next idle connection is tried.
        """
        while self._idle_connections:
            conn = self._idle_connections.pop(0)
            try:
                # _ping may hand back a replacement; that is the connection
                # the caller must receive, not the dead original.
                return self._ping(conn)
            except Exception:
                logger.warning("空闲连接中存在关闭连接,已移除")
                # Free the pool slot, otherwise the dead connection would
                # count against pool_size forever.
                if conn in self._connections:
                    self._connections.remove(conn)
        logger.warning("池中无空闲链接")
        return None

    def connect(self, is_transaction=False):
        """Check out a connection wrapped in a ``_Connector``.

        An idle connection is reused when available.  Otherwise a new one is
        created; it is tracked by the pool only while the pool is not full.
        """
        if self._idle_connections:
            conn = self.get_idle_connect()
            if conn is not None:
                return _Connector(conn, self, is_transaction=is_transaction)
        conn = self._create()
        logger.info("创建新链接%s", conn)
        current_pool_size = len(self._connections)
        if current_pool_size < self.pool_size:
            logger.debug("当前连接池容量为:%s/%s", current_pool_size, self.pool_size)
            self._connections.append(conn)
        else:
            logger.debug("链接池已满 %s/%s", current_pool_size, self.pool_size)
        return _Connector(conn, self, is_transaction=is_transaction)

    def clean_pool(self):
        """Close every idle connection and forget all tracked connections."""
        self.pool_close = True
        idle, self._idle_connections = self._idle_connections, []
        self._connections = []
        for connect in idle:
            try:
                connect.close()
            except Exception:
                # Keep going so one broken connection cannot prevent the
                # remaining ones from being closed.
                logger.warning("closing idle connection failed", exc_info=True)

    def rollback_transaction(self, using_conn):
        """Roll back the transaction held by *using_conn*.

        Raises:
            TransactionAbortError: If no transaction is open, or after the
                rollback when more than one level is still open.
        """
        with self._end_transaction(using_conn):
            if using_conn.conn.db is not None:
                logger.warning("事务回滚开始")
                using_conn.conn.db.rollback()
            # A rollback inside a nested transaction aborts the whole stack.
            # NOTE(review): the level was already decremented by
            # _end_transaction, so this only fires when the level was >= 3
            # before the call - confirm whether "> 0" was intended.
            if using_conn.transaction_level > 1:
                using_conn.transaction_level = 0
                raise TransactionAbortError("当前事务已回滚,但整体事务未结束")

    @contextlib.contextmanager
    def _end_transaction(self, using_conn):
        """Leave one transaction level around the wrapped block.

        The level is decremented before the block runs; once no level is
        left the connector is released afterwards, even if the block raised.

        Raises:
            TransactionAbortError: If no transaction level is open.
        """
        if using_conn.transaction_level < 1:
            raise TransactionAbortError("未开启事务或已被关闭")
        using_conn.transaction_level -= 1
        try:
            yield
        finally:
            if using_conn.transaction_level < 1:
                using_conn.is_transaction = False
                using_conn.close()

    def commit_transaction(self, using_conn):
        """Commit the transaction held by *using_conn*.

        Raises:
            TransactionAbortError: If no transaction is open.
            MySQLdb.OperationalError: Re-raised after a rollback when the
                commit fails.
        """
        with self._end_transaction(using_conn):
            # NOTE(review): the level was already decremented by
            # _end_transaction, so leaving level 2 still commits while the
            # outer transaction is open - confirm whether "> 0" was intended.
            if using_conn.transaction_level > 1:
                logger.debug("处于嵌套事务中,当前事务层级 %s", using_conn.transaction_level)
                return
            try:
                using_conn.conn.db.commit()
            except MySQLdb.OperationalError:  # pylint: disable=E1101
                using_conn.conn.db.rollback()
                raise

    @contextlib.contextmanager
    def transaction_context(self, using_conn: "_Connector" = None):
        """Run the wrapped block inside a transaction.

        Args:
            using_conn: Connector of an already open transaction to nest
                into.  When omitted a connection is checked out and a new
                transaction is started.

        Yields:
            The connector the transaction runs on.

        Raises:
            ConnectionClosedError: If the connection of a nested transaction
                is no longer alive.
            Exception: Anything raised by the block (after a rollback) or by
                the commit.
        """
        if using_conn is None:
            # First level: check out a connection already at level 1.
            using_conn = self.connect(is_transaction=True)
        else:
            try:
                # A nested transaction cannot survive a reconnect, so a dead
                # connection must raise instead of being replaced silently.
                self._ping(using_conn.conn, reconnect=False)
            except Exception:
                logger.error("嵌套事务中断:sql链接断开!回滚取决于mysql配置")
                raise
            using_conn.transaction_level += 1
        try:
            # NOTE(review): begin() is issued at every nesting level; on
            # MySQL a new BEGIN implicitly commits the open transaction.
            using_conn.conn.db.begin()
        except Exception:
            # Undo the level bump so the connection can still be released.
            with self._end_transaction(using_conn):
                pass
            raise
        logger.info("开启事务,当前事务层级: %s", using_conn.transaction_level)
        try:
            yield using_conn
        except Exception:
            try:
                self.rollback_transaction(using_conn)
            except Exception as rollback_err:
                logger.exception("事务回滚异常", exc_info=rollback_err)
            # Re-raise the error that came out of the block itself.
            raise
        else:
            try:
                self.commit_transaction(using_conn)
            except Exception:
                logger.error("事务提交失败")
                raise