Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add script to fix lambda logging #14

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
35 changes: 35 additions & 0 deletions cloud-trail-lookup/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Cloud Trail Lookup

This script will lookup Cloud Trail Event History and find events with "AccessDenied" or "Unauthorized" errors.

## Why?

Throttling error may occur with cli commands, the rate limit is 2 per region per account, and it can't be raised upon request. That's where this script comes in handy. Keep in mind the API call uses the same quota in your account, so please keep the time window for lookup as short as possible.

## What the script does.

This script iterates through all events found by parameters and list events with "AccessDenied" or "Unauthorized" errors.

## Usage

```bash
usage: enable-lambda-log-group.py [-h] [--debug] [--error] [--timestamp]
[--region REGION] [--profile PROFILE]
[--actually-do-it]

optional arguments:
-h, --help show this help message and exit
--debug print debugging info
--error print error info only
--timestamp Output log with timestamp and toolname
--minutes minutes Minutes to look up till now, default is 30, optional
--user User name to look up in the events, can be email or any format shown in Event History, optional
```


## AWS Docs

* [boto3 lookup_events()](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/cloudtrail.html#CloudTrail.Client.lookup_events)



146 changes: 146 additions & 0 deletions cloud-trail-lookup/lookup-events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#!/usr/bin/env python3

import boto3
from botocore.exceptions import ClientError
import os
import logging
import time
from datetime import datetime as dt, timedelta, time
import json

# python3 ./lookup-events.py --debug --minutes 10 --user [email protected]

def get_arg_minutes(args):
minutes_to_lookup = 30
try:
if args.minutes:
minutes_to_lookup = int(args.minutes)
except Exception as e:
logger.info("An exception occurred converting arg minutes: ", e)
logger.info(f"minutes to look up: {minutes_to_lookup}")
return minutes_to_lookup

def get_arg_user(args):
user_to_lookup = ""
if args.user:
user_to_lookup = args.user
logger.info(f"user to look up: {user_to_lookup}")
return user_to_lookup

def main(args, logger):
# just want to quickly run "aws cloudtrail lookup-events" without ThrottleException
# only care about us-east-1, use default profile
client = boto3.client('cloudtrail')

minutes_to_lookup = get_arg_minutes(args)
user_to_lookup = get_arg_user(args)

lastHourDateTime = dt.utcnow() - timedelta(minutes=minutes_to_lookup)
now = dt.utcnow()
startTime = lastHourDateTime
endTime = now

logger.info(f"start Time: {startTime}")
logger.info(f"end time: {endTime}")

maxResult = 200
events_found = []
# no NextToken at first
event_arg = {"StartTime": startTime, "EndTime": endTime, "MaxResults": maxResult}
if user_to_lookup:
event_arg["LookupAttributes"] = [{
'AttributeKey': 'Username',
'AttributeValue': user_to_lookup
}]
response = client.lookup_events(**event_arg)

# process response, add to events array
events_found = events_found + search_events(response["Events"])

if "NextToken" in response:
nextToken = response["NextToken"]

logger.debug(nextToken)

while nextToken:
event_arg["NextToken"] = nextToken
response = client.lookup_events(**event_arg)

events_found = events_found + search_events(response["Events"])

if "NextToken" in response:
nextToken = response["NextToken"]
logger.debug(nextToken)
else:
# loop ends
nextToken = ""
logger.info(f"events_found: {events_found}")
logger.info(f"{len(events_found)} events found")
return events_found


def search_events(events):
# list events with error code of "accessdenied" or "unauthorized"
filtered = []
for event in events:
event_detail_str = event["CloudTrailEvent"]
event_detail = json.loads(event_detail_str)
if "errorCode" in event_detail:
if event_detail["errorCode"].lower() == "accessdenied" or event_detail["errorCode"].lower() == "unauthorized":
filtered.append(event)
return filtered


def do_args():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--debug", help="print debugging info", action='store_true')
parser.add_argument(
"--error", help="print error info only", action='store_true')
parser.add_argument(
"--timestamp", help="Output log with timestamp and toolname", action='store_true')
parser.add_argument(
"--minutes", help="minutes to look up till now")
parser.add_argument(
"--user", help="user name to look up")

args = parser.parse_args()

return (args)


if __name__ == '__main__':

args = do_args()

# create console handler and set level to debug
logger = logging.getLogger('cloud-trail-lookup-events')
ch = logging.StreamHandler()
if args.debug:
logger.setLevel(logging.DEBUG)
elif args.error:
logger.setLevel(logging.ERROR)
else:
logger.setLevel(logging.INFO)

# Silence Boto3 & Friends
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)

