Skip to content

Commit 78be473

Browse files
committed
feat: adding weighted random host selection strategy
1 parent 0f65cba commit 78be473

File tree

8 files changed

+114
-3
lines changed

8 files changed

+114
-3
lines changed

aws_advanced_python_wrapper/connection_provider.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,9 @@ def connect(
9696

9797

9898
class DriverConnectionProvider(ConnectionProvider):
99-
_accepted_strategies: Dict[str, HostSelector] = \
100-
{"random": RandomHostSelector(), "round_robin": RoundRobinHostSelector()}
99+
# Registry of host selection strategy names accepted by this provider, each mapped to a shared selector instance.
# NOTE(review): "weighted_random" is registered with a RandomHostSelector instance rather than the
# WeightedRandomHostSelector class added to host_selector.py in this change - presumably an oversight that
# makes the strategy ignore any configured weights. Confirm, then import and use WeightedRandomHostSelector.
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
100+
"round_robin": RoundRobinHostSelector(),
101+
"weighted_random": RandomHostSelector()}
101102

102103
def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool:
103104
return True

aws_advanced_python_wrapper/host_selector.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,61 @@ def _update_cache_properties_for_round_robin_cluster_info(self, round_robin_clus
188188

189189
def clear_cache(self):
190190
RoundRobinHostSelector._round_robin_cache.clear()
191+
192+
193+
class WeightedRandomHostSelector(HostSelector):
    """Select an eligible host at random, biased by per-host integer weights.

    Per-host weights are read from the ``weighted_random_host_weight_pairs``
    property (a comma separated list of ``<host>:<weight>`` pairs). Hosts with
    no configured weight use ``weighted_random_default_weight``. A host with
    weight ``w`` is ``w`` times as likely to be picked as a host with weight 1.
    """

    _DEFAULT_WEIGHT: int = 1
    _HOST_WEIGHT_PAIRS_PATTERN = r"((?P<host>[^:/?#]*):(?P<weight>.*))"
    # NOTE(review): this dict lives on the class, so it is shared by every selector instance and
    # entries parsed from one set of properties remain visible to later calls - confirm this is intended.
    _host_weight_map: Dict[str, int] = {}

    def get_host(self, hosts: Tuple[HostInfo, ...], role: HostRole, props: Optional[Properties] = None) -> HostInfo:
        """Return one available host with the requested role, chosen by weighted random selection.

        :param hosts: the candidate hosts.
        :param role: the role a host must have to be eligible.
        :param props: connection properties supplying the host weight pairs and the default weight.
            When ``None``, every host without a previously stored weight uses the built-in default weight.
        :return: the selected host.
        :raises AwsWrapperError: if no available host matches ``role``, or if the configured
            default weight or host weight pairs are invalid.
        """
        eligible_hosts: List[HostInfo] = [
            host for host in hosts
            if host.role == role and host.get_availability() == HostAvailability.AVAILABLE
        ]
        # Sort by host name so the candidate order does not depend on the order of the input tuple.
        eligible_hosts.sort(key=lambda host: host.host, reverse=False)
        if not eligible_hosts:
            raise AwsWrapperError(Messages.get_formatted("HostSelector.NoHostsMatchingRole", role))

        # The properties must be forwarded here; without them the configured pairs are never parsed.
        self._update_host_weight_map_from_string(props)

        default_weight: int = WeightedRandomHostSelector._DEFAULT_WEIGHT
        if props is not None:
            props_weight = WrapperProperties.WEIGHTED_RANDOM_DEFAULT_WEIGHT.get_int(props)
            if props_weight < WeightedRandomHostSelector._DEFAULT_WEIGHT:
                # NOTE(review): this reuses the round robin message key; no weighted random
                # equivalent is visible in the messages file, so the key is left unchanged.
                raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidDefaultWeight"))
            default_weight = props_weight

        # Weighted sampling gives the same distribution as repeating each host `weight` times in a
        # list, without allocating a list whose size grows with the configured weights.
        weights: List[int] = [self._host_weight_map.get(host.host, default_weight) for host in eligible_hosts]
        return random.choices(eligible_hosts, weights=weights, k=1)[0]

    def _update_host_weight_map_from_string(self, props: Optional[Properties] = None) -> None:
        """Parse ``weighted_random_host_weight_pairs`` from ``props`` into the host weight map.

        Does nothing when ``props`` is ``None`` or the property is empty. All pairs are validated
        before the map is touched, so an invalid value never leaves it partially updated.

        :param props: connection properties that may contain the host weight pairs.
        :raises AwsWrapperError: if a pair is not in ``<host>:<weight>`` form, has an empty host
            or weight, or has a weight that is not an integer greater than or equal to 1.
        """
        if props is None:
            return

        host_weights: Optional[str] = WrapperProperties.WEIGHTED_RANDOM_HOST_WEIGHT_PAIRS.get(props)
        if not host_weights:
            return

        invalid_pairs_key = "WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs"
        parsed_weights: Dict[str, int] = {}
        for pair in host_weights.split(","):
            match = search(WeightedRandomHostSelector._HOST_WEIGHT_PAIRS_PATTERN, pair)
            if not match:
                raise AwsWrapperError(Messages.get(invalid_pairs_key))

            host_name = match.group("host")
            host_weight = match.group("weight")
            if not host_name or not host_weight:
                raise AwsWrapperError(Messages.get(invalid_pairs_key))

            try:
                weight: int = int(host_weight)
            except ValueError as e:
                raise AwsWrapperError(Messages.get(invalid_pairs_key)) from e

            if weight < self._DEFAULT_WEIGHT:
                raise AwsWrapperError(Messages.get(invalid_pairs_key))

            parsed_weights[host_name] = weight

        self._host_weight_map.update(parsed_weights)

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,8 @@ RoundRobinHostSelector.ClusterInfoNone=[RoundRobinHostSelector] The round robin
280280
RoundRobinHostSelector.RoundRobinInvalidDefaultWeight=[RoundRobinHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
281281
RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1.
282282

283+
WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs= [WeightedRandomHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1.
284+
283285
SlidingExpirationCache.CleaningUp=[SlidingExpirationCache] Cleaning up...
284286

285287
SqlAlchemyPooledConnectionProvider.PoolNone=[SqlAlchemyPooledConnectionProvider] Attempted to find or create a pool for '{}' but the result of the attempt evaluated to None.

aws_advanced_python_wrapper/sql_alchemy_connection_provider.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ class SqlAlchemyPooledConnectionProvider(ConnectionProvider, CanReleaseResources
4646
"""
4747
_POOL_EXPIRATION_CHECK_NS: ClassVar[int] = 30 * 60_000_000_000 # 30 minutes
4848
_LEAST_CONNECTIONS: ClassVar[str] = "least_connections"
49-
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(), "round_robin": RoundRobinHostSelector()}
49+
# Registry of host selection strategy names accepted by this provider, each mapped to a shared selector instance.
# NOTE(review): "weighted_random" is registered with a RandomHostSelector instance rather than the
# WeightedRandomHostSelector class added to host_selector.py in this change - presumably an oversight that
# makes the strategy ignore any configured weights. Confirm, then import and use WeightedRandomHostSelector.
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
50+
"round_robin": RoundRobinHostSelector(),
51+
"weighted_random": RandomHostSelector()}
5052
_rds_utils: ClassVar[RdsUtils] = RdsUtils()
5153
_database_pools: ClassVar[SlidingExpirationCache[PoolKey, QueuePool]] = SlidingExpirationCache(
5254
should_dispose_func=lambda queue_pool: queue_pool.checkedout() == 0,

aws_advanced_python_wrapper/utils/properties.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,15 @@ class WrapperProperties:
271271
ROUND_ROBIN_HOST_WEIGHT_PAIRS = WrapperProperty("round_robin_host_weight_pairs",
272272
"Comma separated list of database host-weight pairs in the format of `<host>:<weight>`.",
273273
"")
274+
275+
# Read by WeightedRandomHostSelector.get_host via get_int(); a value below 1 raises AwsWrapperError there.
WEIGHTED_RANDOM_DEFAULT_WEIGHT = WrapperProperty("weighted_random_default_weight", "The default weight for any hosts that have not been " +
276+
"configured with the `weighted_random_host_weight_pairs` parameter.",
277+
1)
278+
279+
# Parsed by WeightedRandomHostSelector._update_host_weight_map_from_string; the empty-string default means no per-host weights.
WEIGHTED_RANDOM_HOST_WEIGHT_PAIRS = WrapperProperty("weighted_random_host_weight_pairs",
280+
"Comma separated list of database host-weight pairs in the format of `<host>:<weight>`.",
281+
"")
282+
274283
# Federated Auth Plugin
275284
IDP_ENDPOINT = WrapperProperty("idp_endpoint",
276285
"The hosting URL of the Identity Provider",

docs/using-the-python-driver/ReaderSelectionStrategies.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ To balance connections to reader instances more evenly, different selection stra
44
| Reader Selection Strategy | Configuration Parameter | Description | Default Value |
55
|---------------------------|-------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
66
| `random` | This strategy does not have configuration parameters. | The random strategy is the default selection strategy. When switching to a reader connection, the reader instance will be chosen randomly from the available database instances. | N/A |
7+
| `weighted_random` | See the following rows for configuration parameters. | The weighted random strategy will select a reader instance randomly from the available database instances, like the random strategy, but more connections will be passed to reader instances that have been given a higher weight through user specified connection properties. | N/A |
8+
| | `weighted_random_host_weight_pairs` | This parameter value must be a `string` type comma separated list of database host-weight pairs in the format `<host>:<weight>`. The host represents the database instance name, and the weight represents the likelihood of the connection being directed to the host. The larger the number, the more likely the connection is to be directed to the host. <br><br> **Note:** The `<weight>` value in the string must be an integer greater than or equal to 1. | `null` |
9+
| | `weighted_random_default_weight` | This parameter value must be an integer value in the form of a `string`. This parameter represents the default weight for any hosts that have not been configured with the `weighted_random_host_weight_pairs` parameter. For example, if a connection were already established and host weights were set with `weighted_random_host_weight_pairs` but a new reader host was added to the database, the new reader host would use the default weight. <br><br> **Note:** This value must be an integer greater than or equal to 1. | `1` |
710
| `least_connections` | This strategy does not have configuration parameters. | The least connections strategy will select reader instances based on which database instance has the least number of currently active connections. Note that this strategy is only available when internal connection pools are enabled - if you set the connection property without enabling internal pools, an exception will be thrown. | N/A |
811
| `round_robin` | See the following rows for configuration parameters. | The round robin strategy will select a reader instance by taking turns with all available database instances in a cycle. A slight addition to the round robin strategy is the weighted round robin strategy, where more connections will be passed to reader instances based on user specified connection properties. | N/A |
912
| | `round_robin_host_weight_pairs` | This parameter value must be a `string` type comma separated list of database host-weight pairs in the format `<host>:<weight>`. The host represents the database instance name, and the weight represents how many connections should be directed to the host in one cycle through all available hosts. For example, the value `instance-1:1,instance-2:4` means that for every connection to `instance-1`, there will be four connections to `instance-2`. <br><br> **Note:** The `<weight>` value in the string must be an integer greater than or equal to 1. | `null` |

docs/using-the-python-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ conn = AwsWrapperConnection.connect(psycopg.Connection.connect, **params)
104104
| `random` | The random strategy is the default connection strategy. When switching to a reader connection, the reader instance will be chosen randomly from the available database instances. | N/A |
105105
| `least_connections` | The least connections strategy will select reader instances based on which database instance has the least number of currently active connections. Note that this strategy is only available when internal connection pools are enabled - if you set the connection property without enabling internal pools, an exception will be thrown. | N/A |
106106
| `round_robin` | The round robin strategy will select a reader instance by taking turns with all available database instances in a cycle. A slight addition to the round robin strategy is the weighted round robin strategy, where more connections will be passed to reader instances based on user specified connection properties. | See the rows `round_robin_host_weight_pairs` and `round_robin_default_weight` for configuration parameters |
107+
| `weighted_random` | The weighted random strategy will select a reader instance randomly from the available database instances, like the random strategy, but more connections will be passed to reader instances that have been given a higher weight through user specified connection properties. | See `weighted_random_host_weight_pairs` and `weighted_random_default_weight` in [Reader Selection Strategies](../ReaderSelectionStrategies.md) for configuration parameters |
107108
| `round_robin_host_weight_pairs` | This parameter value must be a `string` type comma separated list of database host-weight pairs in the format `<host>:<weight>`. The host represents the database instance name, and the weight represents how many connections should be directed to the host in one cycle through all available hosts. For example, the value `instance-1:1,instance-2:4` means that for every connection to `instance-1`, there will be four connections to `instance-2`. <br><br> **Note:** The `<weight>` value in the string must be an integer greater than or equal to 1. | `""` (empty string) |
108109
| `round_robin_default_weight` | This parameter value must be an integer value. This parameter represents the default weight for any hosts that have not been configured with the `round_robin_host_weight_pairs` parameter. For example, if a connection were already established and host weights were set with `round_robin_host_weight_pairs` but a new reader host was added to the database, the new reader host would use the default weight. <br><br> **Note:** This value must be an integer greater than or equal to 1. | 1 |
109110
| `fastest_response` | The fastest_response strategy will select reader instances based on which database instance has the fastest response time. Note that this strategy requires that the `fastest_response_strategy` and `read_write_splitting` plugins are both enabled. See [`Fastest Response Strategy Plugin`](./UsingTheFastestResponseStrategyPlugin.md) | N/A |
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import pytest
2+
3+
from aws_advanced_python_wrapper.host_availability import HostAvailability
4+
from aws_advanced_python_wrapper.host_selector import \
5+
WeightedRandomHostSelector
6+
from aws_advanced_python_wrapper.hostinfo import HostInfo, HostRole
7+
from aws_advanced_python_wrapper.utils.properties import Properties
8+
9+
HOST_ROLE = HostRole.READER
10+
11+
12+
@pytest.mark.parametrize("execution_number", range(50))
def test_get_host_given_unavailable_host(execution_number):
    """With one unavailable and one available reader, the available reader is always returned."""
    down: HostInfo = HostInfo(host="some_unavailable_host", role=HOST_ROLE, availability=HostAvailability.UNAVAILABLE)
    up: HostInfo = HostInfo(host="some_available_host", role=HOST_ROLE, availability=HostAvailability.AVAILABLE)
    candidates = (down, up)

    selected = WeightedRandomHostSelector().get_host(candidates, HOST_ROLE, Properties())

    assert up == selected
21+
22+
23+
@pytest.mark.parametrize("execution_number", range(50))
def test_get_host_given_multiple_unavailable_hosts(execution_number):
    """With a mix of unavailable and available readers, only an available reader is returned."""
    layout = (
        ("some_unavailable_host", HostAvailability.UNAVAILABLE),
        ("some_unavailable_host", HostAvailability.UNAVAILABLE),
        ("some_available_host", HostAvailability.AVAILABLE),
        ("some_available_host", HostAvailability.AVAILABLE),
    )
    candidates = tuple(
        HostInfo(host=name, role=HOST_ROLE, availability=state) for name, state in layout
    )

    selector = WeightedRandomHostSelector()
    selected = selector.get_host(candidates, HOST_ROLE, Properties())

    assert HostAvailability.AVAILABLE == selected.get_availability()

0 commit comments

Comments
 (0)