Skip to content

Commit 84b6f94

Browse files
committed
Make guard to have only one pending reconnect
ControlConnection.reconnect could be calld from many places in parallel. When it happens you get to see streak for reconnects happning one by one. This commit adds guard that makes sure there is only one pending reconnect.
1 parent 23a6e85 commit 84b6f94

File tree

2 files changed

+58
-57
lines changed

2 files changed

+58
-57
lines changed

cassandra/cluster.py

+34-57
Original file line numberDiff line numberDiff line change
@@ -3546,30 +3546,6 @@ class UserTypeDoesNotExist(Exception):
35463546
pass
35473547

35483548

3549-
class _ControlReconnectionHandler(_ReconnectionHandler):
3550-
"""
3551-
Internal
3552-
"""
3553-
3554-
def __init__(self, control_connection, *args, **kwargs):
3555-
_ReconnectionHandler.__init__(self, *args, **kwargs)
3556-
self.control_connection = weakref.proxy(control_connection)
3557-
3558-
def try_reconnect(self):
3559-
return self.control_connection._reconnect_internal()
3560-
3561-
def on_reconnection(self, connection):
3562-
self.control_connection._set_new_connection(connection)
3563-
3564-
def on_exception(self, exc, next_delay):
3565-
# TODO only overridden to add logging, so add logging
3566-
if isinstance(exc, AuthenticationFailed):
3567-
return False
3568-
else:
3569-
log.debug("Error trying to reconnect control connection: %r", exc)
3570-
return True
3571-
3572-
35733549
def _watch_callback(obj_weakref, method_name, *args, **kwargs):
35743550
"""
35753551
A callback handler for the ControlConnection that tolerates
@@ -3662,6 +3638,7 @@ def __init__(self, cluster, timeout,
36623638

36633639
self._reconnection_handler = None
36643640
self._reconnection_lock = RLock()
3641+
self._reconnection_pending = False
36653642

36663643
self._event_schedule_times = {}
36673644

@@ -3695,6 +3672,8 @@ def _connect_host_in_lbp(self):
36953672
)
36963673

36973674
for host in lbp.make_query_plan():
3675+
if self._is_shutdown:
3676+
break
36983677
try:
36993678
return (self._try_connect(host), None)
37003679
except ConnectionException as exc:
@@ -3818,44 +3797,47 @@ def reconnect(self):
38183797
if self._is_shutdown:
38193798
return
38203799

3800+
with self._reconnection_lock:
3801+
if self._reconnection_pending:
3802+
return
3803+
self._reconnection_pending = True
3804+
38213805
self._submit(self._reconnect)
38223806

3823-
def _reconnect(self):
3807+
def _reconnect(self, schedule = None):
38243808
log.debug("[control connection] Attempting to reconnect")
3809+
if self._is_shutdown:
3810+
return
3811+
38253812
try:
38263813
self._set_new_connection(self._reconnect_internal())
3814+
self._reconnection_pending = False
3815+
return
38273816
except NoHostAvailable:
3828-
# make a retry schedule (which includes backoff)
3829-
schedule = self._cluster.reconnection_policy.new_schedule()
3817+
log.debug("[control connection] Reconnection plan is exhausted, scheduling new reconnection attempt")
3818+
except Exception as ex:
3819+
log.debug("[control connection] Unexpected exception during reconnect, scheduling new reconnection attempt: %s", ex)
38303820

3831-
with self._reconnection_lock:
3821+
if schedule is None:
3822+
schedule = self._cluster.reconnection_policy.new_schedule()
38323823

3833-
# cancel existing reconnection attempts
3834-
if self._reconnection_handler:
3835-
self._reconnection_handler.cancel()
3824+
try:
3825+
next_delay = next(schedule)
3826+
except StopIteration:
3827+
# the schedule has been exhausted
3828+
schedule = self._cluster.reconnection_policy.new_schedule()
3829+
try:
3830+
next_delay = next(schedule)
3831+
except StopIteration:
3832+
next_delay = 0
38363833

3837-
# when a connection is successfully made, _set_new_connection
3838-
# will be called with the new connection and then our
3839-
# _reconnection_handler will be cleared out
3840-
self._reconnection_handler = _ControlReconnectionHandler(
3841-
self, self._cluster.scheduler, schedule,
3842-
self._get_and_set_reconnection_handler,
3843-
new_handler=None)
3844-
self._reconnection_handler.start()
3845-
except Exception:
3846-
log.debug("[control connection] error reconnecting", exc_info=True)
3847-
raise
3834+
if self._is_shutdown:
3835+
return
38483836

3849-
def _get_and_set_reconnection_handler(self, new_handler):
3850-
"""
3851-
Called by the _ControlReconnectionHandler when a new connection
3852-
is successfully created. Clears out the _reconnection_handler on
3853-
this ControlConnection.
3854-
"""
3855-
with self._reconnection_lock:
3856-
old = self._reconnection_handler
3857-
self._reconnection_handler = new_handler
3858-
return old
3837+
if next_delay == 0:
3838+
self._submit(self._reconnect)
3839+
else:
3840+
self._cluster.scheduler.schedule(next_delay, partial(self._reconnect, schedule))
38593841

38603842
def _submit(self, *args, **kwargs):
38613843
try:
@@ -3866,11 +3848,6 @@ def _submit(self, *args, **kwargs):
38663848
return None
38673849

38683850
def shutdown(self):
3869-
# stop trying to reconnect (if we are)
3870-
with self._reconnection_lock:
3871-
if self._reconnection_handler:
3872-
self._reconnection_handler.cancel()
3873-
38743851
with self._lock:
38753852
if self._is_shutdown:
38763853
return

tests/integration/standard/test_cluster.py

+24
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,30 @@ def test_invalid_protocol_negotation(self):
326326
cluster.connect()
327327
cluster.shutdown()
328328

329+
def test_control_connection_reconnect(self):
330+
"""
331+
Ensure clusters that connect on a keyspace, do
332+
"""
333+
cassandra.cluster.log.setLevel(logging.DEBUG)
334+
335+
cluster = TestCluster()
336+
_ = cluster.connect()
337+
338+
cluster.control_connection._reconnect_internal = Mock(wraps=cluster.control_connection._reconnect_internal)
339+
340+
cluster.control_connection.reconnect()
341+
cluster.control_connection.reconnect()
342+
cluster.control_connection.reconnect()
343+
cluster.control_connection.reconnect()
344+
345+
while cluster.control_connection._reconnection_pending:
346+
time.sleep(0.1)
347+
348+
self.assertFalse(cluster.control_connection._connection.is_closed)
349+
self.assertFalse(cluster.control_connection._connection.is_defunct)
350+
self.assertTrue(cluster.control_connection.refresh_schema())
351+
self.assertEqual(1, len(cluster.control_connection._reconnect_internal.mock_calls))
352+
329353
def test_connect_on_keyspace(self):
330354
"""
331355
Ensure clusters that connect on a keyspace, do

0 commit comments

Comments
 (0)