# create formatter
if args.timestamp:
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
else:
formatter = logging.Formatter('%(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)

try:
main(args, logger)
except KeyboardInterrupt:
exit(1)
40 changes: 40 additions & 0 deletions ecr-basic-scan/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# ECR Registry Basic Scan

This script will lookup ECR repos and images given an ECR registry id, then trigger scan manually for each image given that the registry scan configuration is set to Basic scan.

## Why?

Manual scans using enhanced scanning isn’t supported.
When continuous scanning is enabled for a repository, if an image hasn’t been updated in the past 30 days based on the image push timestamp, then continuous scanning is suspended for that image.


## What the script does not do
Registry scan configuration as Basic scan needs to be set up ahead of time. Note that as of this writing, basic scanning is not able to detect the log4j 2 vulnerability.


## Errors
The following error could occur as an image can only be scanned once per day:
"An error occurred (LimitExceededException) when calling the StartImageScan operation: The scan quota per image has been exceeded. Wait and try again."

## Usage

```bash
usage: automate-manual-scan.py [-h] [--debug] [--error] [--timestamp]
[--registry-id]
[--actually-do-it]

optional arguments:
-h, --help show this help message and exit
--debug print debugging info
--error print error info only
--timestamp Output log with timestamp and toolname
--minutes minutes Minutes to look up till now, default is 30, optional
--user User name to look up in the events, can be email or any format shown in Event History, optional
```

## AWS Docs

* [boto3 ECR](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecr.html)



165 changes: 165 additions & 0 deletions ecr-basic-scan/automate-manual-scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#!/usr/bin/env python3

import boto3
from botocore.exceptions import ClientError
import os
import logging
import time
from datetime import datetime as dt, timedelta, time
import json

# Assume basic scan setting has been set up in the private registry, otherwise the script throws error
# Trigger manual scan for all images in a repos, catch and display if any errors for each, continue with loop
# python3 automate-manual-scan.py --registry-id <accountNumberForPrivateRegistry>
def start_image_scan(client, registry_id, repo_name, image_digest):
try:
response = client.start_image_scan(
registryId=registry_id,
repositoryName=repo_name,
imageId={
'imageDigest': image_digest
}
)
logger.info(response)
except Exception as e:
logger.info(f'An exception occurred starting image scan in repo {repo_name} with image digest {image_digest}')


def list_ecr_repos(client, registry_id):
MAX_RESULT = 100
repo_names = []
payload = {
'registryId': registry_id,
'maxResults': MAX_RESULT
}
try:
response = client.describe_repositories(**payload)
repos = response['repositories']
repo_names = [repo['repositoryName'] for repo in repos]
if "NextToken" in response:
nextToken = response["NextToken"]

while nextToken:
payload["NextToken"] = nextToken
response = client.describe_repositories(**payload)
repo_names = repo_names + [repo['repositoryName'] for repo in repos]

if "NextToken" in response:
nextToken = response["NextToken"]
logger.debug(nextToken)
else:
# loop ends
nextToken = ""
return repo_names
except Exception as e:
logger.info(f'An exception occurred in list_ecr_repos')


"""
returned array in this format
[
{
'imageDigest': 'string',
'imageTag': 'string'
},
],
"""
def list_images(client, registry_id, repo_name):
MAX_RESULT = 100
image_ids = []
payload = {
'registryId': registry_id,
'repositoryName': repo_name,
'maxResults': MAX_RESULT,
"filter": {
'tagStatus': 'ANY'
}
}
try:
response = client.list_images(**payload)
image_ids = response['imageIds']
if "NextToken" in response:
nextToken = response["NextToken"]

while nextToken:
payload["NextToken"] = nextToken
response = client.list_images(**payload)
image_ids = image_ids + response['imageIds']

if "NextToken" in response:
nextToken = response["NextToken"]
logger.debug(nextToken)
else:
# loop ends
nextToken = ""
return image_ids
except Exception as e:
logger.info(f'An exception occurred listing images in repo {repo_name}')


def main(args, logger):
# use default profile
client = boto3.client('ecr')
logger.info(f'registry_id: {args.registry_id}')
repo_names = list_ecr_repos(client, args.registry_id)
for repo_name in repo_names:
logger.info(f'repo_name: {repo_name}')
image_ids = list_images(client, args.registry_id, repo_name)
for image_id in image_ids:
if args.actually_do_it is True:
start_image_scan(client, args.registry_id, repo_name, image_id['imageDigest'])


def do_args():
import argparse
parser = argparse.ArgumentParser()
parser.add_argument(
"--debug", help="print debugging info", action='store_true')
parser.add_argument(
"--error", help="print error info only", action='store_true')
parser.add_argument(
"--timestamp", help="Output log with timestamp and toolname", action='store_true')
parser.add_argument(
"--actually-do-it", help="Actually Perform the action", action='store_true')
parser.add_argument(
"--registry-id", help="private registry id (account number)")

args = parser.parse_args()

return (args)


if __name__ == '__main__':

args = do_args()

# create console handler and set level to debug
logger = logging.getLogger('cloud-trail-lookup-events')
ch = logging.StreamHandler()
if args.debug:
logger.setLevel(logging.DEBUG)
elif args.error:
logger.setLevel(logging.ERROR)
else:
logger.setLevel(logging.INFO)

# Silence Boto3 & Friends
logging.getLogger('botocore').setLevel(logging.WARNING)
logging.getLogger('boto3').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)

# create formatter
if args.timestamp:
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
else:
formatter = logging.Formatter('%(levelname)s - %(message)s')
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
logger.addHandler(ch)

try:
main(args, logger)
except KeyboardInterrupt:
exit(1)
Loading