Skip to content

Commit c28a062

Browse files
authored
Implement Monitor and MonitoringThreadContainer (#58)
1 parent aac9021 commit c28a062

18 files changed

+1702
-120
lines changed

aws_wrapper/failover_plugin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
from aws_wrapper.utils.properties import Properties, WrapperProperties
4242
from aws_wrapper.utils.rds_url_type import RdsUrlType
4343
from aws_wrapper.utils.rdsutils import RdsUtils
44-
from aws_wrapper.utils.subscribed_method_utils import SubscribedMethodUtils
44+
from aws_wrapper.utils.utils import SubscribedMethodUtils
4545
from aws_wrapper.writer_failover_handler import (WriterFailoverHandler,
4646
WriterFailoverHandlerImpl)
4747

aws_wrapper/host_monitoring_plugin.py

Lines changed: 309 additions & 48 deletions
Large diffs are not rendered by default.

aws_wrapper/plugin_service.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def force_refresh_host_list(self, connection: Optional[Connection] = None):
145145
def connect(self, host_info: HostInfo, props: Properties) -> Connection:
146146
...
147147

148-
def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Event) -> Connection:
148+
def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Optional[Event]) -> Connection:
149149
...
150150

151151
def set_availability(self, host_aliases: FrozenSet[str], availability: HostAvailability):
@@ -264,7 +264,7 @@ def connect(self, host_info: HostInfo, props: Properties) -> Connection:
264264
plugin_manager: PluginManager = self._container.plugin_manager
265265
return plugin_manager.connect(host_info, props, self.current_connection is None)
266266

267-
def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Event) -> Connection:
267+
def force_connect(self, host_info: HostInfo, props: Properties, timeout_event: Optional[Event]) -> Connection:
268268
plugin_manager: PluginManager = self._container.plugin_manager
269269
return plugin_manager.force_connect(host_info, props, self.current_connection is None)
270270

aws_wrapper/resources/messages.properties

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ HostMonitoringPlugin.ActivatedMonitoring=[HostMonitoringPlugin] Executing method
4848
HostMonitoringPlugin.ClusterEndpointHostInfo=[HostMonitoringPlugin] The HostInfo to monitor is associated with a cluster endpoint. The plugin will attempt to identify the connected database instance.
4949
HostMonitoringPlugin.MonitoringDeactivated=[HostMonitoringPlugin] Monitoring deactivated for method '{}'.
5050
HostMonitoringPlugin.NullConnection=[HostMonitoringPlugin] Attempted to execute method '{}' but the current connection is None.
51-
HostMonitoringPlugin.NullHostInfo=[HostMonitoringPlugin] Attempted to execute method '{}' but the current host info is None.
51+
HostMonitoringPlugin.NullHostInfo=[HostMonitoringPlugin] Could not find HostInfo to monitor for the current connection.
52+
HostMonitoringPlugin.NullHostInfoForMethod=[HostMonitoringPlugin] Attempted to execute method '{}' but the current host info is None.
5253
HostMonitoringPlugin.UnavailableHost=[HostMonitoringPlugin] Host '{}' is unavailable.
5354
HostMonitoringPlugin.ErrorIdentifyingConnection=[HostMonitoringPlugin] An error occurred while identifying the connection database instance: '{}'.
5455
HostMonitoringPlugin.UnableToIdentifyConnection=[HostMonitoringPlugin] Unable to identify the connected database instance: '{}', please ensure the correct host list provider is specified. The host list provider in use is: '{}'.
@@ -57,6 +58,11 @@ HostSelector.NoEligibleHost=[HostSelector] No Eligible Hosts Found.
5758

5859
IamPlugin.IsNullOrEmpty=[IamPlugin] Property "{}" is null or empty.
5960

61+
Monitor.NullContext=[Monitor] Parameter 'context' should not evaluate to None.
62+
Monitor.NullDialect=[Monitor] The host monitoring plugin tried to check the connection status but was unable to identify the target driver dialect.
63+
Monitor.OpeningMonitorConnection=[Monitor] Opening a monitoring connection to '{}'.
64+
Monitor.OpenedMonitorConnection=[Monitor] Opened a monitoring connection to '{}'.
65+
6066
MonitorContext.ExceptionAbortingConnection=[MonitorContext] An exception occurred while attempting to abort the monitored connection: '{}'.
6167
MonitorContext.HostAvailable=[MonitorContext] Host '{}' is *available*.
6268
MonitorContext.HostUnavailable=[MonitorContext] Host '{}' is *unavailable*.
@@ -66,6 +72,10 @@ MonitorService.EmptyAliasSet=[MonitorService] Empty alias set passed for '{}'. T
6672
MonitorService.ErrorPopulatingAliases=[MonitorService] An error occurred while populating aliases: '{}'.
6773
MonitorService.NullDialect=[MonitorService] The host monitoring plugin tried to monitor the connection but was unable to identify the target driver dialect.
6874

