Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions examples/sdk_examples/action_report/action_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""Enterprise Action Report SDK Example."""

import getpass
import sqlite3

from keepersdk.authentication import configuration, endpoint, keeper_auth, login_auth
from keepersdk.constants import KEEPER_PUBLIC_HOSTS
from keepersdk.enterprise import action_report, enterprise_loader, sqlite_enterprise_storage
from keepersdk.errors import KeeperApiError

TARGET_STATUS = 'no-logon'
DAYS_SINCE = 30


def login():
config = configuration.JsonConfigurationStorage()

if not config.get().last_server:
print("Available server options:")
for region, host in KEEPER_PUBLIC_HOSTS.items():
print(f" {region}: {host}")
server = input('Enter server (default: keepersecurity.com): ').strip() or 'keepersecurity.com'
config.get().last_server = server
else:
server = config.get().last_server

keeper_endpoint = endpoint.KeeperEndpoint(config, server)
login_auth_context = login_auth.LoginAuth(keeper_endpoint)
username = config.get().last_login or input('Enter username: ')

login_auth_context.resume_session = True
login_auth_context.login(username)

while not login_auth_context.login_step.is_final():
step = login_auth_context.login_step
if isinstance(step, login_auth.LoginStepDeviceApproval):
step.send_push(login_auth.DeviceApprovalChannel.KeeperPush)
print("Device approval request sent. Approve this device and press Enter.")
input()
elif isinstance(step, login_auth.LoginStepPassword):
step.verify_password(getpass.getpass('Enter password: '))
elif isinstance(step, login_auth.LoginStepTwoFactor):
channel = step.get_channels()[0]
step.send_code(channel.channel_uid, getpass.getpass(f'Enter 2FA code for {channel.channel_name}: '))
else:
raise NotImplementedError(f"Unsupported login step: {type(step).__name__}")

if isinstance(login_auth_context.login_step, login_auth.LoginStepConnected):
return login_auth_context.login_step.take_keeper_auth()
return None


def print_report(entries, target_status, days_since):
action_text = (
'\tCOMMAND: NONE (No action specified)\n'
'\tSTATUS: n/a\n'
'\tSERVER MESSAGE: n/a\n'
'\tAFFECTED: 0'
)
status_display = target_status[0].upper() + target_status[1:]

print(f'\nAdmin Action Taken:\n{action_text}\n')
print('Note: the following reflects data prior to any administrative action being applied')
print(f'{len(entries)} User(s) With "{status_display}" Status Older Than {days_since} Day(s):\n')

if not entries:
return

headers = ['User ID', 'Email', 'Name', 'Status', 'Transfer Status', 'Node']
col_widths = [14, 31, 22, 8, 17, 19]

print(' '.join(f'{h:<{w}}' for h, w in zip(headers, col_widths)))
print(' '.join('-' * w for w in col_widths))

for entry in entries:
row = [
str(entry.enterprise_user_id), entry.email, entry.full_name,
entry.status, entry.transfer_status, entry.node_path
]
print(' '.join(f'{str(v)[:w]:<{w}}' for v, w in zip(row, col_widths)))


def generate_action_report(keeper_auth_context: keeper_auth.KeeperAuth):
if not keeper_auth_context.auth_context.is_enterprise_admin:
print("ERROR: This operation requires enterprise admin privileges.")
keeper_auth_context.close()
return

enterprise = None
try:
conn = sqlite3.Connection('file::memory:', uri=True)
enterprise_id = keeper_auth_context.auth_context.enterprise_id or 0
storage = sqlite_enterprise_storage.SqliteEnterpriseStorage(lambda: conn, enterprise_id)
enterprise = enterprise_loader.EnterpriseLoader(keeper_auth_context, storage)

config = action_report.ActionReportConfig(
target_user_status=TARGET_STATUS,
days_since=DAYS_SINCE
)
generator = action_report.ActionReportGenerator(
enterprise.enterprise_data, keeper_auth_context, loader=enterprise, config=config
)
entries = generator.generate_report()
print_report(entries, TARGET_STATUS, DAYS_SINCE)

except KeeperApiError as e:
print(f"\nAPI Error: {e}")
except Exception as e:
print(f"\nError: {e}")
finally:
if enterprise:
enterprise.close()
keeper_auth_context.close()


def main():
auth = login()
if auth:
generate_action_report(auth)
else:
print("Login failed.")


if __name__ == "__main__":
main()
123 changes: 123 additions & 0 deletions examples/sdk_examples/aging_report/aging_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""Example: Password aging report using Keeper SDK."""

