Skip to content

Commit cf420ff

Browse files
committed
Add a guard to allow only one pending reconnect
ControlConnection.reconnect could be called from many places in parallel. When that happens, you see a streak of reconnects happening one after another. This commit adds a guard that ensures there is only one pending reconnect at a time.
1 parent 23a6e85 commit cf420ff

File tree

2 files changed

+53
-42
lines changed

2 files changed

+53
-42
lines changed

cassandra/cluster.py

+29-42
Original file line numberDiff line numberDiff line change
@@ -3546,30 +3546,6 @@ class UserTypeDoesNotExist(Exception):
35463546
pass
35473547

35483548

3549-
class _ControlReconnectionHandler(_ReconnectionHandler):
3550-
"""
3551-
Internal
3552-
"""
3553-
3554-
def __init__(self, control_connection, *args, **kwargs):
3555-
_ReconnectionHandler.__init__(self, *args, **kwargs)
3556-
self.control_connection = weakref.proxy(control_connection)
3557-
3558-
def try_reconnect(self):
3559-
return self.control_connection._reconnect_internal()
3560-
3561-
def on_reconnection(self, connection):
3562-
self.control_connection._set_new_connection(connection)
3563-
3564-
def on_exception(self, exc, next_delay):
3565-
# TODO only overridden to add logging, so add logging
3566-
if isinstance(exc, AuthenticationFailed):
3567-
return False
3568-
else:
3569-
log.debug("Error trying to reconnect control connection: %r", exc)
3570-
return True
3571-
3572-
35733549
def _watch_callback(obj_weakref, method_name, *args, **kwargs):
35743550
"""
35753551
A callback handler for the ControlConnection that tolerates
@@ -3662,6 +3638,7 @@ def __init__(self, cluster, timeout,
36623638

36633639
self._reconnection_handler = None
36643640
self._reconnection_lock = RLock()
3641+
self._reconnection_pending = False
36653642

36663643
self._event_schedule_times = {}
36673644

@@ -3818,33 +3795,43 @@ def reconnect(self):
38183795
if self._is_shutdown:
38193796
return
38203797

3798+
if self._reconnection_pending:
3799+
return
3800+
self._reconnection_pending = True
3801+
38213802
self._submit(self._reconnect)
38223803

3823-
def _reconnect(self):
3804+
def _reconnect(self, schedule = None):
38243805
log.debug("[control connection] Attempting to reconnect")
3806+
if self._is_shutdown:
3807+
return
3808+
38253809
try:
38263810
self._set_new_connection(self._reconnect_internal())
3811+
self._reconnection_pending = False
3812+
return
38273813
except NoHostAvailable:
3828-
# make a retry schedule (which includes backoff)
3829-
schedule = self._cluster.reconnection_policy.new_schedule()
3814+
log.debug("[control connection] Reconnection plan is exhausted, scheduling new reconnection attempt")
3815+
except Exception as ex:
3816+
log.debug("[control connection] Unexpected exception during reconnect, scheduling new reconnection attempt: %s", ex)
38303817

3831-
with self._reconnection_lock:
3818+
if schedule is None:
3819+
schedule = self._cluster.reconnection_policy.new_schedule()
38323820

3833-
# cancel existing reconnection attempts
3834-
if self._reconnection_handler:
3835-
self._reconnection_handler.cancel()
3821+
try:
3822+
next_delay = next(schedule)
3823+
except StopIteration:
3824+
# the schedule has been exhausted
3825+
schedule = self._cluster.reconnection_policy.new_schedule()
3826+
try:
3827+
next_delay = next(schedule)
3828+
except StopIteration:
3829+
next_delay = 0
38363830

3837-
# when a connection is successfully made, _set_new_connection
3838-
# will be called with the new connection and then our
3839-
# _reconnection_handler will be cleared out
3840-
self._reconnection_handler = _ControlReconnectionHandler(
3841-
self, self._cluster.scheduler, schedule,
3842-
self._get_and_set_reconnection_handler,
3843-
new_handler=None)
3844-
self._reconnection_handler.start()
3845-
except Exception:
3846-
log.debug("[control connection] error reconnecting", exc_info=True)
3847-
raise
3831+
if next_delay == 0:
3832+
self._submit(self._reconnect)
3833+
else:
3834+
self._cluster.scheduler.schedule(next_delay, partial(self._reconnect, schedule))
38483835

38493836
def _get_and_set_reconnection_handler(self, new_handler):
38503837
"""

tests/integration/standard/test_cluster.py

+24
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,30 @@ def test_invalid_protocol_negotation(self):
326326
cluster.connect()
327327
cluster.shutdown()
328328

329+
def test_control_connection_reconnect(self):
330+
"""
331+
Ensure that repeated calls to ControlConnection.reconnect() while a reconnect is pending result in only one reconnection attempt
332+
"""
333+
cassandra.cluster.log.setLevel(logging.DEBUG)
334+
335+
cluster = TestCluster()
336+
_ = cluster.connect()
337+
338+
cluster.control_connection._reconnect_internal = Mock(wraps=cluster.control_connection._reconnect_internal)
339+
340+
cluster.control_connection.reconnect()
341+
cluster.control_connection.reconnect()
342+
cluster.control_connection.reconnect()
343+
cluster.control_connection.reconnect()
344+
345+
while cluster.control_connection._reconnection_pending:
346+
time.sleep(0.1)
347+
348+
self.assertFalse(cluster.control_connection._connection.is_closed)
349+
self.assertFalse(cluster.control_connection._connection.is_defunct)
350+
self.assertTrue(cluster.control_connection.refresh_schema())
351+
self.assertEqual(1, len(cluster.control_connection._reconnect_internal.mock_calls))
352+
329353
def test_connect_on_keyspace(self):
330354
"""
331355
Ensure clusters that connect on a keyspace, do

0 commit comments

Comments
 (0)