Skip to content

Commit 6d40364

Browse files
committed
Implement LimitedConcurrencyShardConnectionBackoffPolicy
This policy is an implementation of `ShardConnectionBackoffPolicy`. Its primary purpose is to prevent connection storms by restricting the number of concurrent pending connections per host and by enforcing a backoff delay between consecutive connection attempts.
1 parent eb8d396 commit 6d40364

File tree

3 files changed

+422
-8
lines changed

3 files changed

+422
-8
lines changed

cassandra/policies.py

Lines changed: 233 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
from __future__ import annotations
1515

1616
import random
17-
1817
from collections import namedtuple
18+
from functools import partial
1919
from itertools import islice, cycle, groupby, repeat
2020
import logging
2121
from random import randint, shuffle
2222
from threading import Lock
2323
import socket
2424
import warnings
25-
from typing import Callable, TYPE_CHECKING
25+
from typing import Callable, TYPE_CHECKING, Iterator, List, Tuple
2626
from abc import ABC, abstractmethod
2727
from cassandra import WriteType as WT
2828

@@ -999,6 +999,237 @@ def shutdown(self):
999999
self.is_shutdown = True
10001000

10011001

1002+
class ShardConnectionBackoffSchedule(ABC):
    """
    Interface for producing the backoff schedules that pace shard
    connection attempts.
    """

    @abstractmethod
    def new_schedule(self) -> Iterator[float]:
        """
        Return a finite or infinite iterable of delays, each expressed as a
        floating point number of seconds.

        A finite iterable is allowed: a fresh schedule is created right
        after the previous one is exhausted.
        """
        raise NotImplementedError()
1012+
1013+
class ConstantShardConnectionBackoffSchedule(ShardConnectionBackoffSchedule):
    """
    A :class:`.ShardConnectionBackoffSchedule` subclass which introduces a
    constant delay, with optional random jitter, between shard connections.
    """

    def __init__(self, delay: float, jitter: float = 0.0):
        """
        `delay` should be a floating point number of seconds to wait in-between
        each connection attempt.

        `jitter` is a random jitter in seconds.

        Raises :exc:`ValueError` if either argument is negative.
        """
        if delay < 0:
            raise ValueError("delay must not be negative")
        if jitter < 0:
            raise ValueError("jitter must not be negative")

        self.delay = delay
        self.jitter = jitter

    def new_schedule(self):
        # BUG FIX: the previous body mixed `yield from repeat(...)` with
        # `return iterator()`. The `yield from` made new_schedule itself a
        # generator function, so for jitter > 0 execution fell through to the
        # `return`, which in a generator just raises StopIteration — callers
        # received an EMPTY schedule. Returning plain iterators from both
        # branches preserves the jitter == 0 behavior and fixes jitter > 0.
        if self.jitter == 0:
            return repeat(self.delay)

        def iterator():
            # Each delay gets an independent jitter sample in [0, jitter].
            while True:
                yield self.delay + random.uniform(0.0, self.jitter)

        return iterator()
1041+
1042+
1043+
class LimitedConcurrencyShardConnectionBackoffPolicy(ShardConnectionBackoffPolicy):
    """
    A shard connection backoff policy that allows at most `max_concurrent`
    concurrent pending connections per `host_id`.

    For backoff calculation it accepts either a
    `cassandra.policies.ShardConnectionBackoffSchedule` or a
    `cassandra.policies.ReconnectionPolicy`, since both expose the same API.

    While requests are pending, workers are spawned per host — at most
    `max_concurrent` each, so the overall ceiling is `max_concurrent`
    multiplied by the number of nodes in the cluster.  Each worker runs its
    own backoff schedule and terminates once no requests remain for its
    `host_id`.

    The policy also prevents multiple pending or scheduled connections for
    the same (host, shard) pair; duplicate attempts to schedule a connection
    are silently ignored.
    """
    backoff_policy: ShardConnectionBackoffSchedule | ReconnectionPolicy

    max_concurrent: int
    """
    Max concurrent connection creation requests per scope.
    """

    def __init__(
            self,
            backoff_policy: ShardConnectionBackoffSchedule | ReconnectionPolicy,
            max_concurrent: int = 1,
    ):
        # Validate before assigning so a bad policy never leaves a
        # half-initialized instance behind.
        if not isinstance(backoff_policy, (ShardConnectionBackoffSchedule, ReconnectionPolicy)):
            raise ValueError("backoff_policy must be a ShardConnectionBackoffSchedule or ReconnectionPolicy")
        if max_concurrent < 1:
            raise ValueError("max_concurrent must be a positive integer")
        self.backoff_policy = backoff_policy
        self.max_concurrent = max_concurrent

    def new_connection_scheduler(self, scheduler: _Scheduler) -> ShardConnectionScheduler:
        # One scheduler instance per session; it owns the per-host buckets.
        return _LimitedConcurrencyShardConnectionScheduler(
            scheduler, self.backoff_policy, self.max_concurrent)