75+
MonitoringThreadContainer.EmptyNodeKeys=[MonitorThreadContainer] The provided host_aliases set was empty.
76+
MonitoringThreadContainer.ErrorGettingMonitor=[MonitorThreadContainer] Unable to find or create monitor for host with aliases '{}'.
77+
MonitoringThreadContainer.NullMonitorReturnedFromSupplier=[MonitorThreadContainer] The monitor supplier passed into get_or_create_monitor returned None.
78+
6979
Plugin.UnsupportedMethod=[Plugin] '{}' is not supported by this plugin.
7080

7181
PluginManager.InvalidPlugin=[PluginManager] Invalid plugin requested: '{}'.

aws_wrapper/utils/atomic.py

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from threading import Lock
16+
17+
18+
class AtomicInt:
19+
def __init__(self, initial_value: int = 0):
20+
self._value = initial_value
21+
self._lock: Lock = Lock()
22+
23+
def get(self):
24+
with self._lock:
25+
return self._value
26+
27+
def set(self, value: int):
28+
with self._lock:
29+
self._value = value
30+
31+
def get_and_increment(self):
32+
with self._lock:
33+
value = self._value
34+
self._value += 1
35+
return value
36+
37+
def increment_and_get(self):
38+
with self._lock:
39+
self._value += 1
40+
return self._value
41+
42+
def get_and_decrement(self):
43+
with self._lock:
44+
value = self._value
45+
self._value -= 1
46+
return value
47+
48+
def decrement_and_get(self):
49+
with self._lock:
50+
self._value -= 1
51+
return self._value

aws_wrapper/utils/concurrent.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from threading import Lock
16+
from typing import Callable, Generic, List, Optional, TypeVar
17+
18+
K = TypeVar('K')
19+
V = TypeVar('V')
20+
21+
22+
class ConcurrentDict(Generic[K, V]):
23+
def __init__(self):
24+
self._dict = dict()
25+
self._lock = Lock()
26+
27+
def __len__(self):
28+
return len(self._dict)
29+
30+
def get(self, key: K, default_value: Optional[V] = None) -> Optional[V]:
31+
return self._dict.get(key, default_value)
32+
33+
def clear(self):
34+
self._dict.clear()
35+
36+
def compute_if_present(self, key: K, remapping_func: Callable) -> Optional[V]:
37+
with self._lock:
38+
existing_value = self._dict.get(key)
39+
if existing_value is None:
40+
return None
41+
new_value = remapping_func(key, existing_value)
42+
if new_value is not None:
43+
self._dict[key] = new_value
44+
return new_value
45+
else:
46+
self._dict.pop(key, None)
47+
return None
48+
49+
def compute_if_absent(self, key: K, mapping_func: Callable) -> Optional[V]:
50+
with self._lock:
51+
value = self._dict.get(key)
52+
if value is None:
53+
new_value = mapping_func(key)
54+
if new_value is not None:
55+
self._dict[key] = new_value
56+
return new_value
57+
return value
58+
59+
def put_if_absent(self, key: K, new_value: V) -> V:
60+
with self._lock:
61+
existing_value = self._dict.get(key)
62+
if existing_value is None:
63+
self._dict[key] = new_value
64+
return new_value
65+
return existing_value
66+
67+
def remove_if(self, predicate: Callable) -> bool:
68+
with self._lock:
69+
original_len = len(self._dict)
70+
self._dict = {key: value for key, value in self._dict.items() if not predicate(key, value)}
71+
return len(self._dict) < original_len
72+
73+
def remove_matching_values(self, removal_values: List[V]) -> bool:
74+
with self._lock:
75+
original_len = len(self._dict)
76+
self._dict = {key: value for key, value in self._dict.items() if value not in removal_values}
77+
return len(self._dict) < original_len
78+
79+
def apply_if(self, predicate: Callable, apply: Callable):
80+
with self._lock:
81+
for key, value in self._dict.items():
82+
if predicate(key, value):
83+
apply(key, value)

aws_wrapper/utils/properties.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020

2121
class Properties(Dict[str, str]):
22-
...
22+
pass
2323

2424

2525
class WrapperProperty:

