22
22
THE SOFTWARE.
23
23
"""
24
24
25
+ import http .client as http_client
25
26
import logging
26
27
import os
27
28
import sys
28
29
import time
30
+ import uuid
29
31
from json .decoder import JSONDecodeError
32
+
30
33
import requests
31
- import uuid
32
- import http .client as http_client
33
34
import urllib3
34
-
35
35
from pypac import PACSession
36
36
from pypac .parser import PACFile
37
37
from urllib3 .exceptions import InsecureRequestWarning
38
38
39
- from .scanossbase import ScanossBase
40
39
from . import __version__
41
-
40
+ from .constants import DEFAULT_TIMEOUT , MIN_TIMEOUT
41
+ from .scanossbase import ScanossBase
42
42
43
43
DEFAULT_URL = 'https://api.osskb.org/scan/direct' # default free service URL
44
44
DEFAULT_URL2 = 'https://api.scanoss.com/scan/direct' # default premium service URL
@@ -52,7 +52,7 @@ class ScanossApi(ScanossBase):
52
52
Currently support posting scan requests to the SCANOSS streaming API
53
53
"""
54
54
55
- def __init__ ( # noqa: PLR0913, PLR0915
55
+ def __init__ ( # noqa: PLR0912, PLR0913, PLR0915
56
56
self ,
57
57
scan_format : str = None ,
58
58
flags : str = None ,
@@ -61,7 +61,7 @@ def __init__( # noqa: PLR0913, PLR0915
61
61
debug : bool = False ,
62
62
trace : bool = False ,
63
63
quiet : bool = False ,
64
- timeout : int = 180 ,
64
+ timeout : int = DEFAULT_TIMEOUT ,
65
65
ver_details : str = None ,
66
66
ignore_cert_errors : bool = False ,
67
67
proxy : str = None ,
@@ -87,30 +87,28 @@ def __init__( # noqa: PLR0913, PLR0915
87
87
HTTPS_PROXY='http://<ip>:<port>'
88
88
"""
89
89
super ().__init__ (debug , trace , quiet )
90
- self .url = url
91
- self .api_key = api_key
92
90
self .sbom = None
93
91
self .scan_format = scan_format if scan_format else 'plain'
94
92
self .flags = flags
95
- self .timeout = timeout if timeout > 5 else 180
93
+ self .timeout = timeout if timeout > MIN_TIMEOUT else DEFAULT_TIMEOUT
96
94
self .retry_limit = retry if retry >= 0 else 5
97
95
self .ignore_cert_errors = ignore_cert_errors
98
96
self .req_headers = req_headers if req_headers else {}
99
97
self .headers = {}
100
-
98
+ # Set the correct URL/API key combination
99
+ self .url = url if url else SCANOSS_SCAN_URL
100
+ self .api_key = api_key if api_key else SCANOSS_API_KEY
101
+ if self .api_key and not url and not os .environ .get ('SCANOSS_SCAN_URL' ):
102
+ self .url = DEFAULT_URL2 # API key specific and no alternative URL, so use the default premium
101
103
if ver_details :
102
104
self .headers ['x-scanoss-client' ] = ver_details
103
105
if self .api_key :
104
106
self .headers ['X-Session' ] = self .api_key
105
107
self .headers ['x-api-key' ] = self .api_key
106
- self .headers ['User-Agent' ] = f'scanoss-py/{ __version__ } '
107
- self .headers ['user-agent' ] = f'scanoss-py/{ __version__ } '
108
- self .load_generic_headers ()
109
-
110
- self .url = url if url else SCANOSS_SCAN_URL
111
- self .api_key = api_key if api_key else SCANOSS_API_KEY
112
- if self .api_key and not url and not os .environ .get ('SCANOSS_SCAN_URL' ):
113
- self .url = DEFAULT_URL2 # API key specific and no alternative URL, so use the default premium
108
+ user_agent = f'scanoss-py/{ __version__ } '
109
+ self .headers ['User-Agent' ] = user_agent
110
+ self .headers ['user-agent' ] = user_agent
111
+ self .load_generic_headers (url )
114
112
115
113
if self .trace :
116
114
logging .basicConfig (stream = sys .stderr , level = logging .DEBUG )
@@ -133,7 +131,7 @@ def __init__( # noqa: PLR0913, PLR0915
133
131
if self .proxies :
134
132
self .session .proxies = self .proxies
135
133
136
- def scan (self , wfp : str , context : str = None , scan_id : int = None ):
134
+ def scan (self , wfp : str , context : str = None , scan_id : int = None ): # noqa: PLR0912, PLR0915
137
135
"""
138
136
Scan the specified WFP and return the JSON object
139
137
:param wfp: WFP to scan
@@ -192,7 +190,7 @@ def scan(self, wfp: str, context: str = None, scan_id: int = None):
192
190
else :
193
191
self .print_stderr (f'Warning: No response received from { self .url } . Retrying...' )
194
192
time .sleep (5 )
195
- elif r .status_code == 503 : # Service limits have most likely been reached
193
+ elif r .status_code == requests . codes . service_unavailable : # Service limits most likely reached
196
194
self .print_stderr (
197
195
f'ERROR: SCANOSS API rejected the scan request ({ request_id } ) due to '
198
196
f'service limits being exceeded'
@@ -202,7 +200,7 @@ def scan(self, wfp: str, context: str = None, scan_id: int = None):
202
200
f'ERROR: { r .status_code } - The SCANOSS API request ({ request_id } ) rejected '
203
201
f'for { self .url } due to service limits being exceeded.'
204
202
)
205
- elif r .status_code >= 400 :
203
+ elif r .status_code >= requests . codes . bad_request :
206
204
if retry > self .retry_limit : # No response retry_limit or more times, fail
207
205
self .save_bad_req_wfp (scan_files , request_id , scan_id )
208
206
raise Exception (
@@ -269,7 +267,7 @@ def set_sbom(self, sbom):
269
267
self .sbom = sbom
270
268
return self
271
269
272
- def load_generic_headers (self ):
270
+ def load_generic_headers (self , url ):
273
271
"""
274
272
Adds custom headers from req_headers to the headers collection.
275
273
@@ -279,7 +277,7 @@ def load_generic_headers(self):
279
277
if self .req_headers : # Load generic headers
280
278
for key , value in self .req_headers .items ():
281
279
if key == 'x-api-key' : # Set premium URL if x-api-key header is set
282
- if not self . url and not os .environ .get ('SCANOSS_SCAN_URL' ):
280
+ if not url and not os .environ .get ('SCANOSS_SCAN_URL' ):
283
281
self .url = DEFAULT_URL2 # API key specific and no alternative URL, so use the default premium
284
282
self .api_key = value
285
283
self .headers [key ] = value
0 commit comments