1078+
1079+
1080+
class _ScopeBucket:
1081+
"""
1082+
Holds information for a shard connection backoff policy scope, schedules and executes requests to create connection.
1083+
"""
1084+
session: _Scheduler
1085+
backoff_policy: ShardConnectionBackoffSchedule
1086+
lock: Lock
1087+
is_shutdown: bool = False
1088+
1089+
max_concurrent: int
1090+
"""
1091+
Max concurrent connection creation requests in the scope.
1092+
"""
1093+
1094+
currently_pending: int
1095+
"""
1096+
Number of currently pending connections.
1097+
"""
1098+
1099+
items: List[Callable[[], None]]
1100+
"""
1101+
List of scheduled create connections requests.
1102+
"""
1103+
1104+
def __init__(
1105+
self,
1106+
scheduler: _Scheduler,
1107+
backoff_policy: ShardConnectionBackoffSchedule,
1108+
max_concurrent: int,
1109+
):
1110+
self.items = []
1111+
self.scheduler = scheduler
1112+
self.backoff_policy = backoff_policy
1113+
self.lock = Lock()
1114+
self.max_concurrent = max_concurrent
1115+
self.currently_pending = 0
1116+
1117+
def _get_delay(self, schedule: Iterator[float]) -> Tuple[Iterator[float], float]:
1118+
try:
1119+
return schedule, next(schedule)
1120+
except StopIteration:
1121+
# A bit of trickery to avoid having lock around self.schedule
1122+
schedule = self.backoff_policy.new_schedule()
1123+
delay = next(schedule)
1124+
self.schedule = schedule
1125+
return schedule, delay
1126+
1127+
def _run(self, schedule: Iterator[float]):
1128+
if self.is_shutdown:
1129+
return
1130+
1131+
with self.lock:
1132+
try:
1133+
request = self.items.pop(0)
1134+
except IndexError:
1135+
# Just in case
1136+
if self.currently_pending > 0:
1137+
self.currently_pending -= 1
1138+
# When items are exhausted reset schedule to ensure that new items going to get another schedule
1139+
# It is important for exponential policy
1140+
return
1141+
1142+
try:
1143+
request()
1144+
finally:
1145+
schedule, delay = self._get_delay(schedule)
1146+
self.scheduler.schedule(delay, self._run, schedule)
1147+
1148+
def schedule_new_connection(self, cb: Callable[[], None]):
1149+
with self.lock:
1150+
if self.is_shutdown:
1151+
return
1152+
self.items.append(cb)
1153+
if self.currently_pending < self.max_concurrent:
1154+
self.currently_pending += 1
1155+
schedule = self.backoff_policy.new_schedule()
1156+
delay = next(schedule)
1157+
self.scheduler.schedule(delay, self._run, schedule)
1158+
1159+
def shutdown(self):
1160+
with self.lock:
1161+
self.is_shutdown = True
1162+
1163+
1164+
class _LimitedConcurrencyShardConnectionScheduler(ShardConnectionScheduler):
    """
    A scheduler for ``cassandra.policies.LimitedConcurrencyShardConnectionPolicy``.

    Limits concurrency for connection creation requests to ``max_concurrent`` per host_id.
    """

    already_scheduled: set[tuple[str, int]]
    """
    Set of (host_id, shard_id) of scheduled or pending requests.
    """

    per_host_scope: dict[str, _ScopeBucket]
    """
    Scopes storage, key is host_id, value is an instance that holds scope data.
    """

    backoff_policy: ShardConnectionBackoffSchedule
    scheduler: _Scheduler
    lock: Lock
    is_shutdown: bool = False

    max_concurrent: int
    """
    Max concurrent connection creation requests per host_id.
    """

    def __init__(
            self,
            scheduler: _Scheduler,
            backoff_policy: ShardConnectionBackoffSchedule,
            max_concurrent: int,
    ):
        self.already_scheduled = set()
        self.per_host_scope = {}
        self.backoff_policy = backoff_policy
        self.max_concurrent = max_concurrent
        self.scheduler = scheduler
        self.lock = Lock()

    def _execute(self, host_id: str, shard_id: int, method: Callable[[], None]):
        """Run `method`, then free the (host_id, shard_id) dedup slot."""
        if self.is_shutdown:
            return
        try:
            method()
        finally:
            with self.lock:
                self.already_scheduled.remove((host_id, shard_id))

    def schedule(self, host_id: str, shard_id: int, method: Callable[[], None]) -> bool:
        """
        Schedule `method` for (host_id, shard_id).

        Returns ``False`` when shut down or when a request for the same
        (host_id, shard_id) pair is already scheduled or pending;
        ``True`` otherwise.
        """
        with self.lock:
            if self.is_shutdown or (host_id, shard_id) in self.already_scheduled:
                return False
            self.already_scheduled.add((host_id, shard_id))
            # FIX: keep the bucket lookup/creation under the lock. Doing it
            # unlocked could race with a concurrent schedule() for the same
            # host (two buckets created, one silently lost) and with
            # shutdown() iterating per_host_scope under the lock.
            scope_info = self.per_host_scope.get(host_id)
            if not scope_info:
                scope_info = _ScopeBucket(self.scheduler, self.backoff_policy, self.max_concurrent)
                self.per_host_scope[host_id] = scope_info
        # The bucket has its own lock and shutdown flag, so this call is safe
        # outside our lock (and avoids holding two locks while user callbacks
        # are queued).
        scope_info.schedule_new_connection(partial(self._execute, host_id, shard_id, method))
        return True

    def shutdown(self):
        """Shut down this scheduler and every per-host scope bucket."""
        with self.lock:
            self.is_shutdown = True
            for scope in self.per_host_scope.values():
                scope.shutdown()
1231+
1232+
10021233
class RetryPolicy(object):
10031234
"""
10041235
A policy that describes whether to retry, rethrow, or ignore coordinator

0 commit comments

Comments
 (0)