Skip to content

Add network tunnel monitor #80

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Cisco Secure Access/.DS_Store
Cisco Secure Access/Samples/.DS_Store
Cisco Secure Access/Samples/client-samples/.DS_Store
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Monitor Cisco Secure Access Network Tunnel State

## Use Cases

The Monitor Secure Access Network Tunnel State script checks the state of your organization's Network Tunnels (TunnelState), and logs the tunnel state information received from the Umbrella Network Tunnels API. If configured to generate email alerts, the script sends an email message about the state of a tunnel or the status of a Secure Access Network Tunnel API response.

The script alerts on these conditions:

* Secure Access Network Tunnel state that is `disconnected`.
* Secure Access Tunnel state that is `warning`.

## Prerequisites

* Python 3.x.x
* Cisco Secure Access
* Create an App password for the desired email to send the notification from, save the App Password in the PASSWD environment variable
* Modify the recipients variable in the tunnel_monitor_sse.py and enter the desired email address to send the notifications from


## Before You Begin

* Create a Secure Access Management API key.
* Install Python libraries. For more information, see `Requirements.txt`.

```shell
pip install -r requirements.txt
```
* Create environment variables:
* export API_KEY=VALUE
* export API_SECRET=VALUE
* export PASSWD=VALUE
* export EMAIL=VALUE


## Usage

* Run the sample script:

```shell
python3 tunnel_monitor_sse.py
```



Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#!/usr/bin/env python3

"""
Copyright (c) 2022 Cisco and/or its affiliates.
This software is licensed to you under the terms of the Cisco Sample
Code License, Version 1.1 (the "License"). You may obtain a copy of the
License at "https://developer.cisco.com/docs/licenses"
All use of the material herein must be in accordance with the terms of
the License. All rights not expressly granted by the License are
reserved. Unless required by applicable law or agreed to separately in
writing, software distributed under the License is distributed on an "AS IS"
BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.

----------------------------------------------------------------------
"""

import requests
from oauthlib.oauth2 import BackendApplicationClient
from oauthlib.oauth2 import TokenExpiredError
from requests_oauthlib import OAuth2Session
from requests.auth import HTTPBasicAuth



class SSEAPI:
def __init__(self, url, ident, secret):
self.url = url
self.ident = ident
self.secret = secret
self.token = None

def GetToken(self):
auth = HTTPBasicAuth(self.ident, self.secret)
client = BackendApplicationClient(client_id=self.ident)
oauth = OAuth2Session(client=client)
self.token = oauth.fetch_token(token_url=self.url, auth=auth)
return self.token

''' GET API request to a SSE endpoint '''
def ReqGet(self, end_point):
success = False
resp = None
if self.token == None:
self.GetToken()
while not success:
try:

bearer_token = "Bearer " + self.token['access_token']
api_headers = {
"Authorization": bearer_token,
"Content-Type": "application/json"
}
resp = requests.get('https://api.sse.cisco.com/{}'.format(end_point), headers=api_headers)
resp.raise_for_status()

success = True
except TokenExpiredError:
token = self.GetToken()
except Exception as e:
raise(e)
return resp

''' POST API request to an SSE endpoint '''
def ReqPost(self, end_point, data):
success = False
resp = None
if self.token == None:
self.GetToken()
while not success:
try:
bearer_token = "Bearer " + self.token['access_token']
api_headers = { 'Authorization': bearer_token }
resp = requests.post('https://api.sse.cisco.com/{}'.format(end_point), json=data, headers=api_headers)
resp.raise_for_status()
success = True
except TokenExpiredError:
token = self.GetToken()
except Exception as e:
raise(e)
return resp

''' DELETE API request to an SSE endpoint '''
def ReqDelete(self, end_point, data):
success = False
resp = None
if self.token == None:
self.GetToken()
while not success:
try:
bearer_token = "Bearer " + self.token['access_token']
api_headers = { 'Authorization': bearer_token }
resp = requests.delete('https://api.sse.cisco.com/{}'.format(end_point), json=data, headers=api_headers)
resp.raise_for_status()
success = True
except TokenExpiredError:
token = self.GetToken()
except Exception as e:
raise(e)
return resp
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
certifi==2024.8.30
charset-normalizer==3.3.2
idna==3.10
oauthlib==3.2.2
requests==2.32.3
requests-oauthlib==2.0.0
termcolor==2.4.0
urllib3==2.2.3
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""
Copyright (c) 2023 Cisco and/or its affiliates.
This software is licensed to you under the terms of the Cisco Sample
Code License, Version 1.1 (the "License"). You may obtain a copy of the
License at

https://developer.cisco.com/docs/licenses

All use of the material herein must be in accordance with the terms of
the License. All rights not expressly granted by the License are
reserved. Unless required by applicable law or agreed to separately in
writing, software distributed under the License is distributed on an "AS
IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied.
"""

''' Get summary information from Network Tunnels to detect Tunnels in "Disconnected" or "Warning" state '''


# Export/Set the environment variables
from SSEAPI import SSEAPI
from email.message import EmailMessage
import smtplib
import datetime
import os
client_id = os.environ.get('API_KEY')
client_secret = os.environ.get('API_SECRET')
email_address = os.environ.get('EMAIL')
passw = os.environ.get('PASSWD')
recipients = ['REPLACE_THIS_VALUE_WITH_YOUR_EMAIL_ADDRESS']
token_url = os.environ.get(
'TOKEN_URL') or 'https://api.sse.cisco.com/auth/v2/token'

def send_email(tunnelInfo):
"""This function will send an alert to the desired recipients"""
msg = EmailMessage()
msg['Subject'] = 'Network Tunnel Error Found!'
msg['From'] = email_address
msg['To'] = recipients
msg.set_content(
f'Connection Error found in Network Tunnel(s). With status DISCONNECTED. Please check your SSE Dashboard for more information')

msg.add_alternative("""
<!DOCTYPE >
<html>
<head>
<meta http-equiv="Content-Type" content="text/html;charset=UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Network Tunnel Monitor</title>
</head>
<body>
<h1>Tunnel connection error detected at """ + str(datetime.datetime.now()) + """</h1>
<p>The Tunnel Monitor script detected a connection error with: """ + tunnelInfo +"""</p>
<br/>
<p>Please check your SSE dashboard.</p>

<style type="text/css">
body{
margin: 0;
background-color: #cccccc;
}
</style>

</body>
</html>
""", subtype='html')

with smtplib.SMTP_SSL('smtp.gmail.com', 465) as smtp:
smtp.login(email_address, passw)
print('login success')
smtp.send_message(msg)
print("Email has been sent to: ", recipients)


# main
if __name__ == '__main__':

# Exit out if the required API_KEY and API_SECRET are not set in the environment
for var in ['API_SECRET', 'API_KEY']:
if os.environ.get(var) == None:
print("Required environment variable: {} not set".format(var))
exit()

try:
# Step 1: Create the API client
sse_api = SSEAPI(token_url, client_id, client_secret)

# Step 2: Send a request checking for status of the Tunnel Groups
tunnel_endpoints = 'deployments/v2/networktunnelgroups'
tunnelComponents = sse_api.ReqGet(tunnel_endpoints).json()
tunnelData = tunnelComponents["data"]
tunnelInfo ="<ul>"
for i in tunnelData:
if i["status"] == "disconnected" or i["status"] == "warning":
tunnelInfo = tunnelInfo + "<li>" + i["name"] + " " + i["deviceType"] + " " + i["status"] + "</li>"
tunnelInfo = tunnelInfo + "</ul>"
send_email(tunnelInfo)

except Exception as e:
raise (e)