6
6
from glob import glob
7
7
from pathlib import PurePath
8
8
from urllib .parse import urlencode
9
+ from socketdev import socketdev
10
+ from typing import Tuple , Dict
9
11
10
12
import requests
11
13
23
25
from socketsecurity .core .licenses import Licenses
24
26
25
27
from .cli_client import CliClient
26
- from .config import SocketConfig
28
+ from .socket_config import SocketConfig
27
29
from .utils import socket_globs
30
+ from socketdev .org import Orgs , Organization
28
31
29
32
__all__ = [
30
33
"Core" ,
@@ -139,12 +142,11 @@ def do_request(
139
142
140
143
class Core :
141
144
142
- client : CliClient
143
145
config : SocketConfig
144
-
145
- def __init__ (self , config : SocketConfig , client : CliClient ):
146
+ sdk : socketdev
147
+ def __init__ (self , config : SocketConfig , sdk : socketdev ):
146
148
self .config = config
147
- self .client = client
149
+ self .sdk = sdk
148
150
self .set_org_vars ()
149
151
150
152
@@ -165,73 +167,38 @@ def set_org_vars(self) -> None:
165
167
# Get security policy AFTER org_id is updated
166
168
self .config .security_policy = self .get_security_policy ()
167
169
168
- def get_org_id_slug (self ) -> tuple [str , str ]:
170
+ def get_org_id_slug (self ) -> Tuple [str , str ]:
169
171
"""Gets the Org ID and Org Slug for the API Token"""
170
- path = "organizations"
171
- response = self .client .request (path )
172
- data = response .json ()
173
- organizations = data .get ("organizations" )
174
- new_org_id = None
175
- new_org_slug = None
172
+ response = self .sdk .org .get ()
173
+ organizations : Dict [str , Organization ] = response .get ("organizations" , {})
174
+
176
175
if len (organizations ) == 1 :
177
- for key in organizations :
178
- new_org_id = key
179
- new_org_slug = organizations [key ].get ('slug' )
180
- return new_org_id , new_org_slug
176
+ org_id = next (iter (organizations )) # More Pythonic way to get first key
177
+ return org_id , organizations [org_id ]['slug' ]
178
+ return None , None
181
179
182
180
def get_sbom_data (self , full_scan_id : str ) -> list :
183
181
"""
184
182
Return the list of SBOM artifacts for a full scan
185
183
"""
186
- path = f"orgs/{ self .config .org_slug } /full-scans/{ full_scan_id } "
187
- response = self .client .request (path )
188
- results = []
189
184
190
- if response .status_code != 200 :
185
+ response = self .sdk .fullscans .stream (self .config .org_slug , full_scan_id )
186
+ if (response .get ("success" , False ) == False ):
191
187
log .debug (f"Failed to get SBOM data for full-scan { full_scan_id } " )
192
- log .debug (response .text )
188
+ log .debug (response .get ( "message" , "No message" ) )
193
189
return []
194
- data = response .text
195
- data .strip ('"' )
196
- data .strip ()
197
- for line in data .split ("\n " ):
198
- if line != '"' and line != "" and line is not None :
199
- item = json .loads (line )
200
- results .append (item )
201
190
202
- return results
191
+ response .pop ("success" , None )
192
+ response .pop ("status" , None )
193
+ return response
203
194
204
195
def get_security_policy (self ) -> dict :
205
- """Get the Security policy and determine the effective Org security policy"""
206
- payload = [{"organization" : self .config .org_id }]
207
-
208
- response = self .client .request (
209
- path = "settings" ,
210
- method = "POST" ,
211
- payload = json .dumps (payload )
212
- )
196
+ """Get the Security policy"""
213
197
214
- data = response .json ()
215
- defaults = data .get ("defaults" , {})
216
- default_rules = defaults .get ("issueRules" , {})
217
- entries = data .get ("entries" , [])
198
+ response = self .sdk .settings .get (self .config .org_slug )
218
199
219
- org_rules = {}
220
-
221
- # Get organization-specific rules
222
- for org_set in entries :
223
- settings = org_set .get ("settings" )
224
- if settings :
225
- org_details = settings .get ("organization" , {})
226
- org_rules .update (org_details .get ("issueRules" , {}))
227
-
228
- # Apply default rules where no org-specific rule exists
229
- for default in default_rules :
230
- if default not in org_rules :
231
- action = default_rules [default ]["action" ]
232
- org_rules [default ] = {"action" : action }
233
-
234
- return org_rules
200
+ data = response .get ("securityPolicyRules" , {})
201
+ return data
235
202
236
203
@staticmethod
237
204
def get_manifest_files (package : Package , packages : dict ) -> str :
@@ -254,11 +221,15 @@ def get_manifest_files(package: Package, packages: dict) -> str:
254
221
return manifest_files
255
222
256
223
def create_sbom_output (self , diff : Diff ) -> dict :
257
- base_path = f"orgs/{ self .config .org_slug } /export/cdx"
258
- path = f"{ base_path } /{ diff .id } "
259
- result = self .client .request (path = path )
260
224
try :
261
- sbom = result .json ()
225
+ result = self .sdk .export .cdx_bom (self .config .org_slug , diff .id )
226
+ if (result .get ("success" , False ) == False ):
227
+ log .error (f"Failed to get CycloneDX Output for full-scan { diff .id } " )
228
+ log .error (result .get ("message" , "No message" ))
229
+ return {}
230
+
231
+ result .pop ("success" , None )
232
+ return result
262
233
except Exception as error :
263
234
log .error (f"Unable to get CycloneDX Output for { diff .id } " )
264
235
log .error (error )
@@ -330,37 +301,18 @@ def create_full_scan(self, files: list, params: FullScanParams, workspace: str)
330
301
:param workspace: str - Path of workspace
331
302
:return:
332
303
"""
333
- send_files = []
304
+
334
305
create_full_start = time .time ()
335
306
log .debug ("Creating new full scan" )
336
- for file in files :
337
- if platform .system () == "Windows" :
338
- file = file .replace ("\\ " , "/" )
339
- if "/" in file :
340
- path , name = file .rsplit ("/" , 1 )
341
- else :
342
- path = "."
343
- name = file
344
- full_path = f"{ path } /{ name } "
345
- if full_path .startswith (workspace ):
346
- key = full_path [len (workspace ):]
347
- else :
348
- key = full_path
349
- key = key .lstrip ("/" )
350
- key = key .lstrip ("./" )
351
- payload = (
352
- key ,
353
- (
354
- name ,
355
- open (full_path , 'rb' )
356
- )
357
- )
358
- send_files .append (payload )
359
- query_params = urlencode (params .__dict__ )
360
- full_uri = f"{ self .config .full_scan_path } ?{ query_params } "
361
- response = self .client .request (full_uri , method = "POST" , files = send_files )
362
- results = response .json ()
363
- full_scan = FullScan (** results )
307
+ params .org_slug = self .config .org_slug
308
+ res = self .sdk .fullscans .post (files , params )
309
+
310
+ # If the response is a string, it's an error message
311
+ if isinstance (res , str ):
312
+ log .error (f"Error creating full scan: { res } " )
313
+ return FullScan ()
314
+
315
+ full_scan = FullScan (** res )
364
316
full_scan .sbom_artifacts = self .get_sbom_data (full_scan .id )
365
317
create_full_end = time .time ()
366
318
total_time = create_full_end - create_full_start
@@ -380,35 +332,29 @@ def get_license_details(package: Package) -> Package:
380
332
def get_head_scan_for_repo (self , repo_slug : str ) -> str :
381
333
"""Get the head scan ID for a repository"""
382
334
print (f"\n Getting head scan for repo: { repo_slug } " )
383
- repo_path = f"{ self .config .repository_path } /{ repo_slug } "
384
- print (f"Repository path: { repo_path } " )
385
335
386
- response = self .client . request ( repo_path )
336
+ response = self .sdk . repos . repo ( self . config . org_slug , repo_slug )
387
337
response_data = response .json ()
388
338
print (f"Raw API Response: { response_data } " ) # Debug raw response
389
- print (f"Response type: { type (response_data )} " ) # Debug response type
390
339
391
- if "repository" in response_data :
392
- print (f"Repository data: { response_data ['repository' ]} " ) # Debug repository data
393
- else :
394
- print ("No 'repository' key in response data!" )
340
+ if not response_data or "repository" not in response_data :
341
+ log .error ("Failed to get repository data from API" )
342
+ return ""
395
343
396
344
repository = Repository (** response_data ["repository" ])
397
345
print (f"Created repository object: { repository .__dict__ } " ) # Debug final object
398
346
399
347
return repository .head_full_scan_id
400
348
349
+ # TODO: this is the same as get_sbom_data. AND IT CALLS GET_SBOM_DATA. huh?
401
350
def get_full_scan (self , full_scan_id : str ) -> FullScan :
402
351
"""
403
352
Get the specified full scan and return a FullScan object
404
353
:param full_scan_id: str - ID of the full scan to pull
405
354
:return:
406
355
"""
407
- full_scan_url = f"{ self .config .full_scan_path } /{ full_scan_id } "
408
- response = self .client .request (full_scan_url )
409
- results = response .json ()
356
+ results = self .get_sbom_data (full_scan_id )
410
357
full_scan = FullScan (** results )
411
- full_scan .sbom_artifacts = self .get_sbom_data (full_scan .id )
412
358
return full_scan
413
359
414
360
def create_new_diff (self , path : str , params : FullScanParams , workspace : str , no_change : bool = False ) -> Diff :
0 commit comments