Skip to content

Commit cbc2288

Browse files
Add Cloudwatch integration
1 parent 50078ec commit cbc2288

File tree

4 files changed

+132
-67
lines changed

4 files changed

+132
-67
lines changed

requirements.txt

+1-2
Original file line number · Diff line number · Diff line change
@@ -1,10 +1,9 @@
11
app-common-python>=0.1.9
2-
aiobotocore==1.4.1
32
aiohttp==3.7.4.post0
43
aioitertools==0.8.0
54
async-timeout==3.0.1
65
attrs==21.2.0
7-
botocore==1.20.106
6+
botocore<1.22.9
87
chardet==4.0.0
98
fsspec==2021.8.1
109
idna==3.2

setup.py

+4-2
Original file line number · Diff line number · Diff line change
@@ -14,8 +14,10 @@
1414
package_dir={'': 'src'},
1515
packages=find_packages(where='src'),
1616
python_requires='>=3.6, <4',
17-
install_require=[
18-
'app-common-python'
17+
install_requires=[
18+
'app-common-python',
19+
'boto3',
20+
'watchtower'
1921
],
2022
extras_require={
2123
'test': ['pytest'],

src/floorist/config.py

+28
Original file line number · Diff line number · Diff line change
@@ -1,3 +1,5 @@
1+
import logging
2+
13
from app_common_python import LoadedConfig, ObjectBuckets
24
from app_common_python import isClowderEnabled
35
from os import environ, access, R_OK
@@ -20,6 +22,8 @@ class Config:
2022
database_password=attr.ib(default=None)
2123
database_name=attr.ib(default=None)
2224
floorplan_filename=attr.ib(default=None)
25+
loglevel=attr.ib(default="INFO")
26+
cloudwatch_config=attr.ib(factory=dict)
2327

2428

2529
def get_config():
@@ -28,6 +32,7 @@ def get_config():
2832
_set_bucket_config(config)
2933
_set_database_config(config)
3034
_set_floorist_config(config)
35+
_set_logging_config(config)
3136
_validate_config(config)
3237

3338
return config
@@ -48,6 +53,7 @@ def _get_bucket_url(endpoint):
4853
else:
4954
return f"https://{endpoint}"
5055

56+
5157
def get_bucket_requested_name_from_environment():
5258

5359
name = environ.get('AWS_BUCKET')
@@ -84,6 +90,28 @@ def _set_floorist_config(config):
8490
config.floorplan_filename = environ.get('FLOORPLAN_FILE')
8591

8692

93+
def _set_logging_config(config):
    """Populate the logging-related fields of *config*.

    Reads the log level from the LOGLEVEL environment variable
    (defaulting to INFO, always upper-cased) and fills in the
    Cloudwatch settings from Clowder when it is enabled, otherwise
    from the process environment.
    """
    level = environ.get('LOGLEVEL', 'INFO')
    config.loglevel = level.upper()

    # Pick the Cloudwatch-config source based on the deployment mode.
    populate = (_set_cloudwatch_config_from_clowder
                if isClowderEnabled()
                else _set_cloudwatch_config_from_environment)
    populate(config)
102+
103+
def _set_cloudwatch_config_from_clowder(config):
    """Copy the Cloudwatch settings out of the Clowder-provided config.

    When Clowder supplies no logging section, config.cloudwatch_config
    is left at its default and a warning is emitted instead.
    """
    clowder_logging = LoadedConfig.logging
    if not clowder_logging:
        logging.warning("No Cloudwatch logging config provided by Clowder!")
        return
    config.cloudwatch_config = clowder_logging.cloudwatch
109+
110+
111+
def _set_cloudwatch_config_from_environment(config):
112+
pass
113+
114+
87115
def _validate_config(config):
88116

89117
if not config.floorplan_filename:

src/floorist/floorist.py

+99-63
Original file line number · Diff line number · Diff line change
@@ -1,78 +1,114 @@
11
from datetime import date
22
from uuid import uuid4 as uuid
3+
4+
import watchtower
35
from s3fs import S3FileSystem as s3
46
from floorist.config import get_config
5-
from os import environ
7+
import os
8+
from boto3.session import Session
69

710
import logging
811
import pandas.io.sql as sqlio
912
import psycopg2
1013
import yaml
1114

1215

13-
def _configure_loglevel():
16+
def _get_logger(config):
17+
logging.basicConfig(level=config.loglevel)
18+
logger = logging.getLogger(__name__)
19+
logger.setLevel(config.loglevel)
1420

15-
LOGLEVEL = environ.get('LOGLEVEL', 'INFO').upper()
16-
logging.basicConfig(level=LOGLEVEL)
21+
if config.cloudwatch_config:
22+
logger.info("Configuring Cloudwatch logging")
23+
logger.addHandler(_get_cloudwatch_handler(config.cloudwatch_config))
24+
else:
25+
logger.info("Cloudwatch config not found - skipping")
1726

27+
return logger
1828

19-
def main():
2029

21-
_configure_loglevel()
22-
config = get_config()
23-
24-
# Fails if can't connect to S3 or the bucket does not exist
25-
s3(secret=config.bucket_secret_key, key=config.bucket_access_key,
26-
client_kwargs={'endpoint_url': config.bucket_url }).ls(config.bucket_name)
27-
logging.debug('Successfully connected to the S3 bucket')
28-
29-
conn = psycopg2.connect(
30-
host=config.database_hostname,
31-
user=config.database_username,
32-
password=config.database_password,
33-
database=config.database_name
34-
)
35-
logging.debug('Successfully connected to the database')
36-
37-
dump_count = 0
38-
dumped_count = 0
39-
40-
with open(config.floorplan_filename, 'r') as stream:
41-
# This try block allows us to proceed if a single SQL query fails
42-
for row in yaml.safe_load(stream):
43-
dump_count += 1
44-
45-
try:
46-
logging.debug(f"Dumping #{dump_count}: {row['query']} to {row['prefix']}")
47-
48-
data = sqlio.read_sql_query(row['query'], conn)
49-
target = '/'.join([
50-
f"s3://{config.bucket_name}",
51-
row['prefix'],
52-
date.today().strftime('year_created=%Y/month_created=%-m/day_created=%-d'),
53-
f"{uuid()}.parquet"
54-
])
55-
56-
data.to_parquet(
57-
path=target,
58-
compression='gzip',
59-
index=False,
60-
storage_options={
61-
'secret': config.bucket_secret_key,
62-
'key' : config.bucket_access_key,
63-
'client_kwargs':{'endpoint_url': config.bucket_url }
64-
}
65-
)
66-
67-
logging.debug(f"Dumped #{dumped_count}: {row['query']} to {row['prefix']}")
68-
69-
dumped_count += 1
70-
except Exception as ex:
71-
logging.exception(ex)
72-
73-
logging.info(f'Dumped {dumped_count} from total of {dump_count}')
74-
75-
conn.close()
76-
77-
if dumped_count != dump_count:
78-
exit(1)
30+
def _get_cloudwatch_handler(config):
    """Create a watchtower CloudWatchLogHandler from a Cloudwatch config.

    *config* is expected to expose accessKeyId, secretAccessKey, region
    and logGroup attributes (the shape Clowder provides — confirm for
    other callers).  The log stream name comes from the AWS_LOG_STREAM
    environment variable, falling back to this host's name.
    """
    aws_log_stream = os.getenv("AWS_LOG_STREAM", _get_hostname())
    aws_log_group = config.logGroup

    logging.info(f"Configuring watchtower logging (log_group={aws_log_group}, "
                 f"stream_name={aws_log_stream})")

    session = Session(
        aws_access_key_id=config.accessKeyId,
        aws_secret_access_key=config.secretAccessKey,
        region_name=config.region,
    )

    # The log group is expected to already exist; watchtower must not create it.
    return watchtower.CloudWatchLogHandler(boto3_session=session,
                                           stream_name=aws_log_stream,
                                           log_group=aws_log_group,
                                           create_log_group=False)
50+
51+
52+
def _get_hostname():
53+
return os.uname().nodename
54+
55+
56+
def main():
    """Entry point: dump every floorplan query from Postgres into S3.

    Connects to the configured S3 bucket and database, runs each query
    listed in the floorplan YAML file, and writes each result set as a
    gzip-compressed, date-partitioned parquet file in the bucket.
    Exits with status 1 when any dump failed.
    """
    config = get_config()
    logger = _get_logger(config)

    # Fails if can't connect to S3 or the bucket does not exist
    s3(secret=config.bucket_secret_key, key=config.bucket_access_key,
       client_kwargs={'endpoint_url': config.bucket_url}).ls(config.bucket_name)
    logger.debug('Successfully connected to the S3 bucket')

    conn = psycopg2.connect(
        host=config.database_hostname,
        user=config.database_username,
        password=config.database_password,
        database=config.database_name
    )
    logger.debug('Successfully connected to the database')

    dump_count = 0
    dumped_count = 0

    with open(config.floorplan_filename, 'r') as stream:
        # This try block allows us to proceed if a single SQL query fails
        for row in yaml.safe_load(stream):
            dump_count += 1

            try:
                logger.debug(f"Dumping #{dump_count}: {row['query']} to {row['prefix']}")

                data = sqlio.read_sql_query(row['query'], conn)
                target = '/'.join([
                    f"s3://{config.bucket_name}",
                    row['prefix'],
                    date.today().strftime('year_created=%Y/month_created=%-m/day_created=%-d'),
                    f"{uuid()}.parquet"
                ])

                data.to_parquet(
                    path=target,
                    compression='gzip',
                    index=False,
                    storage_options={
                        'secret': config.bucket_secret_key,
                        'key': config.bucket_access_key,
                        'client_kwargs': {'endpoint_url': config.bucket_url}
                    }
                )

                dumped_count += 1
                # FIX: log the same index as the "Dumping #N" line above;
                # previously this used the pre-increment dumped_count, so the
                # first success was reported as "Dumped #0".
                logger.debug(f"Dumped #{dump_count}: {row['query']} to {row['prefix']}")
            except Exception as ex:
                logger.exception(ex)

    logger.info(f'Dumped {dumped_count} from total of {dump_count}')

    conn.close()

    if dumped_count != dump_count:
        exit(1)

0 commit comments

Comments
 (0)