25
25
import os
26
26
import sys
27
27
import datetime
28
+ from typing import Any , Dict , List , Optional
28
29
import importlib_resources
29
30
30
31
from progress .bar import Bar
@@ -490,66 +491,41 @@ def __run_scan_threaded(self, scan_started: bool, file_count: int) -> bool:
490
491
success = False
491
492
return success
492
493
493
- def __finish_scan_threaded (self , file_map : dict = None ) -> bool :
494
- """
495
- Wait for the threaded scans to complete
496
- :param file_map: mapping of obfuscated files back into originals
497
- :return: True if successful, False otherwise
494
+ def __finish_scan_threaded (self , file_map : Optional [Dict [Any , Any ]] = None ) -> bool :
495
+ """Wait for the threaded scan to complete and process the results
496
+
497
+ Args:
498
+ file_map: Mapping of obfuscated files back to originals
499
+
500
+ Returns:
501
+ bool: True if successful, False otherwise
502
+
503
+ Raises:
504
+ ValueError: If output format is invalid
498
505
"""
499
- success = True
500
- responses = None
506
+ success : bool = True
507
+ scan_responses = None
501
508
dep_responses = None
502
509
if self .is_file_or_snippet_scan ():
503
510
if not self .threaded_scan .complete (): # Wait for the scans to complete
504
511
self .print_stderr (f'Warning: Scanning analysis ran into some trouble.' )
505
512
success = False
506
513
self .threaded_scan .complete_bar ()
507
- responses = self .threaded_scan .responses
514
+ scan_responses = self .threaded_scan .responses
508
515
if self .is_dependency_scan ():
509
516
self .print_msg ('Retrieving dependency data...' )
510
517
if not self .threaded_deps .complete ():
511
- self .print_stderr (f'Warning: Dependency analysis ran into some trouble.' )
518
+ self .print_stderr (
519
+ f'Warning: Dependency analysis ran into some trouble.'
520
+ )
512
521
success = False
513
522
dep_responses = self .threaded_deps .responses
514
- # self.print_stderr(f'Dep Data: {dep_responses}')
515
- # TODO change to dictionary
516
- raw_output = "{\n "
517
- # TODO look into merging the two dictionaries. See https://favtutor.com/blogs/merge-dictionaries-python
518
- if responses or dep_responses :
519
- first = True
520
- if responses :
521
- for scan_resp in responses :
522
- if scan_resp is not None :
523
- for key , value in scan_resp .items ():
524
- if file_map : # We have a map for obfuscated files. Check if we can revert it
525
- fm = file_map .get (key )
526
- if fm :
527
- key = fm # Replace the obfuscated filename
528
- if first :
529
- raw_output += " \" %s\" :%s" % (key , json .dumps (value , indent = 2 ))
530
- first = False
531
- else :
532
- raw_output += ",\n \" %s\" :%s" % (key , json .dumps (value , indent = 2 ))
533
- # End for loop
534
- if dep_responses :
535
- dep_files = dep_responses .get ("files" )
536
- if dep_files and len (dep_files ) > 0 :
537
- for dep_file in dep_files :
538
- file = dep_file .pop ("file" , None )
539
- if file is not None :
540
- if first :
541
- raw_output += " \" %s\" :[%s]" % (file , json .dumps (dep_file , indent = 2 ))
542
- first = False
543
- else :
544
- raw_output += ",\n \" %s\" :[%s]" % (file , json .dumps (dep_file , indent = 2 ))
545
- # End for loop
546
- raw_output += "\n }"
547
- try :
548
- raw_results = json .loads (raw_output )
549
- except Exception as e :
550
- raise Exception (f'ERROR: Problem decoding parsed json: { e } ' )
551
523
552
- results = self .post_processor .load_results (raw_results ).post_process ()
524
+ raw_scan_results = self ._merge_scan_results (
525
+ scan_responses , dep_responses , file_map
526
+ )
527
+
528
+ results = self .post_processor .load_results (raw_scan_results ).post_process ()
553
529
554
530
if self .output_format == 'plain' :
555
531
self .__log_result (json .dumps (results , indent = 2 , sort_keys = True ))
@@ -567,6 +543,42 @@ def __finish_scan_threaded(self, file_map: dict = None) -> bool:
567
543
success = False
568
544
return success
569
545
546
+ def _merge_scan_results (
547
+ self ,
548
+ scan_responses : Optional [List ],
549
+ dep_responses : Optional [Dict [str ,Any ]],
550
+ file_map : Optional [Dict [str , Any ]],
551
+ ) -> Dict [str , Any ]:
552
+ """Merge scan and dependency responses into a single dictionary"""
553
+ results : Dict [str , Any ] = {}
554
+
555
+ if scan_responses :
556
+ for response in scan_responses :
557
+ if response is not None :
558
+ if file_map :
559
+ response = self ._deobfuscate_filenames (response , file_map )
560
+ results .update (response )
561
+
562
+ dep_files = dep_responses .get ("files" , None ) if dep_responses else None
563
+ if dep_files :
564
+ for dep_file in dep_files :
565
+ file = dep_file .pop ("file" , None )
566
+ if file :
567
+ results [file ] = dep_file
568
+
569
+ return results
570
+
571
+ def _deobfuscate_filenames (self , response : dict , file_map : dict ) -> dict :
572
+ """Convert obfuscated filenames back to original names"""
573
+ deobfuscated = {}
574
+ for key , value in response .items ():
575
+ deobfuscated_name = file_map .get (key , None )
576
+ if deobfuscated_name :
577
+ deobfuscated [deobfuscated_name ] = value
578
+ else :
579
+ deobfuscated [key ] = value
580
+ return deobfuscated
581
+
570
582
def scan_file_with_options (self , file : str , deps_file : str = None , file_map : dict = None , dep_scope : SCOPE = None ,
571
583
dep_scope_include : str = None , dep_scope_exclude : str = None ) -> bool :
572
584
"""
0 commit comments