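"""Load LMS assignment data from a CSV export into the lms_assignments DynamoDB table.

Parses each CSV row into a DynamoDB item (person records only) and writes the
items asynchronously through aioboto3's batch writer.
"""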
import asyncio
import csv
import datetime as dt
import logging
import os
import re
import sys
import time
from argparse import ArgumentParser as Ap, Namespace
from typing import Optional

import aioboto3
import aiofiles
from aiocsv import AsyncDictReader
from botocore.exceptions import ClientError, ParamValidationError
# keep these constants
AWS_REGION = 'us-west-2'
TABLE_NAME: str = 'lms_assignments'
POS_INT_REGX: str = r'^\d+$'  # all-digit usernames identify person records (see build_item)
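# Remediation hints keyed by botocore error code; used by log_boto_client_error()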
ERROR_HELP_STRINGS = {
    # Operation specific errors
    'ConditionalCheckFailedException': 'Condition check specified in the operation failed,'
                                       ' review and update the condition check before retrying',
    'TransactionConflictException': 'Operation was rejected because there is an ongoing transaction for'
                                    ' the given item, generally safe to retry with exponential back-off',
    'ItemCollectionSizeLimitExceededException': 'A given collection is too large, you\'re using Local Secondary Index'
                                                ' and exceeded size limit of items per partition key.'
                                                ' Consider using Global Secondary Index instead',
    # Common Errors
    'AccessDeniedException': 'Configure identity-based access before retrying',
    'InternalServerError': 'Internal Server Error,'
                           ' generally safe to retry with exponential back-off',
    'ProvisionedThroughputExceededException': 'Request rate is too high. If you\'re using a custom retry strategy,'
                                              ' make sure to retry with exponential back-off. Otherwise consider'
                                              ' reducing frequency of requests or increasing provisioned capacity'
                                              ' for your table or secondary index',
    'ResourceNotFoundException': 'One of the tables was not found, verify table exists before retrying',
    'ServiceUnavailable': 'Had trouble reaching DynamoDB, generally safe to retry with'
                          ' exponential back-off',
    'ThrottlingException': 'Request denied due to throttling, generally safe to retry with'
                           ' exponential back-off',
    'UnrecognizedClientException': 'The request signature is incorrect, most likely due to an'
                                   ' invalid AWS access key ID or secret key, fix before retrying',
    'ValidationException': 'The input fails to satisfy the constraints specified by DynamoDB,'
                           ' fix input before retrying',
    'RequestLimitExceeded': 'Throughput exceeds the current throughput limit for your account,'
                            ' increase account-level throughput before retrying',
}
# these constants are for running this on a dev host
HOME_DIR = os.path.abspath(os.path.join(os.path.realpath(__file__), os.pardir))
RESULT_DIR = f'{HOME_DIR}/results/'
# shared module-level state, assigned in main()
args: Namespace
dynamodb = None
logger: logging.Logger
# functions
def parse_arguments() -> Namespace:
    """
    Return the arguments provided on the command line
    :return: Namespace
    """
    parser = Ap(
        description="Adds data from a specified csv file to the LMS DynamoDB table"
    )
    parser.add_argument(
        'file',
        type=str,
        help="The file containing csv data"
    )
    parser.add_argument(
        '-D', '--debug',
        action='store_true',
        help='Set logging level to DEBUG '
             '(default is INFO)'
    )
    arguments: Namespace = parser.parse_args()
    if not os.path.isfile(arguments.file):
        parser.error("The specified file does not exist")
    if not arguments.file.endswith('.csv'):
        parser.error("You must specify a csv file")
    return arguments
def convert_date(date: str) -> str:
    """Convert an MM/DD/YY date string to ISO-8601; empty input passes through."""
    if date is None or date == '':
        return ''
    return dt.datetime.strptime(date, '%m/%d/%y').isoformat()
def convert_nullable(value: str) -> str:
    """Return the value, or the literal string 'null' for empty or missing input."""
    return value if value is not None and value != '' else 'null'
def convert_orgcode(code: str) -> str:
    """Strip the '01HD' marker from an organization code."""
    return code.replace('01HD', '')
def log_boto_client_error(error: ClientError) -> None:
    """Log a botocore ClientError along with a remediation hint, if one is known."""
    error_code = error.response['Error']['Code']
    error_help_string = ERROR_HELP_STRINGS.get(error_code, 'No help string available for this error code')
    logger.exception('[%s] %s.\n\tError message: %s',
                     error_code, error_help_string, error.response['Error']['Message'])
def build_item(given: dict) -> Optional[dict]:
    """Map a CSV row to a DynamoDB item, or return None for non-person rows."""
    global logger
    item: Optional[dict] = None
    if re.fullmatch(POS_INT_REGX, given['Username']):  # only process items representing persons
        try:
            item = {
                "lms_user_id": int(given['Username']),
                "activity_id": int(given['ActivityIDSource']),
                "calnet_uid": given['SourceIDEmpPk'],
                "empl_id": convert_nullable(given['LocalEmployeeID']),
                "full_name": given['EmpFullName1'],
                "given_name": given['FirstName'],
                "family_name": given['LastName'],
                "empl_org_code": convert_orgcode(given['UserPrimaryOrganizationCode']),
                "manager_empl_id": convert_nullable(given['ManagerLocalEmployeeID']),
                "activity_code": given['ActivityCode'],
                "activity_name": given['ActivityName'],
                "is_required": given['AssignmentStatus'] == 'Required',
                "assignment_status": given['UCRequirementStatus'],
                "assigned_date": convert_date(given['PlanDate']),
                "due_date": convert_date(given['DueDate']),
                "expiration_date": convert_date(given['ExpirationDate']),
                "last_attempt_date": convert_date(given['AttemptEndDate']),
                "last_completion_date": convert_date(given['LastCompletionDateRealtime']),
                "last_updated_datetime": dt.datetime.now().isoformat(timespec='seconds'),
                "last_updated_event_id": 0
            }
        except Exception as e:
            logger.debug('Exception while building item: %s', e)
            raise
    return item
async def load_from_csv() -> int:
    """Read rows from args.file and batch-write them to DynamoDB; return the item count."""
    global dynamodb, logger
    item_count: int = 0
    table = await dynamodb.Table(TABLE_NAME)
    async with aiofiles.open(args.file, mode='r', encoding='utf-8', errors='replace') as f:
        reader = AsyncDictReader(f)
        async with table.batch_writer() as writer:
            writes = []
            try:
                async for this in reader:
                    try:
                        item = build_item(this)
                        if item:
                            writes.append(asyncio.ensure_future(writer.put_item(Item=item)))
                            item_count += 1
                            if item_count % 100 == 0:
                                logger.debug('Loaded %s items', item_count)
                            # no need to load all 180k for method comparison
                            if item_count == 5000:
                                break
                    except ValueError as e:
                        logger.exception('Invalid value while putting item: %s\n%s', this, e)
                    except ParamValidationError as e:
                        logger.exception('Invalid parameter while putting item: %s\n%s', this, e)
                    except ClientError as e:
                        log_boto_client_error(e)
                        return item_count
                    except Exception as e:
                        logger.exception('Unknown error during load_from_csv: %s', e)
                        return item_count
            finally:
                # drain all queued writes exactly once; gathering inside the loop
                # would re-await the growing list on every row
                await asyncio.gather(*writes)
    return item_count
async def main() -> None:
    global args, dynamodb, logger
    args = parse_arguments()
    # remove this block once automated
    if not os.path.exists(RESULT_DIR):
        os.makedirs(RESULT_DIR)
    results_file = RESULT_DIR + 'load_dynamodb_lms_' + dt.datetime.now().strftime('%Y%m%d%H%M') + '.txt'
    # end of optional block
    # set logging
    logging.basicConfig(
        format='%(asctime)s %(levelname)s: %(message)s',
        handlers=[
            logging.FileHandler(filename=results_file, mode='a'),  # remove along with the results file block
            logging.StreamHandler(sys.stdout)
        ]
    )
    logger = logging.getLogger(__name__)
    if args.debug:
        logger.setLevel(logging.DEBUG)
    else:
        logger.setLevel(logging.INFO)
    # begin
    start_time = time.time()
    logger.info('Started')
    item_count: int = 0
    try:
        # create an AWS session from credentials already present in the environment
        data_session = aioboto3.Session(
            aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
            aws_session_token=os.environ['AWS_SESSION_TOKEN']
        )
        async with data_session.resource('dynamodb', region_name=AWS_REGION) as dynamodb:
            item_count = await load_from_csv()
    except FileNotFoundError as e:
        logger.exception('Unable to find specified file (%s): %s', args.file, e)
    except csv.Error as e:
        logger.exception('Unable to read CSV file (%s): %s', args.file, e)
    except IOError as e:
        logger.exception('Unexpected IO problem: %s', e)
    except ClientError as e:
        log_boto_client_error(e)
    except Exception as e:
        logger.exception('Unknown problem: %s', e)
    logger.info('Loaded %s items in %s', item_count, dt.timedelta(seconds=time.time() - start_time))
# main
if __name__ == "__main__":
    asyncio.run(main())
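# Example invocation; the CSV filename is illustrative, and AWS credentials
# (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN) must already
# be exported in the environment:
#   python load_dynamodb_lms.py lms_export.csv --debug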