Skip to content

Commit cfb301b

Browse files
authored
Merge pull request #60 from SocketDev/doug/add-integration-settings-endpoints
feat: Add comprehensive SDK enhancements and new endpoint modules
2 parents e518f11 + ca719d4 commit cfb301b

File tree

13 files changed

+637
-13
lines changed

13 files changed

+637
-13
lines changed

socketdev/__init__.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import os
12
from socketdev.core.api import API
23
from socketdev.dependencies import Dependencies
34
from socketdev.diffscans import DiffScans
@@ -26,7 +27,14 @@
2627
from socketdev.alerttypes import AlertTypes
2728
from socketdev.basics import Basics
2829
from socketdev.uploadmanifests import UploadManifests
30+
from socketdev.alertfullscansearch import AlertFullScanSearch
31+
from socketdev.alerts import Alerts
32+
from socketdev.fixes import Fixes
33+
from socketdev.supportedfiles import SupportedFiles
34+
from socketdev.webhooks import Webhooks
35+
from socketdev.telemetry import Telemetry
2936
from socketdev.log import log
37+
from typing import Optional
3038

3139
__author__ = "socket.dev"
3240
__version__ = __version__
@@ -44,7 +52,22 @@
4452

4553

4654
class socketdev:
47-
def __init__(self, token: str, timeout: int = 1200, allow_unverified: bool = False):
55+
def __init__(self, token: Optional[str] = None, timeout: int = 1200, allow_unverified: bool = False):
56+
# Try to get token from environment variables if not provided
57+
if token is None:
58+
token = (
59+
os.getenv("SOCKET_SECURITY_API_TOKEN") or
60+
os.getenv("SOCKET_SECURITY_API_KEY") or
61+
os.getenv("SOCKET_API_KEY") or
62+
os.getenv("SOCKET_API_TOKEN")
63+
)
64+
65+
if token is None:
66+
raise ValueError(
67+
"API token is required. Provide it as a parameter or set one of these environment variables: "
68+
"SOCKET_SECURITY_API_TOKEN, SOCKET_SECURITY_API_KEY, SOCKET_API_KEY, SOCKET_API_TOKEN"
69+
)
70+
4871
self.api = API()
4972
self.token = token + ":"
5073
self.api.encode_key(self.token)
@@ -77,6 +100,12 @@ def __init__(self, token: str, timeout: int = 1200, allow_unverified: bool = Fal
77100
self.alerttypes = AlertTypes(self.api)
78101
self.basics = Basics(self.api)
79102
self.uploadmanifests = UploadManifests(self.api)
103+
self.alertfullscansearch = AlertFullScanSearch(self.api)
104+
self.alerts = Alerts(self.api)
105+
self.fixes = Fixes(self.api)
106+
self.supportedfiles = SupportedFiles(self.api)
107+
self.webhooks = Webhooks(self.api)
108+
self.telemetry = Telemetry(self.api)
80109

81110
@staticmethod
82111
def set_timeout(timeout: int):
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import logging
2+
from urllib.parse import urlencode
3+
4+
log = logging.getLogger("socketdev")
5+
6+
7+
class AlertFullScanSearch:
8+
def __init__(self, api):
9+
self.api = api
10+
11+
def search(self, org_slug: str, **query_params) -> dict:
12+
"""
13+
Search alerts across full scans.
14+
15+
Args:
16+
org_slug: Organization slug
17+
**query_params: Optional query parameters for filtering
18+
19+
Returns:
20+
dict containing search results
21+
"""
22+
path = f"orgs/{org_slug}/alert-full-scan-search"
23+
if query_params:
24+
path += "?" + urlencode(query_params)
25+
26+
response = self.api.do_request(path=path)
27+
28+
if response.status_code == 200:
29+
return response.json()
30+
31+
log.error(f"Error searching alerts: {response.status_code}")
32+
log.error(response.text)
33+
return {}

socketdev/alerts/__init__.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import logging
2+
from urllib.parse import urlencode
3+
4+
log = logging.getLogger("socketdev")
5+
6+
7+
class Alerts:
8+
def __init__(self, api):
9+
self.api = api
10+
11+
def get(self, org_slug: str, **query_params) -> dict:
12+
"""
13+
Get alerts for an organization.
14+
15+
Args:
16+
org_slug: Organization slug
17+
**query_params: Optional query parameters for filtering
18+
19+
Returns:
20+
dict containing alerts data
21+
"""
22+
path = f"orgs/{org_slug}/alerts"
23+
if query_params:
24+
path += "?" + urlencode(query_params)
25+
26+
response = self.api.do_request(path=path)
27+
28+
if response.status_code == 200:
29+
return response.json()
30+
31+
log.error(f"Error getting alerts: {response.status_code}")
32+
log.error(response.text)
33+
return {}

socketdev/export/__init__.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,28 @@ def spdx_bom(
7373
log.error(f"Error exporting SPDX BOM: {response.status_code}")
7474
log.error(response.text)
7575
return {}
76+
77+
def openvex_bom(
78+
self, org_slug: str, id: str, query_params: Optional[ExportQueryParams] = None, use_types: bool = False
79+
) -> dict:
80+
"""
81+
Export a Socket SBOM as an OpenVEX SBOM
82+
:param org_slug: String - The slug of the organization
83+
:param id: String - The id of either a full scan or an sbom report
84+
:param query_params: Optional[ExportQueryParams] - Query parameters for filtering
85+
:param use_types: Optional[bool] - Whether to return typed responses
86+
:return: dict
87+
"""
88+
path = f"orgs/{org_slug}/export/openvex/{id}"
89+
if query_params:
90+
path += query_params.to_query_params()
91+
response = self.api.do_request(path=path)
92+
93+
if response.status_code == 200:
94+
return response.json()
95+
# TODO: Add typed response when types are defined
96+
97+
log.error(f"Error exporting OpenVEX BOM: {response.status_code}")
98+
log.error(response.text)
99+
return {}
100+

socketdev/fixes/__init__.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import logging
2+
from urllib.parse import urlencode
3+
4+
log = logging.getLogger("socketdev")
5+
6+
7+
class Fixes:
8+
def __init__(self, api):
9+
self.api = api
10+
11+
def get(self, org_slug: str, **query_params) -> dict:
12+
"""
13+
Get available fixes for an organization.
14+
15+
Args:
16+
org_slug: Organization slug
17+
**query_params: Optional query parameters for filtering
18+
19+
Returns:
20+
dict containing available fixes
21+
"""
22+
path = f"orgs/{org_slug}/fixes"
23+
if query_params:
24+
path += "?" + urlencode(query_params)
25+
26+
response = self.api.do_request(path=path)
27+
28+
if response.status_code == 200:
29+
return response.json()
30+
31+
log.error(f"Error getting fixes: {response.status_code}")
32+
log.error(response.text)
33+
return {}

0 commit comments

Comments
 (0)