Skip to content

Commit 1538a97

Browse files
authored
Add forwarder plugin for federated Alerta (alerta#1161)
* Add alerts and action forwarder plugin for federated setups * Add tests for forwarder plugin
1 parent 33723c0 commit 1538a97

18 files changed

+745
-34
lines changed

.isort.cfg

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
[settings]
2-
known_third_party = blinker,bson,click,flask,flask_compress,flask_cors,itsdangerous,jwt,ldap,mohawk,pkg_resources,psycopg2,pymongo,pyparsing,pytz,requests,requests_mock,saml2,sentry_sdk,setuptools,werkzeug,yaml
2+
known_third_party = blinker,bson,click,flask,flask_compress,flask_cors,itsdangerous,jwt,ldap,mohawk,pkg_resources,psycopg2,pymongo,pyparsing,pytz,requests,requests_hawk,requests_mock,saml2,sentry_sdk,setuptools,werkzeug,yaml

.pre-commit-config.yaml

+4-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ repos:
1717
- id: debug-statements
1818
- id: double-quote-string-fixer
1919
- id: end-of-file-fixer
20-
- id: flake8
2120
- id: fix-encoding-pragma
2221
args: ['--remove']
2322
- id: pretty-format-json
@@ -26,6 +25,10 @@ repos:
2625
args: ['--django']
2726
- id: requirements-txt-fixer
2827
- id: trailing-whitespace
28+
- repo: https://gitlab.com/pycqa/flake8
29+
rev: 3.7.7
30+
hooks:
31+
- id: flake8
2932
- repo: https://github.com/asottile/pyupgrade
3033
rev: v1.27.0
3134
hooks:

alerta/auth/hmac.py

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,13 @@
33

44

55
def get_credentials(key_id: str):
6-
credentials_map = {creds['id']: creds for creds in current_app.config['HMAC_AUTH_CREDENTIALS']}
6+
credentials_map = {
7+
creds['key']: dict(
8+
id=creds['key'], # access_key
9+
key=creds['secret'], # secret_key
10+
algorithm=creds.get('algorithm', 'sha256')
11+
) for creds in current_app.config['HMAC_AUTH_CREDENTIALS']}
12+
713
if key_id in credentials_map:
814
return credentials_map[key_id]
915
else:

alerta/exceptions.py

+5
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,11 @@ class BlackoutPeriod(AlertaException):
3333
pass
3434

3535

36+
class ForwardingLoop(AlertaException):
37+
"""Forwarding loop detected."""
38+
pass
39+
40+
3641
class InvalidAction(AlertaException):
3742
"""Invalid or redundant action for the current alert status."""
3843
pass

alerta/plugins/forwarder.py

+124
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import logging
2+
from typing import TYPE_CHECKING, Any, Optional
3+
4+
from flask import request
5+
6+
from alerta.exceptions import ForwardingLoop
7+
from alerta.plugins import PluginBase
8+
from alerta.utils.client import Client
9+
from alerta.utils.response import base_url
10+
11+
if TYPE_CHECKING:
12+
from alerta.models.alert import Alert # noqa
13+
14+
15+
LOG = logging.getLogger('alerta.plugins.forwarder')
16+
17+
X_LOOP_HEADER = 'X-Alerta-Loop'
18+
19+
20+
def append_to_header(origin):
21+
x_loop = request.headers.get(X_LOOP_HEADER)
22+
return origin if not x_loop else '{},{}'.format(x_loop, origin)
23+
24+
25+
def is_in_xloop(server):
26+
x_loop = request.headers.get(X_LOOP_HEADER)
27+
return server in x_loop if server and x_loop else False
28+
29+
30+
class Forwarder(PluginBase):
31+
"""
32+
Alert and action forwarder for federated Alerta deployments
33+
See https://docs.alerta.io/en/latest/federated.html
34+
"""
35+
36+
def pre_receive(self, alert: 'Alert', **kwargs) -> 'Alert':
37+
38+
if is_in_xloop(base_url()):
39+
http_origin = request.origin or '(unknown)'
40+
raise ForwardingLoop('Alert forwarded by {} already processed by {}'.format(http_origin, base_url()))
41+
return alert
42+
43+
def post_receive(self, alert: 'Alert', **kwargs) -> Optional['Alert']:
44+
45+
for remote, auth, actions in self.get_config('FWD_DESTINATIONS', default=[], type=list, **kwargs):
46+
if is_in_xloop(remote):
47+
LOG.debug('Forward [action=alerts]: {} ; Remote {} already processed alert. Skip.'.format(alert.id, remote))
48+
continue
49+
if not ('*' in actions or 'alerts' in actions):
50+
LOG.debug('Forward [action=alerts]: {} ; Remote {} not configured for alerts. Skip.'.format(alert.id, remote))
51+
continue
52+
53+
headers = {X_LOOP_HEADER: append_to_header(base_url())}
54+
client = Client(endpoint=remote, headers=headers, **auth)
55+
56+
LOG.info('Forward [action=alerts]: {} ; {} -> {}'.format(alert.id, base_url(), remote))
57+
try:
58+
r = client.send_alert(**alert.get_body())
59+
except Exception as e:
60+
LOG.warning('Forward [action=alerts]: {} ; Failed to forward alert to {} - {}'.format(alert.id, remote, str(e)))
61+
continue
62+
LOG.debug('Forward [action=alerts]: {} ; [{}] {}'.format(alert.id, r.status_code, r.text))
63+
64+
return alert
65+
66+
def status_change(self, alert: 'Alert', status: str, text: str, **kwargs) -> Any:
67+
return
68+
69+
def take_action(self, alert: 'Alert', action: str, text: str, **kwargs) -> Any:
70+
71+
if is_in_xloop(base_url()):
72+
http_origin = request.origin or '(unknown)'
73+
raise ForwardingLoop('Action {} forwarded by {} already processed by {}'.format(
74+
action, http_origin, base_url())
75+
)
76+
77+
for remote, auth, actions in self.get_config('FWD_DESTINATIONS', default=[], type=list, **kwargs):
78+
if is_in_xloop(remote):
79+
LOG.debug('Forward [action={}]: {} ; Remote {} already processed action. Skip.'.format(action, alert.id, remote))
80+
continue
81+
if not ('*' in actions or 'actions' in actions or action in actions):
82+
LOG.debug('Forward [action={}]: {} ; Remote {} not configured for action. Skip.'.format(action, alert.id, remote))
83+
continue
84+
85+
headers = {X_LOOP_HEADER: append_to_header(base_url())}
86+
client = Client(endpoint=remote, headers=headers, **auth)
87+
88+
LOG.info('Forward [action={}]: {} ; {} -> {}'.format(action, alert.id, base_url(), remote))
89+
try:
90+
r = client.action(alert.id, action, text)
91+
except Exception as e:
92+
LOG.warning('Forward [action={}]: {} ; Failed to action alert on {} - {}'.format(action, alert.id, remote, str(e)))
93+
continue
94+
LOG.debug('Forward [action={}]: {} ; [{}] {}'.format(action, alert.id, r.status_code, r.text))
95+
96+
return alert
97+
98+
def delete(self, alert: 'Alert', **kwargs) -> bool:
99+
100+
if is_in_xloop(base_url()):
101+
http_origin = request.origin or '(unknown)'
102+
raise ForwardingLoop('Delete forwarded by {} already processed by {}'.format(http_origin, base_url()))
103+
104+
for remote, auth, actions in self.get_config('FWD_DESTINATIONS', default=[], type=list, **kwargs):
105+
print('{} actions={}'.format(remote, actions))
106+
if is_in_xloop(remote):
107+
LOG.debug('Forward [action=delete]: {} ; Remote {} already processed delete. Skip.'.format(alert.id, remote))
108+
continue
109+
if not ('*' in actions or 'delete' in actions):
110+
LOG.debug('Forward [action=delete]: {} ; Remote {} not configured for deletes. Skip.'.format(alert.id, remote))
111+
continue
112+
113+
headers = {X_LOOP_HEADER: append_to_header(base_url())}
114+
client = Client(endpoint=remote, headers=headers, **auth)
115+
116+
LOG.info('Forward [action=delete]: {} ; {} -> {}'.format(alert.id, base_url(), remote))
117+
try:
118+
r = client.delete_alert(alert.id)
119+
except Exception as e:
120+
LOG.warning('Forward [action=delete]: {} ; Failed to delete alert on {} - {}'.format(alert.id, remote, str(e)))
121+
continue
122+
LOG.debug('Forward [action=delete]: {} ; [{}] {}'.format(alert.id, r.status_code, r.text))
123+
124+
return True # always continue with local delete even if remote delete(s) fail

alerta/settings.py

+15-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#
77
# Further information on settings can be found at https://docs.alerta.io
88

9-
from typing import Any, Dict, List # noqa
9+
from typing import Any, Dict, List, Tuple # noqa
1010

1111
DEBUG = False
1212

@@ -71,8 +71,8 @@
7171

7272
HMAC_AUTH_CREDENTIALS = [
7373
# {
74-
# 'id': 'AKID001', # access key id
75-
# 'key': 'supersecret', # secret key
74+
# 'id': '', # access key id => $ uuidgen | tr '[:upper:]' '[:lower:]'
75+
# 'key': '', # secret key => $ date | md5 | base64
7676
# 'algorithm': 'sha256' # valid hmac algorithm eg. sha256, sha384, sha512
7777
# }
7878
] # type: List[Dict[str, Any]]
@@ -206,7 +206,7 @@
206206
AUTO_REFRESH_INTERVAL = 5000 # ms
207207

208208
# Plugins
209-
PLUGINS = ['remote_ip', 'reject', 'heartbeat', 'blackout']
209+
PLUGINS = ['remote_ip', 'reject', 'heartbeat', 'blackout', 'forwarder']
210210
PLUGINS_RAISE_ON_ERROR = True # raise RuntimeError exception on first failure
211211

212212
# reject plugin settings
@@ -219,3 +219,14 @@
219219
NOTIFICATION_BLACKOUT = False # True - set alert status=blackout, False - do not process alert (default)
220220
BLACKOUT_ACCEPT = [] # type: List[str]
221221
# BLACKOUT_ACCEPT = ['normal', 'ok', 'cleared'] # list of severities accepted during blackout period
222+
223+
# northbound interface
224+
FWD_DESTINATIONS = [
225+
# ('http://localhost:9000', {'username': 'user', 'password': 'pa55w0rd', 'timeout': 10}, ['alerts', 'actions']), # BasicAuth
226+
# ('https://httpbin.org/anything', dict(username='foo', password='bar', ssl_verify=False), ['alerts', 'actions']),
227+
# ('http://localhost:9000', {'key': 'access-key', 'secret': 'secret-key'}, ['alerts', 'actions']), # Hawk HMAC
228+
# ('http://localhost:9000', {'key': 'my-api-key'}, ['alerts', 'actions']), # API key
229+
# ('http://localhost:9000', {'token': 'bearer-token'}, ['alerts', 'actions']), # Bearer token
230+
] # type: List[Tuple]
231+
232+
# valid actions=['*', 'alerts', 'actions', 'open', 'assign', 'ack', 'unack', 'shelve', 'unshelve', 'close', 'delete']

alerta/utils/api.py

+12-14
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44
from flask import current_app, g
55

66
from alerta.app import plugins
7-
from alerta.exceptions import (ApiError, BlackoutPeriod, HeartbeatReceived,
8-
RateLimit, RejectException)
7+
from alerta.exceptions import (ApiError, BlackoutPeriod, ForwardingLoop,
8+
HeartbeatReceived, RateLimit, RejectException)
99
from alerta.models.alert import Alert
1010
from alerta.models.enums import Scope
1111

@@ -40,7 +40,7 @@ def process_alert(alert: Alert) -> Alert:
4040
alert = plugin.pre_receive(alert, config=wanted_config)
4141
except TypeError:
4242
alert = plugin.pre_receive(alert) # for backward compatibility
43-
except (RejectException, HeartbeatReceived, BlackoutPeriod, RateLimit):
43+
except (RejectException, HeartbeatReceived, BlackoutPeriod, RateLimit, ForwardingLoop):
4444
raise
4545
except Exception as e:
4646
if current_app.config['PLUGINS_RAISE_ON_ERROR']:
@@ -97,23 +97,21 @@ def process_action(alert: Alert, action: str, text: str, timeout: int) -> Tuple[
9797
updated = plugin.take_action(alert, action, text, timeout=timeout, config=wanted_config)
9898
except NotImplementedError:
9999
pass # plugin does not support action() method
100-
except RejectException:
100+
except (RejectException, ForwardingLoop):
101101
raise
102102
except Exception as e:
103103
if current_app.config['PLUGINS_RAISE_ON_ERROR']:
104104
raise ApiError("Error while running action plugin '{}': {}".format(plugin.name, str(e)))
105105
else:
106106
logging.error("Error while running action plugin '{}': {}".format(plugin.name, str(e)))
107-
if updated:
108-
try:
109-
if len(updated) == 3:
110-
alert, action, text = updated
111-
elif len(updated) == 4:
112-
alert, action, text, timeout = updated
113-
else:
114-
alert = updated
115-
except Exception as e:
116-
logging.error("Error while running action plugin '{}': {}".format(plugin.name, str(e)))
107+
108+
if isinstance(updated, Alert):
109+
updated = updated, action, text, timeout
110+
if isinstance(updated, tuple):
111+
if len(updated) == 4:
112+
alert, action, text, timeout = updated
113+
elif len(updated) == 3:
114+
alert, action, text = updated
117115

118116
# remove keys from attributes with None values
119117
new_attrs = {k: v for k, v in alert.attributes.items() if v is not None}

0 commit comments

Comments
 (0)