Skip to content

Commit 7e4f6d4

Browse files
committed
feat: adding weighted random host selection strategy
1 parent 45048db commit 7e4f6d4

File tree

8 files changed

+164
-22
lines changed

8 files changed

+164
-22
lines changed

aws_advanced_python_wrapper/connection_provider.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
from threading import Lock
2626

2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
28-
from aws_advanced_python_wrapper.host_selector import (HostSelector,
29-
RandomHostSelector,
30-
RoundRobinHostSelector)
28+
from aws_advanced_python_wrapper.host_selector import (
29+
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30+
WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.log import Logger
3333
from aws_advanced_python_wrapper.utils.messages import Messages
@@ -96,8 +96,9 @@ def connect(
9696

9797

9898
class DriverConnectionProvider(ConnectionProvider):
99-
_accepted_strategies: Dict[str, HostSelector] = \
100-
{"random": RandomHostSelector(), "round_robin": RoundRobinHostSelector()}
99+
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
100+
"round_robin": RoundRobinHostSelector(),
101+
"weighted_random": WeightedRandomHostSelector()}
101102

102103
def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool:
103104
return True

aws_advanced_python_wrapper/host_selector.py

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from re import search
1919
from typing import TYPE_CHECKING, Dict, List, Optional, Protocol, Tuple
2020

21+
from aws_advanced_python_wrapper.utils.log import Logger
2122
from .host_availability import HostAvailability
2223

2324
if TYPE_CHECKING:
@@ -29,6 +30,8 @@
2930
from .utils.messages import Messages
3031
from .utils.properties import Properties, WrapperProperties
3132

33+
logger = Logger(__name__)
34+
3235

3336
class HostSelector(Protocol):
3437
"""
@@ -168,23 +171,92 @@ def _update_cache_properties_for_round_robin_cluster_info(self, round_robin_clus
168171

169172
for pair in host_weight_pairs:
170173
match = search(RoundRobinHostSelector._HOST_WEIGHT_PAIRS_PATTERN, pair)
174+
message = "RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"
171175
if match:
172176
host_name = match.group("host")
173177
host_weight = match.group("weight")
174178
else:
175-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
179+
logger.error(message, pair)
180+
raise AwsWrapperError(Messages.get_formatted(message, pair))
176181

177182
if len(host_name) == 0 or len(host_weight) == 0:
178-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
183+
logger.error(message, pair)
184+
raise AwsWrapperError(Messages.get_formatted(message, pair))
179185
try:
180186
weight: int = int(host_weight)
181187

182188
if weight < RoundRobinHostSelector._DEFAULT_WEIGHT:
183-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
189+
logger.error(message, pair)
190+
raise AwsWrapperError(Messages.get_formatted(message, pair))
184191

185192
round_robin_cluster_info.cluster_weights_dict[host_name] = weight
186193
except ValueError:
187-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
194+
logger.error(message, pair)
195+
raise AwsWrapperError(Messages.get_formatted(message, pair))
188196

189197
def clear_cache(self):
190198
RoundRobinHostSelector._round_robin_cache.clear()
199+
200+
201+
class WeightedRandomHostSelector(HostSelector):
202+
_DEFAULT_WEIGHT: int = 1
203+
_HOST_WEIGHT_PAIRS_PATTERN = r"((?P<host>[^:/?#]*):(?P<weight>.*))"
204+
_host_weight_map: Dict[str, int] = {}
205+
206+
def get_host(self, hosts: Tuple[HostInfo, ...], role: HostRole, props: Optional[Properties] = None) -> HostInfo:
207+
208+
eligible_hosts: List[HostInfo] = [host for host in hosts if host.role == role and host.get_availability() == HostAvailability.AVAILABLE]
209+
eligible_hosts.sort(key=lambda host: host.host, reverse=False)
210+
if len(eligible_hosts) == 0:
211+
message = "HostSelector.NoHostsMatchingRole"
212+
logger.error(message, role)
213+
raise AwsWrapperError(Messages.get_formatted("HostSelector.NoHostsMatchingRole", role))
214+
215+
self._update_host_weight_map_from_string(props)
216+
217+
default_weight: int = WeightedRandomHostSelector._DEFAULT_WEIGHT
218+
if props is not None:
219+
default_weight = WrapperProperties.WEIGHTED_RANDOM_DEFAULT_WEIGHT.get_int(props)
220+
if default_weight < WeightedRandomHostSelector._DEFAULT_WEIGHT:
221+
logger.error("WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight")
222+
raise AwsWrapperError(Messages.get("WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight"))
223+
224+
selection_list: List[HostInfo] = []
225+
for host in eligible_hosts:
226+
if host.host in self._host_weight_map:
227+
selection_list = selection_list + self._host_weight_map[host.host] * [host]
228+
else:
229+
selection_list = selection_list + default_weight * [host]
230+
231+
return random.choice(selection_list)
232+
233+
def _update_host_weight_map_from_string(self, props: Optional[Properties] = None) -> None:
234+
if props is not None:
235+
host_weights: Optional[str] = WrapperProperties.WEIGHTED_RANDOM_HOST_WEIGHT_PAIRS.get(props)
236+
if host_weights is not None and len(host_weights) != 0:
237+
host_weight_pairs: List[str] = host_weights.split(",")
238+
239+
for pair in host_weight_pairs:
240+
match = search(WeightedRandomHostSelector._HOST_WEIGHT_PAIRS_PATTERN, pair)
241+
message = "WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs"
242+
if match:
243+
host_name = match.group("host")
244+
host_weight = match.group("weight")
245+
else:
246+
logger.error(message, pair)
247+
raise AwsWrapperError(Messages.get_formatted(message, pair))
248+
249+
if len(host_name) == 0 or len(host_weight) == 0:
250+
logger.error(message, pair)
251+
raise AwsWrapperError(Messages.get_formatted(message, pair))
252+
try:
253+
weight: int = int(host_weight)
254+
255+
if weight < WeightedRandomHostSelector._DEFAULT_WEIGHT:
256+
logger.error(message, pair)
257+
raise AwsWrapperError(Messages.get_formatted(message, pair))
258+
259+
self._host_weight_map[host_name] = weight
260+
except ValueError:
261+
logger.error(message, pair)
262+
raise AwsWrapperError(Messages.get_formatted(message, pair))

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,10 @@ ReadWriteSplittingPlugin.UnsupportedHostInfoSelectorStrategy=[ReadWriteSplitting
279279

280280
RoundRobinHostSelector.ClusterInfoNone=[RoundRobinHostSelector] The round robin cluster information cache should have an entry for the current cluster, but no entry was found.
281281
RoundRobinHostSelector.RoundRobinInvalidDefaultWeight=[RoundRobinHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
282-
RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1.
282+
RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1. Weight pair: '{}'
283+
284+
WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs= [WeightedRandomHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1. Weight pair: '{}'
285+
WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight=[WeightedRandomHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
283286

284287
SlidingExpirationCache.CleaningUp=[SlidingExpirationCache] Cleaning up...
285288

aws_advanced_python_wrapper/sql_alchemy_connection_provider.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525

2626
from aws_advanced_python_wrapper.connection_provider import ConnectionProvider
2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
28-
from aws_advanced_python_wrapper.host_selector import (HostSelector,
29-
RandomHostSelector,
30-
RoundRobinHostSelector)
28+
from aws_advanced_python_wrapper.host_selector import (
29+
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30+
WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.messages import Messages
3333
from aws_advanced_python_wrapper.utils.properties import (Properties,
@@ -46,7 +46,9 @@ class SqlAlchemyPooledConnectionProvider(ConnectionProvider, CanReleaseResources
4646
"""
4747
_POOL_EXPIRATION_CHECK_NS: ClassVar[int] = 30 * 60_000_000_000 # 30 minutes
4848
_LEAST_CONNECTIONS: ClassVar[str] = "least_connections"
49-
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(), "round_robin": RoundRobinHostSelector()}
49+
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
50+
"round_robin": RoundRobinHostSelector(),
51+
"weighted_random": WeightedRandomHostSelector()}
5052
_rds_utils: ClassVar[RdsUtils] = RdsUtils()
5153
_database_pools: ClassVar[SlidingExpirationCache[PoolKey, QueuePool]] = SlidingExpirationCache(
5254
should_dispose_func=lambda queue_pool: queue_pool.checkedout() == 0,

aws_advanced_python_wrapper/utils/properties.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,15 @@ class WrapperProperties:
276276
ROUND_ROBIN_HOST_WEIGHT_PAIRS = WrapperProperty("round_robin_host_weight_pairs",
277277
"Comma separated list of database host-weight pairs in the format of `<host>:<weight>`.",
278278
"")
279+
280+
WEIGHTED_RANDOM_DEFAULT_WEIGHT = WrapperProperty("weighted_random_default_weight", "The default weight for any hosts that have not been " +
281+
"configured with the `weighted_random_host_weight_pairs` parameter.",
282+
1)
283+
284+
WEIGHTED_RANDOM_HOST_WEIGHT_PAIRS = WrapperProperty("weighted_random_host_weight_pairs",
285+
"Comma separated list of database host-weight pairs in the format of `<host>:<weight>`.",
286+
"")
287+
279288
# Federated Auth Plugin
280289
IDP_ENDPOINT = WrapperProperty("idp_endpoint",
281290
"The hosting URL of the Identity Provider",

docs/using-the-python-driver/ReaderSelectionStrategies.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ To balance connections to reader instances more evenly, different selection stra
44
| Reader Selection Strategy | Configuration Parameter | Description | Default Value |
55
|---------------------------|-------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
66
| `random` | This strategy does not have configuration parameters. | The random strategy is the default selection strategy. When switching to a reader connection, the reader instance will be chosen randomly from the available database instances. | N/A |
7+
| `weighted_random` | See the following rows for configuration parameters. | The weighted random strategy will be chosen randomly from the available database instances. A slight addition to the random strategy is the weighted random strategy, where more connections will be passed to reader instances based on user specified connection properties. | N/A |
8+
| | `weighted_random_host_weight_pairs` | This parameter value must be a `string` type comma separated list of database host-weight pairs in the format `<host>:<weight>`. The host represents the database instance name, and the weight represents the likeliness of the connection to be directed to the host. Larger the number, the more likely the connection is to be directed to the host. <br><br> **Note:** The `<weight>` value in the string must be an integer greater than or equal to 1. | `null` |
9+
| | `weighted_random_default_weight` | This parameter value must be an integer value in the form of a `string`. This parameter represents the default weight for any hosts that have not been configured with the `weighted_random_host_weight_pairs` parameter. For example, if a connection were already established and host weights were set with `weighted_random_host_weight_pairs` but a new reader host was added to the database, the new reader host would use the default weight. <br><br> **Note:** This value must be an integer greater than or equal to 1. | `1` |
710
| `least_connections` | This strategy does not have configuration parameters. | The least connections strategy will select reader instances based on which database instance has the least number of currently active connections. Note that this strategy is only available when internal connection pools are enabled - if you set the connection property without enabling internal pools, an exception will be thrown. | N/A |
811
| `round_robin` | See the following rows for configuration parameters. | The round robin strategy will select a reader instance by taking turns with all available database instances in a cycle. A slight addition to the round robin strategy is the weighted round robin strategy, where more connections will be passed to reader instances based on user specified connection properties. | N/A |
912
| | `round_robin_host_weight_pairs` | This parameter value must be a `string` type comma separated list of database host-weight pairs in the format `<host>:<weight>`. The host represents the database instance name, and the weight represents how many connections should be directed to the host in one cycle through all available hosts. For example, the value `instance-1:1,instance-2:4` means that for every connection to `instance-1`, there will be four connections to `instance-2`. <br><br> **Note:** The `<weight>` value in the string must be an integer greater than or equal to 1. | `null` |

0 commit comments

Comments
 (0)