 from cycode.cli import consts
 from cycode.cli.models import Document
 from cycode.cli.utils.progress_bar import ScanProgressBarSection
+from cycode.cyclient import logger

 if TYPE_CHECKING:
     from cycode.cli.models import CliError, LocalScanResult
     from cycode.cli.utils.progress_bar import BaseProgressBar


+def _get_max_batch_size(scan_type: str) -> int:
+    logger.debug(
+        'You can customize the batch size by setting the environment variable "%s"',
+        consts.SCAN_BATCH_MAX_SIZE_IN_BYTES_ENV_VAR_NAME,
+    )
+
+    custom_size = os.environ.get(consts.SCAN_BATCH_MAX_SIZE_IN_BYTES_ENV_VAR_NAME)
+    if custom_size:
+        logger.debug('Custom batch size is set, %s', {'custom_size': custom_size})
+        return int(custom_size)
+
+    return consts.SCAN_BATCH_MAX_SIZE_IN_BYTES.get(scan_type, consts.DEFAULT_SCAN_BATCH_MAX_SIZE_IN_BYTES)
+
+
+def _get_max_batch_files_count(_: str) -> int:
+    logger.debug(
+        'You can customize the batch files count by setting the environment variable "%s"',
+        consts.SCAN_BATCH_MAX_FILES_COUNT_ENV_VAR_NAME,
+    )
+
+    custom_files_count = os.environ.get(consts.SCAN_BATCH_MAX_FILES_COUNT_ENV_VAR_NAME)
+    if custom_files_count:
+        logger.debug('Custom batch files count is set, %s', {'custom_files_count': custom_files_count})
+        return int(custom_files_count)
+
+    return consts.DEFAULT_SCAN_BATCH_MAX_FILES_COUNT
+
+
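
Taken together, the two helpers give the environment variable absolute precedence over the per-scan-type table. A minimal sketch of how that is exercised, assuming only the consts names visible in the hunk above; the scan type 'secret' and the 1 MiB figure are illustrative, and _get_max_batch_size is module-private:

    import os

    from cycode.cli import consts

    # with the variable set, the override wins for every scan type
    os.environ[consts.SCAN_BATCH_MAX_SIZE_IN_BYTES_ENV_VAR_NAME] = str(1024 * 1024)
    assert _get_max_batch_size('secret') == 1024 * 1024

    # without it, the per-scan-type table (or the global default) applies
    del os.environ[consts.SCAN_BATCH_MAX_SIZE_IN_BYTES_ENV_VAR_NAME]
    assert _get_max_batch_size('secret') == consts.SCAN_BATCH_MAX_SIZE_IN_BYTES.get(
        'secret', consts.DEFAULT_SCAN_BATCH_MAX_SIZE_IN_BYTES
    )
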
 def split_documents_into_batches(
+    scan_type: str,
     documents: List[Document],
-    max_size: int = consts.DEFAULT_SCAN_BATCH_MAX_SIZE_IN_BYTES,
-    max_files_count: int = consts.DEFAULT_SCAN_BATCH_MAX_FILES_COUNT,
 ) -> List[List[Document]]:
+    max_size = _get_max_batch_size(scan_type)
+    max_files_count = _get_max_batch_files_count(scan_type)
+
+    logger.debug(
+        'Splitting documents into batches, %s',
+        {'document_count': len(documents), 'max_batch_size': max_size, 'max_files_count': max_files_count},
+    )
+
     batches = []

     current_size = 0
     current_batch = []
     for document in documents:
         document_size = len(document.content.encode('UTF-8'))

-        if (current_size + document_size > max_size) or (len(current_batch) >= max_files_count):
+        exceeds_max_size = current_size + document_size > max_size
+        if exceeds_max_size:
+            logger.debug(
+                'Going to create new batch because current batch size exceeds the limit, %s',
+                {
+                    'batch_index': len(batches),
+                    'current_batch_size': current_size + document_size,
+                    'max_batch_size': max_size,
+                },
+            )
+
+        exceeds_max_files_count = len(current_batch) >= max_files_count
+        if exceeds_max_files_count:
+            logger.debug(
+                'Going to create new batch because current batch files count exceeds the limit, %s',
+                {
+                    'batch_index': len(batches),
+                    'current_batch_files_count': len(current_batch),
+                    'max_batch_files_count': max_files_count,
+                },
+            )
+
+        if exceeds_max_size or exceeds_max_files_count:
             batches.append(current_batch)

             current_batch = [document]
@@ -35,6 +93,8 @@ def split_documents_into_batches(
     if current_batch:
         batches.append(current_batch)

+    logger.debug('Documents were split into batches %s', {'batches_count': len(batches)})
+
     return batches

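Reduced to a standalone sketch with hypothetical names (not the cycode API), the split is a single greedy pass: a new batch opens as soon as the next document would push the byte total over max_size, or the current batch already holds max_files_count documents:

    from typing import List

    def greedy_split(sizes: List[int], max_size: int, max_files: int) -> List[List[int]]:
        batches: List[List[int]] = []
        current: List[int] = []
        current_size = 0
        for size in sizes:
            # same decision as the diff: check both limits before placing the item
            if current_size + size > max_size or len(current) >= max_files:
                batches.append(current)
                current, current_size = [size], size
            else:
                current.append(size)
                current_size += size
        if current:
            batches.append(current)
        return batches

    # 3 + 4 exceeds max_size=5, so 4 opens a new batch; both limits then evict [4, 1]
    assert greedy_split([3, 4, 1, 1, 1], max_size=5, max_files=2) == [[3], [4, 1], [1, 1]]
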
@@ -49,9 +109,8 @@ def run_parallel_batched_scan(
     documents: List[Document],
     progress_bar: 'BaseProgressBar',
 ) -> Tuple[Dict[str, 'CliError'], List['LocalScanResult']]:
-    max_size = consts.SCAN_BATCH_MAX_SIZE_IN_BYTES.get(scan_type, consts.DEFAULT_SCAN_BATCH_MAX_SIZE_IN_BYTES)
-
-    batches = [documents] if scan_type == consts.SCA_SCAN_TYPE else split_documents_into_batches(documents, max_size)
+    # batching is disabled for SCA; requested by Mor
+    batches = [documents] if scan_type == consts.SCA_SCAN_TYPE else split_documents_into_batches(scan_type, documents)

     progress_bar.set_section_length(ScanProgressBarSection.SCAN, len(batches))  # * 3
     # TODO(MarshalX): we should multiply the count of batches in SCAN section because each batch has 3 steps:
@@ -61,9 +120,13 @@ def run_parallel_batched_scan(
     # it's not possible yet because not all scan types moved to polling mechanism
     # the progress bar could be significantly improved (be more dynamic) in the future

+    threads_count = _get_threads_count()
     local_scan_results: List['LocalScanResult'] = []
     cli_errors: Dict[str, 'CliError'] = {}
-    with ThreadPool(processes=_get_threads_count()) as pool:
+
+    logger.debug('Running parallel batched scan, %s', {'threads_count': threads_count, 'batches_count': len(batches)})
+
+    with ThreadPool(processes=threads_count) as pool:
         for scan_id, err, result in pool.imap(scan_function, batches):
             if result:
                 local_scan_results.append(result)
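
The fan-out above, reduced to a self-contained sketch; fake_scan and its (scan_id, err, result) triple are stand-ins for the real scan function wired in by the caller:

    from multiprocessing.pool import ThreadPool
    from typing import List, Optional, Tuple

    def fake_scan(batch: List[str]) -> Tuple[str, Optional[Exception], Optional[int]]:
        # pretend every batch scans cleanly; 'result' is just the batch size here
        return f'scan-{len(batch)}', None, len(batch)

    batches = [['a.py', 'b.py'], ['c.py']]
    results = []
    with ThreadPool(processes=2) as pool:
        # imap keeps yield order aligned with submission order while batches run concurrently
        for scan_id, err, result in pool.imap(fake_scan, batches):
            if result:
                results.append(result)

    assert results == [2, 1]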