import datetime
import getpass
import sqlite3
import sys
import traceback

from keepersdk.authentication import login_auth, configuration, endpoint, keeper_auth
from keepersdk.enterprise import enterprise_loader, sqlite_enterprise_storage, aging_report
from keepersdk.errors import KeeperApiError


TABLE_WIDTH = 140
COL_WIDTHS = (30, 30, 25, 10, 45)
HEADERS = ['Owner', 'Title', 'Password Changed', 'Shared', 'Record URL']


def login():
config = configuration.JsonConfigurationStorage()
server = config.get().last_server or 'keepersecurity.com'

keeper_endpoint = endpoint.KeeperEndpoint(config, server)
auth_context = login_auth.LoginAuth(keeper_endpoint)
auth_context.resume_session = True

username = config.get().last_login
if not username:
print("Error: No saved login found. Please run with interactive login first.")
return None, None

auth_context.login(username)

while not auth_context.login_step.is_final():
step = auth_context.login_step
if isinstance(step, login_auth.LoginStepDeviceApproval):
step.send_push(login_auth.DeviceApprovalChannel.KeeperPush)
print("Device approval required. Approve and press Enter.")
input()
elif isinstance(step, login_auth.LoginStepPassword):
step.verify_password(getpass.getpass('Enter password: '))
elif isinstance(step, login_auth.LoginStepTwoFactor):
channel = step.get_channels()[0]
code = getpass.getpass(f'Enter 2FA code for {channel.channel_name}: ')
step.send_code(channel.channel_uid, code)
else:
raise NotImplementedError(f"Unsupported login step: {type(step).__name__}")

if isinstance(auth_context.login_step, login_auth.LoginStepConnected):
return auth_context.login_step.take_keeper_auth(), server
return None, None


def format_row(values):
return ' '.join(
f"{str(val or '')[:w-1]:<{w}}"
for val, w in zip(values, COL_WIDTHS + (20,) * (len(values) - len(COL_WIDTHS)))
)


def print_report(rows, title):
print(f"\n{title}")
print('=' * TABLE_WIDTH)
print(format_row(HEADERS))
print('-' * TABLE_WIDTH)

for row in rows:
display_row = list(row)
display_row[3] = 'True' if display_row[3] else 'False'
print(format_row(display_row))

print('=' * TABLE_WIDTH)
print(f"\nFound {len(rows)} record(s) with aging passwords")


def generate_report(auth: keeper_auth.KeeperAuth, server: str):
if not auth.auth_context.is_enterprise_admin:
print("ERROR: This operation requires enterprise admin privileges.")
return 1

enterprise = None
try:
conn = sqlite3.Connection('file::memory:', uri=True)
enterprise_id = auth.auth_context.enterprise_id or 0
storage = sqlite_enterprise_storage.SqliteEnterpriseStorage(lambda: conn, enterprise_id)
enterprise = enterprise_loader.EnterpriseLoader(auth, storage)

print('\nThe default password aging period is 3 months\n')
print('Loading record password change information...')

config = aging_report.AgingReportConfig(server=server)
generator = aging_report.AgingReportGenerator(enterprise.enterprise_data, auth, config)
rows = list(generator.generate_report_rows())

cutoff_dt = datetime.datetime.now() - datetime.timedelta(days=aging_report.DEFAULT_PERIOD_DAYS)
title = f'Aging Report: Records With Passwords Last Modified Before {cutoff_dt.strftime("%Y/%m/%d %H:%M:%S")}'

print_report(rows, title)
return 0

except KeeperApiError as e:
print(f"API Error: {e}")
return 1
except Exception as e:
print(f"Error generating aging report: {e}")
traceback.print_exc()
return 1
finally:
if enterprise:
enterprise.close()
auth.close()


def main():
auth, server = login()
if not auth:
print("Login failed. Unable to generate aging report.")
return 1
return generate_report(auth, server)


if __name__ == "__main__":
sys.exit(main() or 0)
Loading