Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DR-803 Update discovery-common wheel and job graph display. #1318

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 34 additions & 30 deletions keepercommander/commands/discover/job_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def job_detail(self, job):
pass

@staticmethod
def print_job_table(jobs, max_gateway_name):
def print_job_table(jobs, max_gateway_name, show_history=False):

print("")
print(f"{bcolors.HEADER}{'Job ID'.ljust(14, ' ')} "
Expand Down Expand Up @@ -98,17 +98,17 @@ def print_job_table(jobs, max_gateway_name):
f"{(job.get('duration') or 'NA').ljust(19, ' ')} "
f"{bcolors.ENDC}")

if len(completed_jobs) > 0:
if len(completed_jobs) > 0 and show_history is False:
print("")
if len(completed_jobs) == 1:
print(f"There is one {_g('COMPLETED')} job. To process, use the following command.")
else:
print(f"There are {len(completed_jobs)} {_g('COMPLETED')} jobs. "
"To process, use one of the the following command.")
"To process, use one of the the following commands.")
for job_id in completed_jobs:
print(_g(f" pam action discover process -j {job_id}"))

if len(running_jobs) > 0:
if len(running_jobs) > 0 and show_history is False:
print("")
if len(running_jobs) == 1:
print(f"There is one {_b('RUNNING')} job. "
Expand All @@ -119,7 +119,7 @@ def print_job_table(jobs, max_gateway_name):
for job_id in running_jobs:
print(_b(f" pam action discover remove -j {job_id}"))

if len(failed_jobs) > 0:
if len(failed_jobs) > 0 and show_history is False:
print("")
if len(failed_jobs) == 1:
print(f"There is one {_f('FAILED')} job. "
Expand Down Expand Up @@ -180,31 +180,35 @@ def print_job_detail(params, gateway_context, jobs, job_id):
job_item = job.get("job_item") # type: JobItem

try:
infra.load(sync_point=job_item.sync_point)
infra.load(sync_point=0)
print("")
delta = DiscoveryDelta.model_validate(job.get('delta'))
print(f"{_h('Added')} - {len(delta.added)} count")
for item in delta.added:
vertex = infra.dag.get_vertex(item.uid)
discovery_object = DiscoveryObject.get_discovery_object(vertex)
print(f" * {discovery_object.description}")

print("")
print(f"{_h('Changed')} - {len(delta.changed)} count")
for item in delta.changed:
vertex = infra.dag.get_vertex(item.uid)
discovery_object = DiscoveryObject.get_discovery_object(vertex)
print(f" * {discovery_object.description}")
if item.changes is None:
print(f" no changed, may be a object not added in prior discoveries.")
else:
for key, value in item.changes.items():
print(f" - {key} = {value}")

print("")
print(f"{_h('Deleted')} - {len(delta.deleted)} count")
for item in delta.deleted:
print(f" * discovery vertex {item.uid}")
delta_json = job.get('delta')
if delta_json is not None:
delta = DiscoveryDelta.model_validate(delta_json)
print(f"{_h('Added')} - {len(delta.added)} count")
for item in delta.added:
vertex = infra.dag.get_vertex(item.uid)
discovery_object = DiscoveryObject.get_discovery_object(vertex)
print(f" * {discovery_object.description}")

print("")
print(f"{_h('Changed')} - {len(delta.changed)} count")
for item in delta.changed:
vertex = infra.dag.get_vertex(item.uid)
discovery_object = DiscoveryObject.get_discovery_object(vertex)
print(f" * {discovery_object.description}")
if item.changes is None:
print(f" no changed, may be a object not added in prior discoveries.")
else:
for key, value in item.changes.items():
print(f" - {key} = {value}")

print("")
print(f"{_h('Deleted')} - {len(delta.deleted)} count")
for item in delta.deleted:
print(f" * discovery vertex {item.uid}")
else:
print(f"{_f('There are no available delta changes for this job.')}")

except Exception as err:
print(f"{_f('Could not load delta from infrastructure: ' + str(err))}")
Expand Down Expand Up @@ -300,4 +304,4 @@ def execute(self, params, **kwargs):
if job_id is not None and gateway_context is not None:
self.print_job_detail(params, gateway_context, all_jobs, job_id)
else:
self.print_job_table(all_jobs, max_gateway_name)
self.print_job_table(all_jobs, max_gateway_name, show_history)
125 changes: 121 additions & 4 deletions keepercommander/commands/pam_debug/graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
from discovery_common.infrastructure import Infrastructure
from discovery_common.record_link import RecordLink
from discovery_common.user_service import UserService
from discovery_common.jobs import Jobs
from discovery_common.constants import (PAM_USER, PAM_DIRECTORY, PAM_MACHINE, PAM_DATABASE, VERTICES_SORT_MAP,
DIS_INFRA_GRAPH_ID, RECORD_LINK_GRAPH_ID, USER_SERVICE_GRAPH_ID)
DIS_INFRA_GRAPH_ID, RECORD_LINK_GRAPH_ID, USER_SERVICE_GRAPH_ID,
DIS_JOBS_GRAPH_ID)
from discovery_common.types import (DiscoveryObject, DiscoveryUser, DiscoveryDirectory, DiscoveryMachine,
DiscoveryDatabase)
DiscoveryDatabase, JobContent)
from discovery_common.dag_sort import sort_infra_vertices
from keeper_dag import DAG
from keeper_dag.connection.commander import Connection as CommanderConnection
Expand All @@ -32,7 +34,7 @@ class PAMDebugGraphCommand(PAMGatewayActionDiscoverCommandBase):
# The record to base everything on.
parser.add_argument('--gateway', '-g', required=True, dest='gateway', action='store',
help='Gateway name or UID.')
parser.add_argument('--type', '-t', required=True, choices=['infra', 'rl', 'service'],
parser.add_argument('--type', '-t', required=True, choices=['infra', 'rl', 'service', 'jobs'],
dest='graph_type', action='store', help='Graph type', default='infra')
parser.add_argument('--raw', required=False, dest='raw', action='store_true',
help='Render raw graph. Will render corrupt graphs.')
Expand All @@ -59,7 +61,8 @@ class PAMDebugGraphCommand(PAMGatewayActionDiscoverCommandBase):
graph_id_map = {
"infra": DIS_INFRA_GRAPH_ID,
"rl": RECORD_LINK_GRAPH_ID,
"service": USER_SERVICE_GRAPH_ID
"service": USER_SERVICE_GRAPH_ID,
"jobs": DIS_JOBS_GRAPH_ID
}

def get_parser(self):
Expand Down Expand Up @@ -283,6 +286,102 @@ def _handle(current_vertex: DAGVertex, parent_vertex: Optional[DAGVertex] = None

_handle(current_vertex=configuration, parent_vertex=None, indent=indent)

def _do_text_list_jobs(self, params: KeeperParams, gateway_context: GatewayContext, debug_level: int = 0,
                       indent: int = 0):
    """Print a text report of the discovery jobs graph for a gateway.

    Loads the infrastructure graph (from sync point 0) so vertices referenced by
    each job's delta can be resolved to human-readable descriptions, then reads
    the job history stored on the jobs DAG and prints, per job, its timing,
    status, and the added/changed/deleted delta entries.

    :param params: KeeperParams used to connect to the graphs.
    :param gateway_context: Gateway whose configuration record anchors the graphs.
    :param debug_level: Verbosity passed through to the graph libraries.
    :param indent: When > 0, prefix output with 2*indent spaces and a bullet.
    """

    infra = Infrastructure(record=gateway_context.configuration, params=params, logger=logging,
                           debug_level=debug_level, fail_on_corrupt=False)
    # Full load; delta entries below reference vertices by UID.
    infra.load(sync_point=0)

    pad = ""
    if indent > 0:
        pad = "".ljust(2 * indent, ' ') + "* "

    conn = get_connection(params)
    graph_sync = DAG(conn=conn, record=gateway_context.configuration, logger=logging, debug_level=debug_level,
                     graph_id=DIS_JOBS_GRAPH_ID)
    graph_sync.load(0)
    configuration = graph_sync.get_root
    vertices = configuration.has_vertices()
    if len(vertices) == 0:
        print(self._f(f"The jobs graph has not been initialized. Only has root vertex."))
        return

    # The jobs graph keeps all job state on a single vertex under the root.
    job_vertex = vertices[0]
    if job_vertex.has_data is False:
        print(self._f(f"The job vertex does not contain any data"))
        return

    current_json = job_vertex.content_as_str
    if current_json is None:
        print(self._f(f"The current job vertex content is None"))
        return

    content = JobContent.model_validate_json(current_json)
    print(f"{pad}{self._b('Active Job ID')}: {content.active_job_id}")
    print("")
    print(f"{pad}{self._h('History')}")
    print("")
    for job in content.job_history:
        print(f"{pad} --------------------------------------")
        print(f"{pad} Job Id: {job.job_id}")
        print(f"{pad} Started: {job.start_ts_str}")
        print(f"{pad} Ended: {job.end_ts_str}")
        print(f"{pad} Duration: {job.duration_sec_str}")
        print(f"{pad} Infra Sync Point: {job.sync_point}")
        if job.success is True:
            print(f"{pad} Status: {self._gr('Success')}")
        else:
            print(f"{pad} Status: {self._f('Fail')}")
        if job.error is not None:
            # Errors are failures; render with the fail formatter, not the
            # success (_gr) formatter the original used here.
            print(f"{pad} Error: {self._f(job.error)}")

        print("")

        if job.delta is None:
            print(f"{pad}{self._f('The job is missing a delta, never finished discovery.')}")
        else:
            # NOTE: inner loop variables are deliberately NOT named 'vertex'
            # or 'content' to avoid shadowing the JobContent instance driving
            # the outer loop.
            if len(job.delta.added) > 0:
                print(f"{pad} {self._h('Added')}")
                for added in job.delta.added:
                    added_vertex = infra.dag.get_vertex(added.uid)
                    if added_vertex is None:
                        print(f"{pad} * Vertex {added.uid} does not exist.")
                    else:
                        if added_vertex.active is False:
                            print(f"{pad} * Vertex {added.uid} is inactive.")
                        elif added_vertex.corrupt is True:
                            print(f"{pad} * Vertex {added.uid} is corrupt.")
                        else:
                            discovery_object = DiscoveryObject.get_discovery_object(added_vertex)
                            print(f"{pad} * {discovery_object.description}; "
                                  f"Record UID: {discovery_object.record_uid}")
                print("")

            if len(job.delta.changed) > 0:
                print(f"{pad} {self._h('Changed')}")
                for changed in job.delta.changed:
                    changed_vertex = infra.dag.get_vertex(changed.uid)
                    if changed_vertex is None:
                        print(f"{pad} * Vertex {changed.uid} does not exist.")
                    else:
                        if changed_vertex.active is False:
                            print(f"{pad} * Vertex {changed.uid} is inactive.")
                        elif changed_vertex.corrupt is True:
                            print(f"{pad} * Vertex {changed.uid} is corrupt.")
                        else:
                            discovery_object = DiscoveryObject.get_discovery_object(changed_vertex)
                            print(f"{pad} * {discovery_object.description}; "
                                  f"Record UID: {discovery_object.record_uid}")
                            # changes may be None for objects first seen in a
                            # prior discovery run.
                            if changed.changes is not None:
                                for k, v in changed.changes.items():
                                    print(f"{pad} {k} = {v}")
                print("")

            if len(job.delta.deleted) > 0:
                print(f"{pad} {self._h('Deleted')}")
                for deleted in job.delta.deleted:
                    print(f"{pad} * Removed vertex {deleted.uid}.")
                print("")

def _do_render_infra(self, params: KeeperParams, gateway_context: GatewayContext, filepath: str, graph_format: str,
debug_level: int = 0):

Expand Down Expand Up @@ -352,6 +451,24 @@ def _do_render_service(self, params: KeeperParams, gateway_context: GatewayConte
raise err
print("")

def _do_render_jobs(self, params: KeeperParams, gateway_context: GatewayContext, filepath: str,
                    graph_format: str, debug_level: int = 0):
    """Render the jobs DAG, either as raw DOT text or to a file via graphviz.

    :param params: KeeperParams used to connect to the graph.
    :param gateway_context: Gateway whose configuration record anchors the jobs graph.
    :param filepath: Destination path for the rendered graph (non-raw formats).
    :param graph_format: "raw" prints DOT source to stdout; anything else renders to filepath.
    :param debug_level: Verbosity passed through to the graph library.
    """

    job_graph = Jobs(record=gateway_context.configuration, params=params, logger=logging,
                     debug_level=debug_level)

    print("")
    dot = job_graph.dag.to_dot()
    if graph_format != "raw":
        try:
            dot.render(filepath)
            print(f"Job graph rendered to {self._gr(filepath)}")
        except Exception as err:
            print(self._f(f"Could not generate graph: {err}"))
            raise err
    else:
        # Raw mode: dump the DOT source directly to stdout.
        print(dot)
    print("")

def _do_raw_text_list(self, params: KeeperParams, gateway_context: GatewayContext, graph_id: int = 0,
debug_level: int = 0):

Expand Down
Binary file not shown.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ pydantic>=2.6.4
# pip uninstall discovery-common -y
# python3 setup.py wheel --whlsrc ~/src/discovery-common --libdir $PWD/libs --reqfiles $PWD/requirements.txt
# pip install $(ls libs/discovery_common-*)
./libs/discovery_common-1.0.18-py3-none-any.whl
./libs/discovery_common-1.0.19-py3-none-any.whl
Loading