Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 167 additions & 35 deletions keepercommander/commands/aram.py
Original file line number Diff line number Diff line change
Expand Up @@ -1176,6 +1176,152 @@ def resolve_node_lookup(self, params, lookup_type, node_id, field):
def get_parser(self):
return audit_report_parser

@staticmethod
def fetch_audit_events(params, audit_filter, columns=None, aggregate=None, report_type='span', limit=None, order=None):
# type: (KeeperParams, Dict[str, Any], Optional[List[str]], Optional[List[str]], str, Optional[int], Optional[str]) -> List[Dict[str, Any]]
"""
Fetch audit events with pagination support.

This method handles pagination for both raw and span/consolidated reports,
making it suitable for enterprises with large numbers of users/events.

Args:
params: KeeperParams object
audit_filter: dict with filter criteria (audit_event_type, created, username, etc.)
columns: list of columns for non-raw reports (default: ['username'])
aggregate: list of aggregates for non-raw reports (default: ['last_created'] for span)
report_type: 'raw', 'span', 'hour', 'day', 'week', 'month' (default: 'span')
limit: maximum number of events to return (None for all)
order: 'ascending' or 'descending' (default: None, uses API default)

Returns:
list of event dictionaries
"""
rq = {
'command': 'get_audit_event_reports',
'report_type': report_type,
'scope': 'enterprise' if params.enterprise else 'user'
}

if columns:
rq['columns'] = columns
elif report_type != 'raw':
rq['columns'] = ['username']

if aggregate:
rq['aggregate'] = aggregate
elif report_type == 'span':
rq['aggregate'] = ['last_created']

api_row_limit = API_EVENT_SUMMARY_ROW_LIMIT if report_type != 'raw' else API_EVENT_RAW_ROW_LIMIT
rq_limit = api_row_limit if limit is None else min(limit, api_row_limit) if limit > 0 else api_row_limit
rq['limit'] = rq_limit

if order:
rq['order'] = order

if audit_filter:
rq['filter'] = copy.deepcopy(audit_filter)

events = []
reqs = [rq]
max_iterations = 1000
iteration_count = 0

while reqs and iteration_count < max_iterations:
iteration_count += 1
rss = api.execute_batch(params, reqs)
next_reqs = []
should_stop = False

for idx, rs in enumerate(rss):
batch_events = rs.get('audit_event_overview_report_rows', [])
if not batch_events or not isinstance(batch_events, list):
continue

events.extend(batch_events)

# Check if user limit reached
if limit is not None and limit > 0 and len(events) >= limit:
events = events[:limit]
should_stop = True
break

# Check if we need to paginate
batch_limit = api_row_limit if report_type != 'raw' else API_EVENT_RAW_ROW_LIMIT
if len(batch_events) >= batch_limit:
if idx >= len(reqs):
continue

current_req = reqs[idx]

# Determine timestamp field based on report type
timestamp_field = None
if report_type == 'span':
if batch_events and 'last_created' in batch_events[-1]:
timestamp_field = 'last_created'
elif batch_events and 'first_created' in batch_events[-1]:
timestamp_field = 'first_created'
elif batch_events and 'created' in batch_events[-1]:
timestamp_field = 'created'
else:
timestamp_field = 'created'

if timestamp_field and batch_events and timestamp_field in batch_events[-1]:
try:
asc = current_req.get('order') == 'ascending'
first_key, last_key = ('min', 'max') if asc else ('max', 'min')
req_filter = copy.deepcopy(current_req.get('filter', {}))
req_period = req_filter.get('created', {})

timestamp_value = batch_events[-1][timestamp_field]
period = {first_key: int(timestamp_value) if isinstance(timestamp_value, (int, float, str)) else timestamp_value}

if not isinstance(req_period, dict) or req_period.get(last_key) is None:
# Get the boundary timestamp
last_rq = {**current_req}
reverse = 'descending' if asc else 'ascending'
last_rq['order'] = reverse
last_rq['limit'] = 1
rs_last = api.communicate(params, last_rq)
last_row_events = rs_last.get('audit_event_overview_report_rows')
if last_row_events:
last_row = last_row_events[0]
last_timestamp = last_row.get(timestamp_field, last_row.get('created', 0))
period[last_key] = int(last_timestamp) if isinstance(last_timestamp, (int, float, str)) else last_timestamp
else:
period[last_key] = req_period.get(last_key)

req_filter['created'] = period
next_req = {**current_req}
next_req['filter'] = req_filter

if limit is not None and limit > 0:
missing = limit - len(events)
if missing <= 0:
should_stop = True
break
elif missing < batch_limit:
next_req['limit'] = missing

next_reqs.append(next_req)
except (ValueError, TypeError, KeyError) as e:
logging.warning(f"Error processing pagination timestamp: {e}")
continue

if should_stop:
break

if should_stop:
break

reqs = next_reqs

if iteration_count >= max_iterations:
logging.warning(f"Pagination stopped after reaching maximum iterations ({max_iterations})")

return events

@staticmethod
def convert_value(field, value, **kwargs):
if not value:
Expand Down Expand Up @@ -2036,46 +2182,32 @@ def get_parser(self): # type: () -> Optional[argparse.ArgumentParser]
return action_report_parser

def execute(self, params, **kwargs):
def cmd_rq(cmd):
return {'command': cmd, 'scope': 'enterprise'}

def report_rq(query_filter, limit, cols=None, report_type='span'):
rq = {
**cmd_rq('get_audit_event_reports'),
'report_type': report_type,
'filter': query_filter,
'limit': limit
}

if report_type == 'span':
rq['columns'] = ['username'] if cols is None else cols
rq['aggregate'] = ['last_created']

return rq

def get_excluded(candidate_usernames, query_filter, username_field='username'):
# type: (Set[str], Dict[str, Any], Optional[str]) -> Set[str]
excluded = set()
req_limit = API_EVENT_SUMMARY_ROW_LIMIT
"""
Get usernames that should be excluded (users who HAVE performed the action).

Uses AuditReportCommand.fetch_audit_events for proper pagination support,
which handles enterprises with more than 2000 users correctly.
"""
if not candidate_usernames:
return set()

cols = [username_field]

def adjust_filter(q_filter, max_ts=0):
if max_ts:
q_filter['created']['max'] = max_ts
return q_filter
# Use the centralized fetch_audit_events method with pagination support
events = AuditReportCommand.fetch_audit_events(
params,
audit_filter=query_filter,
columns=cols,
aggregate=['last_created'],
report_type='span',
limit=None # Get all matching events
)

done = not candidate_usernames
while not done:
rq = report_rq(query_filter, req_limit, cols, report_type='span')
rs = api.communicate(params, rq)
events = rs['audit_event_overview_report_rows']
to_exclude = {event.get(username_field, '').lower() for event in events}
excluded.update(to_exclude.intersection(candidate_usernames))
end = int(events[-1]['last_created']) if events else 0
done = (len(events) < req_limit
or len(candidate_usernames) == len(excluded)
or query_filter.get('created', {}).get('min', end) >= end)
query_filter = adjust_filter(query_filter, end + 1) if not done else None
# Extract usernames from events that match the candidate usernames
to_exclude = {event.get(username_field, '').lower() for event in events}
excluded = to_exclude.intersection(candidate_usernames)

return excluded

Expand Down