1
1
# ------------------------------------------------------------------------------
2
- # Copyright (c) 2020, 2024 , Oracle and/or its affiliates.
2
+ # Copyright (c) 2020, 2025 , Oracle and/or its affiliates.
3
3
#
4
4
# This software is dual-licensed to you under the Universal Permissive License
5
5
# (UPL) 1.0 as shown at https://oss.oracle.com/licenses/upl and Apache License
@@ -70,6 +70,7 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
70
70
self .set_wait_timeout(params.wait_timeout)
71
71
self .set_timeout(params.timeout)
72
72
self ._stmt_cache_size = params.stmtcachesize
73
+ self ._max_lifetime_session = params.max_lifetime_session
73
74
self ._ping_interval = params.ping_interval
74
75
self ._ping_timeout = params.ping_timeout
75
76
self ._free_new_conn_impls = []
@@ -154,6 +155,7 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
154
155
if conn_impl._protocol._transport is not None :
155
156
self ._conn_impls_to_drop.append(conn_impl)
156
157
self ._notify_bg_task()
158
+ self ._ensure_min_connections()
157
159
158
160
cdef int _drop_conn_impls_helper(self , list conn_impls_to_drop) except - 1 :
159
161
"""
@@ -167,6 +169,16 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
167
169
except :
168
170
pass
169
171
172
+ cdef int _ensure_min_connections(self ) except - 1 :
173
+ """
174
+ Ensure that the minimum number of connections in the pool is
175
+ maintained.
176
+ """
177
+ if self ._open_count < self .min:
178
+ self ._num_to_create = max (self ._num_to_create,
179
+ self .min - self ._open_count)
180
+ self ._notify_bg_task()
181
+
170
182
cdef PooledConnRequest _get_next_request(self ):
171
183
"""
172
184
Get the next request to process.
@@ -246,13 +258,15 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
246
258
"""
247
259
Called before the connection is connected. The connection class and
248
260
pool attributes are updated and the TLS session is stored on the
249
- transport for reuse.
261
+ transport for reuse. The timestamps are also retained for later use.
250
262
"""
251
263
if params is not None :
252
264
conn_impl._cclass = params._default_description.cclass
253
265
else :
254
266
conn_impl._cclass = self .connect_params._default_description.cclass
255
267
conn_impl._pool = self
268
+ conn_impl._time_created = time.monotonic()
269
+ conn_impl._time_returned = conn_impl._time_created
256
270
257
271
def _process_timeout (self ):
258
272
"""
@@ -280,25 +294,35 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
280
294
bint is_open = conn_impl._protocol._transport is not None
281
295
BaseThinDbObjectTypeCache type_cache
282
296
PooledConnRequest request
297
+ double tstamp
283
298
int cache_num
284
299
self ._busy_conn_impls.remove(conn_impl)
285
300
if conn_impl._dbobject_type_cache_num > 0 :
286
301
cache_num = conn_impl._dbobject_type_cache_num
287
302
type_cache = get_dbobject_type_cache(cache_num)
288
303
type_cache._clear_cursors()
304
+ if not is_open:
305
+ self ._open_count -= 1
306
+ self ._ensure_min_connections()
289
307
if conn_impl._is_pool_extra:
290
308
conn_impl._is_pool_extra = False
291
309
if is_open and self ._open_count >= self .max:
292
310
if self ._free_new_conn_impls and self ._open_count == self .max:
293
311
self ._drop_conn_impl(self ._free_new_conn_impls.pop(0 ))
294
312
else :
313
+ self ._open_count -= 1
295
314
self ._drop_conn_impl(conn_impl)
296
315
is_open = False
297
- if not is_open:
298
- self ._open_count -= 1
299
- else :
316
+ if is_open:
300
317
conn_impl.warning = None
301
- conn_impl._time_in_pool = time.monotonic()
318
+ conn_impl._time_returned = time.monotonic()
319
+ if self ._max_lifetime_session != 0 :
320
+ tstamp = conn_impl._time_created + self ._max_lifetime_session
321
+ if conn_impl._time_returned > tstamp:
322
+ self ._open_count -= 1
323
+ self ._drop_conn_impl(conn_impl)
324
+ is_open = False
325
+ if is_open:
302
326
for request in self ._requests:
303
327
if request.in_progress or request.wants_new \
304
328
or request.conn_impl is not None \
@@ -349,7 +373,7 @@ cdef class BaseThinPoolImpl(BasePoolImpl):
349
373
current_time = time.monotonic()
350
374
while conn_impls_to_check and self ._open_count > self .min:
351
375
conn_impl = conn_impls_to_check[0 ]
352
- if current_time - conn_impl._time_in_pool < self ._timeout:
376
+ if current_time - conn_impl._time_returned < self ._timeout:
353
377
break
354
378
conn_impls_to_check.pop(0 )
355
379
self ._drop_conn_impl(conn_impl)
@@ -535,7 +559,6 @@ cdef class ThinPoolImpl(BaseThinPoolImpl):
535
559
conn_impl = ThinConnImpl(self .dsn, self .connect_params)
536
560
self ._pre_connect(conn_impl, params)
537
561
conn_impl.connect(self .connect_params)
538
- conn_impl._time_in_pool = time.monotonic()
539
562
return conn_impl
540
563
541
564
def _notify_bg_task (self ):
@@ -729,7 +752,6 @@ cdef class AsyncThinPoolImpl(BaseThinPoolImpl):
729
752
conn_impl = AsyncThinConnImpl(self .dsn, self .connect_params)
730
753
self ._pre_connect(conn_impl, params)
731
754
await conn_impl.connect(self .connect_params)
732
- conn_impl._time_in_pool = time.monotonic()
733
755
return conn_impl
734
756
735
757
def _notify_bg_task (self ):
@@ -856,7 +878,7 @@ cdef class PooledConnRequest:
856
878
"""
857
879
cdef:
858
880
ReadBuffer buf = conn_impl._protocol._read_buf
859
- double elapsed_time
881
+ double elapsed_time, min_create_time
860
882
bint has_data_ready
861
883
if not buf._transport._is_async:
862
884
while buf._pending_error_num == 0 :
@@ -865,20 +887,27 @@ cdef class PooledConnRequest:
865
887
break
866
888
buf.check_control_packet()
867
889
if buf._pending_error_num != 0 :
868
- self .pool_impl._drop_conn_impl(conn_impl)
869
890
self .pool_impl._open_count -= 1
870
- else :
871
- self .conn_impl = conn_impl
872
- if self .pool_impl._ping_interval == 0 :
891
+ self .pool_impl._drop_conn_impl(conn_impl)
892
+ return 0
893
+ elif self .pool_impl._max_lifetime_session > 0 :
894
+ min_create_time = \
895
+ time.monotonic() - self .pool_impl._max_lifetime_session
896
+ if conn_impl._time_created < min_create_time:
897
+ self .pool_impl._open_count -= 1
898
+ self .pool_impl._drop_conn_impl(conn_impl)
899
+ return 0
900
+ self .conn_impl = conn_impl
901
+ if self .pool_impl._ping_interval == 0 :
902
+ self .requires_ping = True
903
+ elif self .pool_impl._ping_interval > 0 :
904
+ elapsed_time = time.monotonic() - conn_impl._time_returned
905
+ if elapsed_time > self .pool_impl._ping_interval:
873
906
self .requires_ping = True
874
- elif self .pool_impl._ping_interval > 0 :
875
- elapsed_time = time.monotonic() - conn_impl._time_in_pool
876
- if elapsed_time > self .pool_impl._ping_interval:
877
- self .requires_ping = True
878
- if self .requires_ping:
879
- self .pool_impl._add_request(self )
880
- else :
881
- self .completed = True
907
+ if self .requires_ping:
908
+ self .pool_impl._add_request(self )
909
+ else :
910
+ self .completed = True
882
911
883
912
def fulfill (self ):
884
913
"""
@@ -890,8 +919,8 @@ cdef class PooledConnRequest:
890
919
cdef:
891
920
BaseThinPoolImpl pool = self .pool_impl
892
921
BaseThinConnImpl conn_impl
922
+ ssize_t ix
893
923
object exc
894
- ssize_t i
895
924
896
925
# if an exception was raised in the background thread, raise it now
897
926
if self .exception is not None :
@@ -910,13 +939,14 @@ cdef class PooledConnRequest:
910
939
# connection is not required); in addition, ensure that the connection
911
940
# class matches
912
941
if not self .wants_new and pool._free_used_conn_impls:
913
- for i, conn_impl in enumerate (reversed (pool._free_used_conn_impls)):
942
+ ix = len (pool._free_used_conn_impls) - 1
943
+ for conn_impl in reversed (pool._free_used_conn_impls):
914
944
if self .cclass is None or conn_impl._cclass == self .cclass:
915
- i = len (pool._free_used_conn_impls) - i - 1
916
- pool._free_used_conn_impls.pop(i)
945
+ pool._free_used_conn_impls.pop(ix)
917
946
self ._check_connection(conn_impl)
918
947
if self .completed or self .requires_ping:
919
948
return self .completed
949
+ ix -= 1
920
950
921
951
# check for an available new connection (only permitted if the
922
952
# connection class matches)
0 commit comments