Skip to content

Commit 5284e12

Browse files
committed
feat: adding weighted random host selection strategy
1 parent 0f65cba commit 5284e12

9 files changed

+164
-23
lines changed

aws_advanced_python_wrapper/connection_provider.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
from threading import Lock
2626

2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
28-
from aws_advanced_python_wrapper.host_selector import (HostSelector,
29-
RandomHostSelector,
30-
RoundRobinHostSelector)
28+
from aws_advanced_python_wrapper.host_selector import (
29+
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30+
WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.log import Logger
3333
from aws_advanced_python_wrapper.utils.messages import Messages
@@ -96,8 +96,9 @@ def connect(
9696

9797

9898
class DriverConnectionProvider(ConnectionProvider):
99-
_accepted_strategies: Dict[str, HostSelector] = \
100-
{"random": RandomHostSelector(), "round_robin": RoundRobinHostSelector()}
99+
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
100+
"round_robin": RoundRobinHostSelector(),
101+
"weighted_random": WeightedRandomHostSelector()}
101102

102103
def accepts_host_info(self, host_info: HostInfo, props: Properties) -> bool:
103104
return True

aws_advanced_python_wrapper/host_selector.py

Lines changed: 75 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
from .utils.messages import Messages
3030
from .utils.properties import Properties, WrapperProperties
3131

32+
logger = Logger(__name__)
33+
3234

3335
class HostSelector(Protocol):
3436
"""
@@ -168,23 +170,92 @@ def _update_cache_properties_for_round_robin_cluster_info(self, round_robin_clus
168170

169171
for pair in host_weight_pairs:
170172
match = search(RoundRobinHostSelector._HOST_WEIGHT_PAIRS_PATTERN, pair)
173+
message = "RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"
171174
if match:
172175
host_name = match.group("host")
173176
host_weight = match.group("weight")
174177
else:
175-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
178+
logger.error(message, pair)
179+
raise AwsWrapperError(Messages.get_formatted(message), pair)
176180

177181
if len(host_name) == 0 or len(host_weight) == 0:
178-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
182+
logger.error(message, pair)
183+
raise AwsWrapperError(Messages.get_formatted(message), pair)
179184
try:
180185
weight: int = int(host_weight)
181186

182187
if weight < RoundRobinHostSelector._DEFAULT_WEIGHT:
183-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
188+
logger.error(message, pair)
189+
raise AwsWrapperError(Messages.get_formatted(message), pair)
184190

185191
round_robin_cluster_info.cluster_weights_dict[host_name] = weight
186192
except ValueError:
187-
raise AwsWrapperError(Messages.get("RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs"))
193+
logger.error(message, pair)
194+
raise AwsWrapperError(Messages.get_formatted(message), pair)
188195

189196
def clear_cache(self):
190197
RoundRobinHostSelector._round_robin_cache.clear()
198+
199+
200+
class WeightedRandomHostSelector(HostSelector):
201+
_DEFAULT_WEIGHT: int = 1
202+
_HOST_WEIGHT_PAIRS_PATTERN = r"((?P<host>[^:/?#]*):(?P<weight>.*))"
203+
_host_weight_map: Dict[str, int] = {}
204+
205+
def get_host(self, hosts: Tuple[HostInfo, ...], role: HostRole, props: Optional[Properties] = None) -> HostInfo:
206+
207+
eligible_hosts: List[HostInfo] = [host for host in hosts if host.role == role and host.get_availability() == HostAvailability.AVAILABLE]
208+
eligible_hosts.sort(key=lambda host: host.host, reverse=False)
209+
if len(eligible_hosts) == 0:
210+
message = "HostSelector.NoHostsMatchingRole"
211+
logger.error(message, role)
212+
raise AwsWrapperError(Messages.get_formatted("HostSelector.NoHostsMatchingRole", role))
213+
214+
self._update_host_weight_map_from_string(props)
215+
216+
default_weight: int = WeightedRandomHostSelector._DEFAULT_WEIGHT
217+
if props is not None:
218+
default_weight = WrapperProperties.WEIGHTED_RANDOM_DEFAULT_WEIGHT.get_int(props)
219+
if default_weight < WeightedRandomHostSelector._DEFAULT_WEIGHT:
220+
logger.error("WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight")
221+
raise AwsWrapperError(Messages.get("WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight"))
222+
223+
selection_list: List[HostInfo] = []
224+
for host in eligible_hosts:
225+
if host.host in self._host_weight_map:
226+
selection_list = selection_list + self._host_weight_map[host.host] * [host]
227+
else:
228+
selection_list = selection_list + default_weight * [host]
229+
230+
return random.choice(selection_list)
231+
232+
def _update_host_weight_map_from_string(self, props: Optional[Properties] = None) -> None:
233+
if props is not None:
234+
host_weights: Optional[str] = WrapperProperties.WEIGHTED_RANDOM_HOST_WEIGHT_PAIRS.get(props)
235+
if host_weights is not None and len(host_weights) != 0:
236+
host_weight_pairs: List[str] = host_weights.split(",")
237+
238+
for pair in host_weight_pairs:
239+
match = search(WeightedRandomHostSelector._HOST_WEIGHT_PAIRS_PATTERN, pair)
240+
message = "WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs"
241+
if match:
242+
host_name = match.group("host")
243+
host_weight = match.group("weight")
244+
else:
245+
logger.error(message, pair)
246+
raise AwsWrapperError(Messages.get_formatted(message), pair)
247+
248+
if len(host_name) == 0 or len(host_weight) == 0:
249+
logger.error(message, pair)
250+
raise AwsWrapperError(Messages.get_formatted(message), pair)
251+
try:
252+
weight: int = int(host_weight)
253+
254+
if weight < WeightedRandomHostSelector._DEFAULT_WEIGHT:
255+
logger.error(message, pair)
256+
raise AwsWrapperError(Messages.get_formatted(message), pair)
257+
258+
self._host_weight_map[host_name] = weight
259+
except ValueError:
260+
logger.error(message, pair)
261+
raise AwsWrapperError(Messages.get_formatted(message), pair)

aws_advanced_python_wrapper/resources/aws_advanced_python_wrapper_messages.properties

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,10 @@ ReadWriteSplittingPlugin.UnsupportedHostInfoSelectorStrategy=[ReadWriteSplitting
278278

279279
RoundRobinHostSelector.ClusterInfoNone=[RoundRobinHostSelector] The round robin cluster information cache should have an entry for the current cluster, but no entry was found.
280280
RoundRobinHostSelector.RoundRobinInvalidDefaultWeight=[RoundRobinHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
281-
RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1.
281+
RoundRobinHostSelector.RoundRobinInvalidHostWeightPairs= [RoundRobinHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1. Weight pair: '{}'
282+
283+
WeightedRandomHostSelector.WeightedRandomInvalidHostWeightPairs= [WeightedRandomHostSelector] The provided host weight pairs have not been configured correctly. Please ensure the provided host weight pairs is a comma separated list of pairs, each pair in the format of <host>:<weight>. Weight values must be an integer greater than or equal to the default weight value of 1. Weight pair: '{}'
284+
WeightedRandomHostSelector.WeightedRandomInvalidDefaultWeight=[WeightedRandomHostSelector] The provided default weight value is not valid. Weight values must be an integer greater than or equal to 1.
282285

283286
SlidingExpirationCache.CleaningUp=[SlidingExpirationCache] Cleaning up...
284287

aws_advanced_python_wrapper/sql_alchemy_connection_provider.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525

2626
from aws_advanced_python_wrapper.connection_provider import ConnectionProvider
2727
from aws_advanced_python_wrapper.errors import AwsWrapperError
28-
from aws_advanced_python_wrapper.host_selector import (HostSelector,
29-
RandomHostSelector,
30-
RoundRobinHostSelector)
28+
from aws_advanced_python_wrapper.host_selector import (
29+
HostSelector, RandomHostSelector, RoundRobinHostSelector,
30+
WeightedRandomHostSelector)
3131
from aws_advanced_python_wrapper.plugin import CanReleaseResources
3232
from aws_advanced_python_wrapper.utils.messages import Messages
3333
from aws_advanced_python_wrapper.utils.properties import (Properties,
@@ -46,7 +46,9 @@ class SqlAlchemyPooledConnectionProvider(ConnectionProvider, CanReleaseResources
4646
"""
4747
_POOL_EXPIRATION_CHECK_NS: ClassVar[int] = 30 * 60_000_000_000 # 30 minutes
4848
_LEAST_CONNECTIONS: ClassVar[str] = "least_connections"
49-
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(), "round_robin": RoundRobinHostSelector()}
49+
_accepted_strategies: Dict[str, HostSelector] = {"random": RandomHostSelector(),
50+
"round_robin": RoundRobinHostSelector(),
51+
"weighted_random": WeightedRandomHostSelector()}
5052
_rds_utils: ClassVar[RdsUtils] = RdsUtils()
5153
_database_pools: ClassVar[SlidingExpirationCache[PoolKey, QueuePool]] = SlidingExpirationCache(
5254
should_dispose_func=lambda queue_pool: queue_pool.checkedout() == 0,

aws_advanced_python_wrapper/utils/properties.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,15 @@ class WrapperProperties:
271271
ROUND_ROBIN_HOST_WEIGHT_PAIRS = WrapperProperty("round_robin_host_weight_pairs",
272272
"Comma separated list of database host-weight pairs in the format of `<host>:<weight>`.",
273273
"")
274+
275+
WEIGHTED_RANDOM_DEFAULT_WEIGHT = WrapperProperty("weighted_random_default_weight", "The default weight for any hosts that have not been " +
276+
"configured with the `weighted_random_host_weight_pairs` parameter.",
277+
1)
278+
279+
WEIGHTED_RANDOM_HOST_WEIGHT_PAIRS = WrapperProperty("weighted_random_host_weight_pairs",
280+
"Comma separated list of database host-weight pairs in the format of `<host>:<weight>`.",
281+
"")
282+
274283
# Federated Auth Plugin
275284
IDP_ENDPOINT = WrapperProperty("idp_endpoint",
276285
"The hosting URL of the Identity Provider",

docs/using-the-python-driver/ReaderSelectionStrategies.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ To balance connections to reader instances more evenly, different selection stra
44
| Reader Selection Strategy | Configuration Parameter | Description | Default Value |
55
|---------------------------|-------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
66
| `random` | This strategy does not have configuration parameters. | The random strategy is the default selection strategy. When switching to a reader connection, the reader instance will be chosen randomly from the available database instances. | N/A |
7+
| `weighted_random` | See the following rows for configuration parameters. | The weighted random strategy will be chosen randomly from the available database instances. A slight addition to the random strategy is the weighted random strategy, where more connections will be passed to reader instances based on user specified connection properties. | N/A |
8+
| | `weighted_random_host_weight_pairs` | This parameter value must be a `string` type comma separated list of database host-weight pairs in the format `<host>:<weight>`. The host represents the database instance name, and the weight represents the likeliness of the connection to be directed to the host. Larger the number, the more likely the connection is to be directed to the host. <br><br> **Note:** The `<weight>` value in the string must be an integer greater than or equal to 1. | `null` |
9+
| | `weighted_random_default_weight` | This parameter value must be an integer value in the form of a `string`. This parameter represents the default weight for any hosts that have not been configured with the `weighted_random_host_weight_pairs` parameter. For example, if a connection were already established and host weights were set with `weighted_random_host_weight_pairs` but a new reader host was added to the database, the new reader host would use the default weight. <br><br> **Note:** This value must be an integer greater than or equal to 1. | `1` |
710
| `least_connections` | This strategy does not have configuration parameters. | The least connections strategy will select reader instances based on which database instance has the least number of currently active connections. Note that this strategy is only available when internal connection pools are enabled - if you set the connection property without enabling internal pools, an exception will be thrown. | N/A |
811
| `round_robin` | See the following rows for configuration parameters. | The round robin strategy will select a reader instance by taking turns with all available database instances in a cycle. A slight addition to the round robin strategy is the weighted round robin strategy, where more connections will be passed to reader instances based on user specified connection properties. | N/A |
912
| | `round_robin_host_weight_pairs` | This parameter value must be a `string` type comma separated list of database host-weight pairs in the format `<host>:<weight>`. The host represents the database instance name, and the weight represents how many connections should be directed to the host in one cycle through all available hosts. For example, the value `instance-1:1,instance-2:4` means that for every connection to `instance-1`, there will be four connections to `instance-2`. <br><br> **Note:** The `<weight>` value in the string must be an integer greater than or equal to 1. | `null` |

docs/using-the-python-driver/using-plugins/UsingTheFastestResponseStrategyPlugin.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ The host response time is measured at an interval set by `response_measurement_i
55

66
## Using the Fastest Response Strategy Plugin
77

8-
The plugin can be loaded by adding the plugin code `fastest_response_strategy` to the [`plugins`](../UsingThePythonDriver.md#aws-advanced-python-driver-parameters) parameter. The Fastest Response Strategy Plugin is not loaded by default, and must be loaded along with the [`read_write_splitting`](https://github.com/awslabs/aws-advanced-python-wrapper/blob/main/docs/using-the-python-driver/using-plugins/UsingTheReadWriteSplittingPlugin.md) plugin.
8+
The plugin can be loaded by adding the plugin code `fastest_response_strategy` to the [`plugins`](../UsingThePythonDriver.md#aws-advanced-python-driver-parameters) parameter. The Fastest Response Strategy Plugin is not loaded by default, and must be loaded along with the [`read_write_splitting`](./UsingTheReadWriteSplittingPlugin.md) plugin.
99

1010
> [!IMPORTANT]\
1111
> **`reader_response_strategy` must be set to `fastest_reponse` when using this plugin. Otherwise an error will be thrown:**

0 commit comments

Comments
 (0)