
Commit f71e7c9

Implement integration tests for shard connection backoff policies
Tests cover:
1. LimitedConcurrencyShardConnectionBackoffPolicy
2. NoDelayShardConnectionBackoffPolicy
Both the Scylla and Cassandra backends are covered.
1 parent 6d40364 commit f71e7c9
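
For reference, a minimal sketch (not part of this commit) of how an application might opt into the policies these tests exercise. It assumes cassandra.cluster.Cluster accepts the same shard_connection_backoff_policy keyword that the TestCluster helper forwards in the test below; the policy constructors and their arguments are taken from the diff.

from cassandra.cluster import Cluster
from cassandra.policies import (
    ConstantReconnectionPolicy,
    LimitedConcurrencyShardConnectionBackoffPolicy,
    NoDelayShardConnectionBackoffPolicy,
)

# Limit how many shard connections are opened concurrently (per host, per the
# test names below), waiting a constant 0.1 s between attempts.
throttled_policy = LimitedConcurrencyShardConnectionBackoffPolicy(
    backoff_policy=ConstantReconnectionPolicy(0.1),
    max_concurrent=1,
)

# Or open per-shard connections immediately, without any delay.
no_delay_policy = NoDelayShardConnectionBackoffPolicy()

# Assumed keyword: shard_connection_backoff_policy, mirroring the TestCluster call below.
cluster = Cluster(shard_connection_backoff_policy=throttled_policy)
session = cluster.connect()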

File tree

1 file changed (+110, -2 lines)


tests/integration/long/test_policies.py

Lines changed: 110 additions & 2 deletions
@@ -11,16 +11,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import inspect
+import os
+import time
 import unittest
+from typing import Optional
+from unittest.mock import Mock
 
 from cassandra import ConsistencyLevel, Unavailable
-from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT
+from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT, Session
+from cassandra.policies import LimitedConcurrencyShardConnectionBackoffPolicy, ConstantReconnectionPolicy, \
+    ShardConnectionBackoffPolicy, NoDelayShardConnectionBackoffPolicy, _ScopeBucket, \
+    _NoDelayShardConnectionBackoffScheduler
+from cassandra.shard_info import _ShardingInfo
 
 from tests.integration import use_cluster, get_cluster, get_node, TestCluster
 
 
 def setup_module():
+    os.environ['SCYLLA_EXT_OPTS'] = "--smp 4"
     use_cluster('test_cluster', [4])
 
 
@@ -65,3 +74,102 @@ def test_should_rethrow_on_unvailable_with_default_policy_if_cas(self):
         self.assertEqual(exception.consistency, ConsistencyLevel.SERIAL)
         self.assertEqual(exception.required_replicas, 2)
         self.assertEqual(exception.alive_replicas, 1)
+
+
+class ShardBackoffPolicyTests(unittest.TestCase):
+    @classmethod
+    def tearDownClass(cls):
+        cluster = get_cluster()
+        cluster.start(wait_for_binary_proto=True, wait_other_notice=True)  # make sure other nodes are restarted
+
+    def test_limited_concurrency_1_connection_per_host(self):
+        self._test_backoff(
+            LimitedConcurrencyShardConnectionBackoffPolicy(
+                backoff_policy=ConstantReconnectionPolicy(0.1),
+                max_concurrent=1,
+            )
+        )
+
+    def test_limited_concurrency_2_connection_per_host(self):
+        self._test_backoff(
+            LimitedConcurrencyShardConnectionBackoffPolicy(
+                backoff_policy=ConstantReconnectionPolicy(0.1),
+                max_concurrent=2,
+            )
+        )
+
+    def test_no_delay(self):
+        self._test_backoff(NoDelayShardConnectionBackoffPolicy())
+
+    def _test_backoff(self, shard_connection_backoff_policy: ShardConnectionBackoffPolicy):
+        backoff_policy = None
+        if isinstance(shard_connection_backoff_policy, LimitedConcurrencyShardConnectionBackoffPolicy):
+            backoff_policy = shard_connection_backoff_policy.backoff_policy
+
+        cluster = TestCluster(
+            shard_connection_backoff_policy=shard_connection_backoff_policy,
+            reconnection_policy=ConstantReconnectionPolicy(0),
+        )
+
+        # Collect scheduled calls and execute them right away
+        scheduler_calls = []
+        original_schedule = cluster.scheduler.schedule
+        pending = 0
+
+        def new_schedule(delay, fn, *args, **kwargs):
+            nonlocal pending
+            pending += 1
+
+            def execute():
+                nonlocal pending
+                try:
+                    fn(*args, **kwargs)
+                finally:
+                    pending -= 1
+
+            scheduler_calls.append((delay, fn, args, kwargs))
+            return original_schedule(0, execute)
+
+        cluster.scheduler.schedule = Mock(side_effect=new_schedule)
+
+        session = cluster.connect()
+        sharding_info = get_sharding_info(session)
+
+        # Since scheduled calls are executed in a separate thread, give them some time to complete
+        while pending > 0:
+            time.sleep(0.01)
+
+        if not sharding_info:
+            # If the backend is not Scylla, `ShardConnectionBackoffScheduler` should not be involved
+            for delay, fn, args, kwargs in scheduler_calls:
+                if fn.__self__.__class__ is _ScopeBucket or fn.__self__.__class__ is _NoDelayShardConnectionBackoffScheduler:
+                    self.fail(
+                        "in the non-shard-aware case connections should be created directly, not via ShardConnectionBackoffScheduler")
+            return
+
+        sleep_time = 0
+        if backoff_policy:
+            schedule = backoff_policy.new_schedule()
+            sleep_time = next(iter(schedule))
+
+        # Make sure that all scheduled calls have the delay dictated by the policy
+        found_related_calls = 0
+        for delay, fn, args, kwargs in scheduler_calls:
+            if fn.__self__.__class__ is _ScopeBucket or fn.__self__.__class__ is _NoDelayShardConnectionBackoffScheduler:
+                found_related_calls += 1
+                self.assertEqual(delay, sleep_time)
+        self.assertLessEqual(len(session.hosts) * (sharding_info.shards_count - 1), found_related_calls)
+
+
+def get_connections_per_host(session: Session) -> dict[str, int]:
+    host_connections = {}
+    for host, pool in session._pools.items():
+        host_connections[host.host_id] = len(pool._connections)
+    return host_connections
+
+
+def get_sharding_info(session: Session) -> Optional[_ShardingInfo]:
+    for host in session.hosts:
+        if host.sharding_info:
+            return host.sharding_info
+    return None
