|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 |
| - |
| 14 | +import inspect |
| 15 | +import os |
| 16 | +import time |
15 | 17 | import unittest
|
| 18 | +from typing import Optional |
| 19 | +from unittest.mock import Mock |
16 | 20 |
|
17 | 21 | from cassandra import ConsistencyLevel, Unavailable
|
18 |
| -from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT |
| 22 | +from cassandra.cluster import ExecutionProfile, EXEC_PROFILE_DEFAULT, Session |
| 23 | +from cassandra.policies import LimitedConcurrencyShardConnectionBackoffPolicy, ConstantReconnectionPolicy, \ |
| 24 | + ShardConnectionBackoffPolicy, NoDelayShardConnectionBackoffPolicy, _ScopeBucket, \ |
| 25 | + _NoDelayShardConnectionBackoffScheduler |
| 26 | +from cassandra.shard_info import _ShardingInfo |
19 | 27 |
|
20 | 28 | from tests.integration import use_cluster, get_cluster, get_node, TestCluster
|
21 | 29 |
|
22 | 30 |
|
def setup_module():
    """Spin up the test cluster once for this module.

    SCYLLA_EXT_OPTS requests an 8-core (8-shard) node so the shard-aware
    connection logic under test is actually exercised.
    """
    os.environ.update(SCYLLA_EXT_OPTS="--smp 8")
    use_cluster('test_cluster', [4])
|
25 | 34 |
|
26 | 35 |
|
@@ -65,3 +74,90 @@ def test_should_rethrow_on_unvailable_with_default_policy_if_cas(self):
|
65 | 74 | self.assertEqual(exception.consistency, ConsistencyLevel.SERIAL)
|
66 | 75 | self.assertEqual(exception.required_replicas, 2)
|
67 | 76 | self.assertEqual(exception.alive_replicas, 1)
|
| 77 | + |
| 78 | + |
class ShardBackoffPolicyTests(unittest.TestCase):
    """Integration tests for ``ShardConnectionBackoffPolicy`` implementations.

    Each test wraps the cluster scheduler so every scheduled call is recorded
    (with its requested delay) but executed immediately, then verifies that
    per-shard connection attempts were scheduled with the delay dictated by
    the policy under test.
    """

    def test_limited_concurrency_1_connection_per_host(self):
        self._test_backoff(
            LimitedConcurrencyShardConnectionBackoffPolicy(
                backoff_policy=ConstantReconnectionPolicy(0.1),
                max_concurrent=1,
            )
        )

    def test_limited_concurrency_2_connection_per_host(self):
        # Fixed: previously passed max_concurrent=1, silently duplicating the
        # 1-connection test above instead of covering the 2-connection case.
        self._test_backoff(
            LimitedConcurrencyShardConnectionBackoffPolicy(
                backoff_policy=ConstantReconnectionPolicy(0.1),
                max_concurrent=2,
            )
        )

    def test_no_delay(self):
        self._test_backoff(NoDelayShardConnectionBackoffPolicy())

    def _test_backoff(self, shard_connection_backoff_policy: ShardConnectionBackoffPolicy):
        """Connect with the given policy and check scheduled connection delays.

        :param shard_connection_backoff_policy: policy instance under test.
        """
        backoff_policy = None
        if isinstance(shard_connection_backoff_policy, LimitedConcurrencyShardConnectionBackoffPolicy):
            backoff_policy = shard_connection_backoff_policy.backoff_policy

        cluster = TestCluster(
            shard_connection_backoff_policy=shard_connection_backoff_policy,
            reconnection_policy=ConstantReconnectionPolicy(0),
        )
        # Ensure the cluster is torn down even when an assertion below fails.
        self.addCleanup(cluster.shutdown)

        # Record every scheduled call, but run it right away (delay 0) so the
        # test does not actually sit through the policy's backoff delays.
        scheduler_calls = []
        original_schedule = cluster.scheduler.schedule
        pending = 0

        def new_schedule(delay, fn, *args, **kwargs):
            nonlocal pending
            pending += 1

            def execute():
                nonlocal pending
                try:
                    fn(*args, **kwargs)
                finally:
                    pending -= 1

            scheduler_calls.append((delay, fn, args, kwargs))
            return original_schedule(0, execute)

        cluster.scheduler.schedule = Mock(side_effect=new_schedule)

        session = cluster.connect()
        sharding_info = get_sharding_info(session)

        # Scheduled calls execute on a separate thread; wait for them to drain,
        # but with a deadline so a wedged connection attempt cannot hang the
        # whole test suite forever (the old loop had no timeout).
        deadline = time.time() + 60
        while pending > 0:
            if time.time() > deadline:
                self.fail("timed out waiting for scheduled connection attempts to complete")
            time.sleep(0.01)

        if not sharding_info:
            # If it is not scylla, `ShardConnectionBackoffScheduler` should not be involved.
            for delay, fn, args, kwargs in scheduler_calls:
                if fn.__self__.__class__ is _ScopeBucket or fn.__self__.__class__ is _NoDelayShardConnectionBackoffScheduler:
                    self.fail(
                        "in non-shard-aware case connection should be created directly, not involving ShardConnectionBackoffScheduler")
            return

        # Expected per-attempt delay: the first value of the backoff schedule,
        # or 0 when the policy has no backoff (NoDelay).
        sleep_time = 0
        if backoff_policy:
            schedule = backoff_policy.new_schedule()
            sleep_time = next(iter(schedule))

        # Make sure that all scheduled shard-connection calls carry the
        # policy's delay, and that enough of them were made to cover every
        # missing shard connection on every host.
        found_related_calls = 0
        for delay, fn, args, kwargs in scheduler_calls:
            if fn.__self__.__class__ is _ScopeBucket or fn.__self__.__class__ is _NoDelayShardConnectionBackoffScheduler:
                found_related_calls += 1
                self.assertEqual(delay, sleep_time)
        self.assertLessEqual(len(session.hosts) * (sharding_info.shards_count - 1), found_related_calls)
| 158 | + |
def get_sharding_info(session: Session) -> Optional[_ShardingInfo]:
    """Return the sharding info of the first shard-aware host, or None.

    A host without sharding info (falsy attribute) means the target is not
    scylla / not shard-aware.
    """
    return next(
        (host.sharding_info for host in session.hosts if host.sharding_info),
        None,
    )
0 commit comments