Skip to content

Fix deadlocks in connection pool #499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 55 additions & 46 deletions cassandra/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -580,16 +580,18 @@ def return_connection(self, connection, stream_was_orphaned=False):
return
self._is_replacing = True
self._session.submit(self._replace, connection)
else:
if connection in self._trash:
with connection.lock:
if connection.in_flight == len(connection.orphaned_request_ids):
with self._lock:
if connection in self._trash:
self._trash.remove(connection)
log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
connection.close()
return
elif connection in self._trash:
with connection.lock:
no_pending_requests = connection.in_flight <= len(connection.orphaned_request_ids)
if no_pending_requests:
with self._lock:
close_connection = False
if connection in self._trash:
self._trash.remove(connection)
close_connection = True
if close_connection:
log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
connection.close()

def on_orphaned_stream_released(self):
"""
Expand Down Expand Up @@ -756,23 +758,26 @@ def _open_connection_to_missing_shard(self, shard_id):
)
old_conn = None
with self._lock:
if self.is_shutdown:
conn.close()
return
if conn.features.shard_id in self._connections.keys():
# Move the current connection to the trash and use the new one from now on
old_conn = self._connections[conn.features.shard_id]
log.debug(
"Replacing overloaded connection (%s) with (%s) for shard %i for host %s",
id(old_conn),
id(conn),
conn.features.shard_id,
self.host
)
if self._keyspace:
conn.set_keyspace_blocking(self._keyspace)
is_shutdown = self.is_shutdown
if not is_shutdown:
if conn.features.shard_id in self._connections.keys():
# Move the current connection to the trash and use the new one from now on
old_conn = self._connections[conn.features.shard_id]
log.debug(
"Replacing overloaded connection (%s) with (%s) for shard %i for host %s",
id(old_conn),
id(conn),
conn.features.shard_id,
self.host
)
if self._keyspace:
conn.set_keyspace_blocking(self._keyspace)
self._connections[conn.features.shard_id] = conn

if is_shutdown:
conn.close()
return

self._connections[conn.features.shard_id] = conn
if old_conn is not None:
remaining = old_conn.in_flight - len(old_conn.orphaned_request_ids)
if remaining == 0:
Expand All @@ -792,10 +797,11 @@ def _open_connection_to_missing_shard(self, shard_id):
remaining,
)
with self._lock:
if self.is_shutdown:
old_conn.close()
else:
is_shutdown = self.is_shutdown
if not is_shutdown:
self._trash.add(old_conn)
if is_shutdown:
conn.close()
num_missing_or_needing_replacement = self.num_missing_or_needing_replacement
log.debug(
"Connected to %s/%i shards on host %s (%i missing or needs replacement)",
Expand Down Expand Up @@ -1132,15 +1138,18 @@ def return_connection(self, connection, stream_was_orphaned=False):
self.shutdown()
else:
self._replace(connection)
else:
if connection in self._trash:
with connection.lock:
if connection.in_flight == 0:
with self._lock:
if connection in self._trash:
self._trash.remove(connection)
log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
connection.close()
elif connection in self._trash:
with connection.lock:
no_pending_requests = connection.in_flight <= len(connection.orphaned_request_ids)
if no_pending_requests:
with self._lock:
close_connection = False
if connection in self._trash:
self._trash.remove(connection)
close_connection = True
if close_connection:
log.debug("Closing trashed connection (%s) to %s", id(connection), self.host)
connection.close()
return

core_conns = self._session.cluster.get_core_connections_per_host(self.host_distance)
Expand Down Expand Up @@ -1175,14 +1184,14 @@ def _maybe_trash_connection(self, connection):
new_connections.remove(connection)
self._connections = new_connections

with connection.lock:
if connection.in_flight == 0:
log.debug("Skipping trash and closing unused connection (%s) to %s", id(connection), self.host)
connection.close()

# skip adding it to the trash if we're already closing it
return

if did_trash:
with connection.lock:
no_pending_requests = connection.in_flight <= len(connection.orphaned_request_ids)
if no_pending_requests:
log.debug("Skipping trash and closing unused connection (%s) to %s", id(connection), self.host)
connection.close()
return
with self._lock:
self._trash.add(connection)

if did_trash:
Expand Down
Loading