Skip to content

Commit 3a90c85

Browse files
aaunario-keeper authored and sk-keeper committed
compliance rar + server-side-throttling improvements + trash restore fix: API call optimizations, better throttle-handling, bug-fix for crash when deleted record shared-folder parent has no sub-folders
1 parent 8d7f6ec commit 3a90c85

File tree

3 files changed

+136
-112
lines changed

3 files changed

+136
-112
lines changed

keepercommander/api.py

+7 -2
Original file line number | Diff line number | Diff line change
@@ -689,8 +689,8 @@ def communicate_rest(params, request, endpoint, *, rs_type=None, payload_version
689689
raise KeeperApiError('Error', endpoint)
690690

691691

692-
def communicate(params, request):
693-
# type: (KeeperParams, dict) -> dict
692+
def communicate(params, request, retry_on_throttle=True):
693+
# type: (KeeperParams, dict, Optional[bool]) -> dict
694694

695695
request['client_time'] = current_milli_time()
696696
request['locale'] = LOCALE
@@ -700,6 +700,11 @@ def communicate(params, request):
700700
try:
701701
response_json = run_command(params, request)
702702
if response_json['result'] != 'success':
703+
if retry_on_throttle and response_json.get('result_code') == 'throttled':
704+
logging.info('Throttled. sleeping for 10 seconds')
705+
time.sleep(10)
706+
# Allow maximum 1 retry per call
707+
return communicate(params, request, retry_on_throttle=False)
703708
raise KeeperApiError(response_json['result_code'], response_json['message'])
704709
TTK.update_time_of_last_activity()
705710
return response_json

keepercommander/commands/compliance.py

+127 -108
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,11 @@
11
import argparse
22
import datetime
3+
import itertools
34
import json
45
import logging
56
import operator
67
from functools import partial
7-
from typing import Optional, Dict, Tuple, List, Any, Iterable
8+
from typing import Optional, Dict, Tuple, List, Any, Iterable, Union
89

910
from keepercommander.commands.base import GroupCommand, dump_report_data, field_to_title
1011
from keepercommander.commands.enterprise_common import EnterpriseCommand
@@ -366,52 +367,50 @@ def execute(self, params, **kwargs): # type: (KeeperParams, any) -> any
366367

367368
def generate_report_data(self, params, kwargs, sox_data, report_fmt, node, root_node):
368369
# type: (KeeperParams, Dict[str, Any], sox.sox_data.SoxData, str, int, int) -> List[List[str]]
369-
def get_records_accessed(email, records=None):
370-
records_accessed = dict() # type: Dict[str, Dict[str, Any]]
370+
def get_records_accessed_rq(email, filter_recs=None, created_max=None):
371+
# type: (str, Optional[List[str]], Optional[int]) -> Union[None, Dict[str, Any]]
371372
# Empty record filter list -> no records to search for
372-
if records is not None and not records:
373-
return records_accessed
373+
if filter_recs is not None and not filter_recs:
374+
return None
374375

375376
columns = ['record_uid', 'ip_address', 'keeper_version']
376377
rq_filter = {'username': email}
377-
if records is not None:
378-
rq_filter['record_uid'] = records
379-
380-
rq = {
381-
'command': 'get_audit_event_reports',
382-
'report_type': 'span',
383-
'scope': 'enterprise',
384-
'aggregate': ['last_created'],
385-
'limit': API_EVENT_SUMMARY_ROW_LIMIT,
386-
'filter': rq_filter,
387-
'columns': columns,
388-
}
389-
390-
def update_records_accessed(record_access_events):
391-
for event in record_access_events:
392-
r_uid = event.get('record_uid')
393-
records_accessed.setdefault(r_uid, event)
394-
395-
def get_events(period_end, filter_recs):
396-
if period_end:
397-
rq_filter['created'] = {'max': period_end}
378+
if filter_recs is not None:
398379
rq_filter['record_uid'] = filter_recs
399-
rs = api.communicate(params, rq)
400-
return rs.get('audit_event_overview_report_rows')
401-
402-
done = records is not None and not records
403-
max_ts = 0
404-
missing_records = [] if not records else [*records]
405-
while not done:
406-
chunk = missing_records[:API_EVENT_SUMMARY_ROW_LIMIT]
407-
events = get_events(max_ts, chunk)
408-
update_records_accessed(events)
409-
missing_records = [r for r in records if r not in records_accessed] if records else []
380+
if created_max:
381+
rq_filter['created'] = {'max': created_max}
382+
383+
return dict(
384+
command = 'get_audit_event_reports',
385+
report_type = 'span',
386+
scope = 'enterprise',
387+
aggregate = ['last_created'],
388+
limit = API_EVENT_SUMMARY_ROW_LIMIT,
389+
filter = rq_filter,
390+
columns = columns
391+
)
392+
393+
# Extract data and meta-data from the server response, which determines the next request and/or its filter params
394+
# 2nd value of returned Tuple (filters for the next request) is None if there are no more events to fetch
395+
def process_access_events(events, filter_recs=None):
396+
records_accessed = dict() # type: Dict[str, Dict[str, Any]]
397+
recs_set = set(filter_recs or [])
398+
399+
for event in events:
400+
r_uid = event.get('record_uid')
401+
records_accessed.setdefault(r_uid, event)
402+
recs_set.discard(r_uid)
403+
404+
queries_done = len(events) < API_EVENT_SUMMARY_ROW_LIMIT or not recs_set and filter_recs is not None
405+
filter_params = None
406+
if not queries_done:
410407
earliest_event = {} if not events else events[-1]
411-
max_ts = int(earliest_event.get('last_created', 0))
412-
done = not missing_records and records or len(events) < API_EVENT_SUMMARY_ROW_LIMIT
408+
filter_params = dict(
409+
filter_recs = list(recs_set) if filter_recs is not None else filter_recs,
410+
created_max = int(earliest_event.get('last_created', 0))
411+
)
413412

414-
return records_accessed
413+
return records_accessed, filter_params
415414

416415
def format_datetime(dt_str):
417416
if not dt_str:
@@ -423,18 +422,17 @@ def from_ts(ts):
423422
return datetime.datetime.fromtimestamp(ts) if ts else None
424423

425424
def compile_user_report(user, access_events):
426-
access_records = dict()
427-
user_access_data = {user: access_records}
425+
accessed_records = dict()
428426
rec_uids = access_events.keys() if report_type == report_type_default \
429-
else {r for r in vault_records}
427+
else sox_data.get_vault_records(user).keys()
430428

431429
for uid in rec_uids:
432430
access_event = access_events.get(uid, {})
433431
sox_rec = sox_data.get_records().get(uid)
434432
rec_info = sox_rec.data if sox_rec else {}
435433
rec_owner = sox_data.get_record_owner(uid)
436434
event_ts = access_event.get('last_created')
437-
access_record = {uid: {'record_title': rec_info.get('title'),
435+
accessed_record = {uid: {'record_title': rec_info.get('title'),
438436
'record_type': rec_info.get('record_type'),
439437
'record_url': rec_info.get('url', '').rstrip('/'),
440438
'record_owner': rec_owner and rec_owner.email,
@@ -444,8 +442,8 @@ def compile_user_report(user, access_events):
444442
'device': access_event.get('keeper_version'),
445443
'last_access': from_ts(int(event_ts)) if event_ts else None,
446444
'vault_owner': user}}
447-
access_records.update(access_record)
448-
return user_access_data
445+
accessed_records.update(accessed_record)
446+
return accessed_records
449447

450448
def get_aging_data(rec_ids):
451449
if not rec_ids:
@@ -458,66 +456,67 @@ def get_aging_data(rec_ids):
458456
stored_aging_data = {e.record_uid: {'created': from_ts(e.created), 'last_modified': from_ts(e.last_modified), 'last_rotation': from_ts(e.last_rotation)} for e in stored_entities}
459457
aging_data.update(stored_aging_data)
460458

461-
from keepercommander.commands.aram import AuditReportCommand
462-
cmd = AuditReportCommand()
463-
464-
def get_events(record_filter, type_filter, order='desc', aggregate='last_created'):
465-
events = []
466-
cmd_kwargs = {'report_type': 'span',
467-
'columns': ['record_uid'],
468-
'format': 'json',
469-
'limit': API_EVENT_SUMMARY_ROW_LIMIT,
470-
'order': order,
471-
'aggregate': [aggregate]}
472-
if type_filter:
473-
cmd_kwargs['event_type'] = type_filter
474-
while record_filter:
475-
chunk = record_filter[:API_EVENT_SUMMARY_ROW_LIMIT]
476-
record_filter = record_filter[API_EVENT_SUMMARY_ROW_LIMIT:]
477-
cmd_kwargs['record_uid'] = chunk
478-
event_data = cmd.execute(params, **cmd_kwargs)
479-
event_data = json.loads(event_data)
480-
events.extend(event_data)
481-
return events
459+
def get_requests(filter_recs, filter_type, order='desc', aggregate='last_created'):
460+
columns = ['record_uid']
461+
requests = []
462+
while filter_recs:
463+
chunk = filter_recs[:API_EVENT_SUMMARY_ROW_LIMIT]
464+
filter_recs = filter_recs[API_EVENT_SUMMARY_ROW_LIMIT:]
465+
request = dict(
466+
command = 'get_audit_event_reports',
467+
report_type = 'span',
468+
scope = 'enterprise',
469+
aggregate = [aggregate],
470+
limit = API_EVENT_SUMMARY_ROW_LIMIT,
471+
filter = dict(record_uid=chunk, audit_event_type=filter_type),
472+
columns = columns,
473+
order = order
474+
)
475+
requests.append(request)
476+
return requests
477+
478+
def get_request_params(record_aging_event):
479+
# type: (str) -> Tuple[List[str], Union[List[str], None], Optional[str], Optional[str]]
480+
known_events_map = get_known_aging_data(record_aging_event)
481+
filter_recs = [uid for uid in rec_ids if uid not in known_events_map]
482+
types_by_aging_event = dict(
483+
created = None,
484+
last_modified = ['record_update'],
485+
last_rotation = ['record_rotation_scheduled_ok', 'record_rotation_on_demand_ok']
486+
)
487+
filter_types = types_by_aging_event.get(record_aging_event)
488+
order, aggregate = ('asc', 'first_created') if record_aging_event == 'created' \
489+
else ('desc', 'last_created')
490+
return filter_recs, filter_types, order, aggregate
491+
492+
def fetch_events(requests):
493+
return list(
494+
itertools.chain.from_iterable(
495+
[rs.get('audit_event_overview_report_rows', []) for rs in api.execute_batch(params, requests)]
496+
)
497+
)
498+
499+
def get_aging_events(aging_prop):
500+
req_params = get_request_params(aging_prop)
501+
requests = get_requests(*req_params)
502+
return fetch_events(requests)
482503

483504
def get_known_aging_data(event_type):
484505
return {r: events.get(event_type) for r, events in stored_aging_data.items() if events.get(event_type) or 0 >= max_stored_age_ts}
485506

486-
def get_created_dts():
487-
known_rec_created_lookup = get_known_aging_data('created')
488-
for rec_id, dt in known_rec_created_lookup.items():
489-
aging_data[rec_id]['created'] = dt
490-
r_filter = [uid for uid in rec_ids if uid not in known_rec_created_lookup]
491-
event_data = get_events(r_filter, None, 'asc', 'first_created')
492-
record_created_lookup = {event.get('record_uid'): event.get('first_created') for event in event_data}
493-
for rec, created in record_created_lookup.items():
494-
aging_data[rec]['created'] = format_datetime(created)
495-
496-
def get_last_modified_dts():
497-
known_rec_last_modified_lookup = get_known_aging_data('last_modified')
498-
for rec_id, dt in known_rec_last_modified_lookup.items():
499-
aging_data[rec_id]['last_modified'] = dt
500-
r_filter = [uid for uid in rec_ids if uid not in known_rec_last_modified_lookup]
501-
event_data = get_events(r_filter, ['record_update'])
502-
dt_lookup = {event.get('record_uid'): event.get('last_created') for event in event_data}
503-
for rec, dt in dt_lookup.items():
504-
aging_data[rec]['last_modified'] = format_datetime(dt)
505-
for rec, events in aging_data.items():
506-
events['last_modified'] = events.get('last_modified') or events.get('created')
507-
508-
def get_last_rotation_dts():
509-
known_rec_last_rotation_lookup = get_known_aging_data('last_rotation')
510-
for rec_id, dt in known_rec_last_rotation_lookup.items():
511-
aging_data[rec_id]['last_rotation'] = dt
512-
r_filter = [uid for uid in rec_ids if uid not in known_rec_last_rotation_lookup]
513-
event_data = get_events(r_filter, ['record_rotation_scheduled_ok', 'record_rotation_on_demand_ok'])
514-
dt_lookup = {event.get('record_uid'): event.get('last_created') for event in event_data}
515-
for rec, dt in dt_lookup.items():
516-
aging_data[rec]['last_rotation'] = format_datetime(dt)
517-
518-
get_created_dts()
519-
get_last_modified_dts()
520-
get_last_rotation_dts()
507+
def get_aging_event_dts(event_type):
508+
events = get_aging_events(event_type)
509+
aggregate = 'first_created' if event_type == 'created' else 'last_created'
510+
record_timestamps = {event.get('record_uid', ''): event.get(aggregate) for event in events}
511+
return {rec: format_datetime(ts) for rec, ts in record_timestamps.items()}
512+
513+
aging_stats = ['created', 'last_modified', 'last_rotation']
514+
record_events_by_stat = {stat: get_aging_event_dts(stat) for stat in aging_stats}
515+
for stat, record_event_dts in record_events_by_stat.items():
516+
for record, dt in record_event_dts.items():
517+
aging_data.get(record, {}).update({stat: dt})
518+
stat == 'created' and aging_data.get(record, {}).setdefault('last_modified', dt)
519+
521520
save_aging_data(aging_data)
522521
return aging_data
523522

@@ -551,6 +550,30 @@ def compile_report_data(rec_ids):
551550
row.append(value)
552551
report_data.append(row)
553552

553+
def get_records_accessed(emails, limit_to_vault=False):
554+
# type: (List[str], Optional[bool]) -> Dict[str, Dict[str, List[str]]]
555+
get_rec_filter = lambda e: list(sox_data.get_vault_records(e).keys()) if limit_to_vault else None
556+
records_accessed_by_user = {e: dict() for e in emails}
557+
filters_by_user = {e: dict(filter_recs=get_rec_filter(e)) for e in emails}
558+
should_query = lambda rq_filter: rq_filter and (rq_filter.get('filter_recs') or not limit_to_vault)
559+
# Make requests in batches, walking backwards in time (w/ query filters) for all users in parallel (1 user per sub-request)
560+
while True:
561+
users_to_query = [user for user, user_filter in filters_by_user.items() if should_query(user_filter)]
562+
if not users_to_query:
563+
break
564+
requests = [get_records_accessed_rq(email, **filters_by_user.get(email)) for email in users_to_query]
565+
responses = api.execute_batch(params, requests)
566+
responses_by_user = zip(users_to_query, responses)
567+
for user, response in responses_by_user:
568+
access_events = response.get('audit_event_overview_report_rows', [])
569+
records_accessed = records_accessed_by_user.get(user, {})
570+
records_accessed_new, filters = process_access_events(access_events, filter_recs=filters_by_user.get(user, {}).get('filter_recs'))
571+
for rec_uid, event in records_accessed_new.items():
572+
records_accessed.setdefault(rec_uid, event)
573+
records_accessed_by_user.update({user: records_accessed})
574+
filters_by_user.update({user: filters})
575+
return records_accessed_by_user
576+
554577
from keepercommander.sox.storage_types import StorageRecordAging
555578
from keepercommander.commands.aram import API_EVENT_SUMMARY_ROW_LIMIT
556579
from keepercommander.commands.enterprise import EnterpriseInfoCommand
@@ -577,12 +600,8 @@ def compile_report_data(rec_ids):
577600
aging_columns = ['created', 'last_modified', 'last_rotation'] if aging else []
578601
self.report_headers = default_columns + aging_columns
579602

580-
for name in usernames:
581-
vault_records = sox_data.get_vault_records(name)
582-
filter_by_recs = None if report_type == report_type_default else {r for r in vault_records}
583-
user_access_events = get_records_accessed(name, filter_by_recs)
584-
user_access_lookup.update(compile_user_report(name, user_access_events))
585-
603+
record_access_events = get_records_accessed(usernames, report_type != report_type_default)
604+
user_access_lookup = {user: compile_user_report(user, access_events) for user, access_events in record_access_events.items()}
586605
record_ids = {r for recs in user_access_lookup.values() for r in recs} if aging else {}
587606
compile_report_data(record_ids)
588607
return report_data

keepercommander/commands/record.py

+2 -2
Original file line number | Diff line number | Diff line change
@@ -1045,8 +1045,8 @@ def execute(self, params, **kwargs):
10451045
folder_table = []
10461046

10471047
if shared_folders and len(shared_folders) > 0:
1048-
folders = shared_folders.get('folders') # type: Dict[str, dict]
1049-
records = shared_folders.get('records') # type: Dict[str, dict]
1048+
folders = shared_folders.get('folders', {}) # type: Dict[str, dict]
1049+
records = shared_folders.get('records', {}) # type: Dict[str, dict]
10501050
if verbose:
10511051
for rec in records.values():
10521052
folder_uid = rec.get('folder_uid')

0 commit comments

Comments
 (0)