aws_wrapper/utils/utils.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15+
from queue import Empty, Queue
1516
from typing import Optional
1617

1718

@@ -35,3 +36,30 @@ def log_topology(hosts: list, message_prefix: Optional[str] = None):
3536
msg = "\n\t".join(["<null>" if not host else str(host) for host in hosts])
3637
prefix = "" if not message_prefix else message_prefix + " "
3738
return prefix + f"Topology: {{\n\t{msg}}}"
39+
40+
41+
class SubscribedMethodUtils:
42+
# TODO: check for missing network methods
43+
NETWORK_BOUND_METHODS = {
44+
"Connection.commit",
45+
"Connection.rollback",
46+
"Cursor.callproc",
47+
"Cursor.execute",
48+
"Cursor.executemany"
49+
}
50+
51+
52+
class QueueUtils:
53+
@staticmethod
54+
def get(q: Queue):
55+
try:
56+
return q.get_nowait()
57+
except Empty:
58+
return None
59+
60+
@staticmethod
61+
def clear(q: Queue):
62+
with q.mutex:
63+
q.queue.clear()
64+
q.all_tasks_done.notify_all()
65+
q.unfinished_tasks = 0

unit_testing/test_atomic_int.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from concurrent.futures import ThreadPoolExecutor
16+
17+
from aws_wrapper.utils.atomic import AtomicInt
18+
19+
20+
def test_set_and_get():
21+
n = AtomicInt(1)
22+
assert 1 == n.get()
23+
n.set(3)
24+
assert 3 == n.get()
25+
26+
27+
def test_get_and_increment():
28+
n = AtomicInt()
29+
assert 0 == n.get_and_increment()
30+
assert 1 == n.get_and_increment()
31+
n.set(5)
32+
assert 5 == n.get_and_increment()
33+
34+
35+
def test_get_and_increment__multithreaded():
36+
n = AtomicInt()
37+
num_threads = 50
38+
39+
def get_and_increment_thread(atomic_num: AtomicInt):
40+
atomic_num.get_and_increment()
41+
42+
with ThreadPoolExecutor(num_threads) as executor:
43+
for _ in range(num_threads):
44+
executor.submit(get_and_increment_thread, n)
45+
46+
assert num_threads == n.get()
47+
48+
49+
def test_increment_and_get():
50+
n = AtomicInt()
51+
assert 1 == n.increment_and_get()
52+
assert 2 == n.increment_and_get()
53+
assert 2 == n.get()
54+
n.set(5)
55+
assert 6 == n.increment_and_get()
56+
57+
58+
def test_increment_and_get__multithreaded():
59+
n = AtomicInt()
60+
num_threads = 50
61+
62+
def increment_and_get_thread(atomic_num: AtomicInt):
63+
atomic_num.increment_and_get()
64+
65+
with ThreadPoolExecutor(num_threads) as executor:
66+
for _ in range(num_threads):
67+
executor.submit(increment_and_get_thread, n)
68+
69+
assert num_threads == n.get()
70+
71+
72+
def test_get_and_decrement():
73+
n = AtomicInt()
74+
assert 0 == n.get_and_decrement()
75+
assert -1 == n.get_and_decrement()
76+
n.set(5)
77+
assert 5 == n.get_and_decrement()
78+
79+
80+
def test_get_and_decrement__multithreaded():
81+
num_threads = 50
82+
n = AtomicInt(num_threads)
83+
84+
def get_and_decrement_thread(atomic_num: AtomicInt):
85+
atomic_num.get_and_decrement()
86+
87+
with ThreadPoolExecutor(num_threads) as executor:
88+
for _ in range(num_threads):
89+
executor.submit(get_and_decrement_thread, n)
90+
91+
assert 0 == n.get()
92+
93+
94+
def test_decrement_and_get():
95+
n = AtomicInt()
96+
assert -1 == n.decrement_and_get()
97+
assert -2 == n.decrement_and_get()
98+
assert -2 == n.get()
99+
n.set(5)
100+
assert 4 == n.decrement_and_get()
101+
102+
103+
def test_decrement_and_get__multithreaded():
104+
num_threads = 50
105+
n = AtomicInt(num_threads)
106+
107+
def decrement_and_get_thread(atomic_num: AtomicInt):
108+
atomic_num.decrement_and_get()
109+
110+
with ThreadPoolExecutor(num_threads) as executor:
111+
for _ in range(num_threads):
112+
executor.submit(decrement_and_get_thread, n)
113+
114+
assert 0 == n.get()

0 commit comments

Comments
 (0)