Skip to content

Commit b987569

Browse files
add unit test for proxy cert validation
1 parent e33b77c commit b987569

File tree

2 files changed

+215
-0
lines changed

2 files changed

+215
-0
lines changed

tests/01-unit/api/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Unit tests for API client functionality
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
import os
2+
import tempfile
3+
from unittest.mock import MagicMock, mock_open, patch
4+
5+
import pytest
6+
7+
from pycti import OpenCTIApiClient
8+
9+
10+
class TestOpenCTIApiClient:
11+
"""Test OpenCTIApiClient certificate handling functionality."""
12+
13+
SAMPLE_CERTIFICATE = """-----BEGIN CERTIFICATE-----
14+
MIIDXTCCAkWgAwIBAgIJAKLdQVPy90WjMA0GCSqGSIb3DQEBCwUAMEUxCzAJBgNV
15+
BAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEwHwYDVQQKDBhJbnRlcm5ldCBX
16+
aWRnaXRzIFB0eSBMdGQwHhcNMjQwMTAxMDAwMDAwWhcNMjUwMTAxMDAwMDAwWjBF
17+
MQswCQYDVQQGEwJBVTETMBEGA1UECAwKU29tZS1TdGF0ZTEhMB8GA1UECgwYSW50
18+
ZXJuZXQgV2lkZ2l0cyBQdHkgTHRkMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIB
19+
CgKCAQEA0Z3VS5JJcds3xfn/ygWyF0qJDr9oYRH/9dMfqHCOq45DqMVJLJBJnMzN
20+
-----END CERTIFICATE-----"""
21+
22+
INVALID_CONTENT = "This is not a certificate"
23+
24+
@pytest.fixture
25+
def api_client(self):
26+
"""Create an API client instance without performing health check."""
27+
with patch.object(OpenCTIApiClient, "_setup_proxy_certificates"):
28+
client = OpenCTIApiClient(
29+
url="http://localhost:4000",
30+
token="test-token",
31+
ssl_verify=False,
32+
perform_health_check=False,
33+
)
34+
# Mock the logger
35+
client.app_logger = MagicMock()
36+
return client
37+
38+
def test_get_certificate_content_inline_pem(self, api_client):
39+
"""Test _get_certificate_content with inline PEM certificate."""
40+
result = api_client._get_certificate_content(self.SAMPLE_CERTIFICATE)
41+
42+
assert result == self.SAMPLE_CERTIFICATE
43+
api_client.app_logger.debug.assert_called_with(
44+
"HTTPS_CA_CERTIFICATES contains inline certificate content"
45+
)
46+
47+
def test_get_certificate_content_file_path(self, api_client):
48+
"""Test _get_certificate_content with a file path containing certificate."""
49+
# Create a temporary file with certificate content
50+
with tempfile.NamedTemporaryFile(
51+
mode="w", suffix=".crt", delete=False
52+
) as cert_file:
53+
cert_file.write(self.SAMPLE_CERTIFICATE)
54+
cert_file_path = cert_file.name
55+
56+
try:
57+
result = api_client._get_certificate_content(cert_file_path)
58+
59+
assert result == self.SAMPLE_CERTIFICATE
60+
api_client.app_logger.debug.assert_called_with(
61+
"HTTPS_CA_CERTIFICATES contains valid certificate file path",
62+
{"file_path": cert_file_path},
63+
)
64+
finally:
65+
# Clean up
66+
os.unlink(cert_file_path)
67+
68+
def test_get_certificate_content_invalid_file_content(self, api_client):
69+
"""Test _get_certificate_content with a file containing invalid certificate."""
70+
# Create a temporary file with invalid content
71+
with tempfile.NamedTemporaryFile(
72+
mode="w", suffix=".txt", delete=False
73+
) as invalid_file:
74+
invalid_file.write(self.INVALID_CONTENT)
75+
invalid_file_path = invalid_file.name
76+
77+
try:
78+
result = api_client._get_certificate_content(invalid_file_path)
79+
80+
assert result is None
81+
api_client.app_logger.warning.assert_called_with(
82+
"File at HTTPS_CA_CERTIFICATES path does not contain valid certificate",
83+
{"file_path": invalid_file_path},
84+
)
85+
finally:
86+
# Clean up
87+
os.unlink(invalid_file_path)
88+
89+
def test_get_certificate_content_nonexistent_file(self, api_client):
90+
"""Test _get_certificate_content with a nonexistent file path."""
91+
nonexistent_path = "/tmp/nonexistent_certificate.crt"
92+
93+
result = api_client._get_certificate_content(nonexistent_path)
94+
95+
assert result is None
96+
97+
def test_get_certificate_content_invalid_content(self, api_client):
98+
"""Test _get_certificate_content with invalid content (not PEM, not file)."""
99+
result = api_client._get_certificate_content(self.INVALID_CONTENT)
100+
101+
assert result is None
102+
103+
def test_get_certificate_content_whitespace_handling(self, api_client):
104+
"""Test _get_certificate_content handles whitespace correctly."""
105+
# Test with certificate content with leading/trailing whitespace
106+
cert_with_whitespace = f" \n{self.SAMPLE_CERTIFICATE} \n"
107+
result = api_client._get_certificate_content(cert_with_whitespace)
108+
109+
assert result == cert_with_whitespace # Should return as-is
110+
api_client.app_logger.debug.assert_called_with(
111+
"HTTPS_CA_CERTIFICATES contains inline certificate content"
112+
)
113+
114+
def test_get_certificate_content_file_read_permission_error(self, api_client):
115+
"""Test _get_certificate_content when file exists but can't be read."""
116+
# Create a temporary file
117+
with tempfile.NamedTemporaryFile(
118+
mode="w", suffix=".crt", delete=False
119+
) as cert_file:
120+
cert_file.write(self.SAMPLE_CERTIFICATE)
121+
cert_file_path = cert_file.name
122+
123+
try:
124+
# Make file unreadable (on Unix systems)
125+
os.chmod(cert_file_path, 0o000)
126+
127+
result = api_client._get_certificate_content(cert_file_path)
128+
129+
assert result is None
130+
# Check that warning was logged about failed read
131+
assert any(
132+
call[0][0] == "Failed to read certificate file"
133+
for call in api_client.app_logger.warning.call_args_list
134+
)
135+
finally:
136+
# Restore permissions and clean up
137+
os.chmod(cert_file_path, 0o644)
138+
os.unlink(cert_file_path)
139+
140+
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": ""})
141+
def test_setup_proxy_certificates_no_env(self, api_client):
142+
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES is not set."""
143+
api_client._setup_proxy_certificates()
144+
145+
# Should return early without setting ssl_verify
146+
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False
147+
148+
@patch.dict(os.environ, {})
149+
def test_setup_proxy_certificates_env_not_present(self, api_client):
150+
"""Test _setup_proxy_certificates when HTTPS_CA_CERTIFICATES env var doesn't exist."""
151+
api_client._setup_proxy_certificates()
152+
153+
# Should return early without setting ssl_verify
154+
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False
155+
156+
@patch("tempfile.mkdtemp")
157+
@patch("os.path.isfile")
158+
@patch("builtins.open", new_callable=mock_open)
159+
@patch.dict(
160+
os.environ,
161+
{
162+
"HTTPS_CA_CERTIFICATES": "-----BEGIN CERTIFICATE-----\ntest\n-----END CERTIFICATE-----"
163+
},
164+
)
165+
def test_setup_proxy_certificates_with_inline_cert(
166+
self, mock_file, mock_isfile, mock_mkdtemp, api_client
167+
):
168+
"""Test _setup_proxy_certificates with inline certificate content."""
169+
# Setup mocks
170+
mock_mkdtemp.return_value = "/tmp/test_certs"
171+
mock_isfile.side_effect = (
172+
lambda path: path == "/etc/ssl/certs/ca-certificates.crt"
173+
)
174+
175+
# Mock system certificate content
176+
system_cert_content = (
177+
"-----BEGIN CERTIFICATE-----\nsystem\n-----END CERTIFICATE-----"
178+
)
179+
180+
def open_side_effect(path, mode="r"):
181+
if path == "/etc/ssl/certs/ca-certificates.crt" and mode == "r":
182+
return mock_open(read_data=system_cert_content)()
183+
return mock_file()
184+
185+
with patch("builtins.open", side_effect=open_side_effect):
186+
api_client._setup_proxy_certificates()
187+
188+
# Verify proxy certificates were processed
189+
api_client.app_logger.info.assert_called()
190+
191+
@patch("tempfile.mkdtemp")
192+
@patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": "/path/to/cert.crt"})
193+
def test_setup_proxy_certificates_with_invalid_path(self, mock_mkdtemp, api_client):
194+
"""Test _setup_proxy_certificates with invalid certificate file path."""
195+
mock_mkdtemp.return_value = "/tmp/test_certs"
196+
197+
# Mock _get_certificate_content to return None (invalid)
198+
with patch.object(api_client, "_get_certificate_content", return_value=None):
199+
api_client._setup_proxy_certificates()
200+
201+
# Should log warning and return early
202+
api_client.app_logger.warning.assert_called()
203+
assert not hasattr(api_client, "ssl_verify") or api_client.ssl_verify is False
204+
205+
def test_setup_proxy_certificates_exception_handling(self, api_client):
206+
"""Test _setup_proxy_certificates handles exceptions gracefully."""
207+
with patch.dict(os.environ, {"HTTPS_CA_CERTIFICATES": self.SAMPLE_CERTIFICATE}):
208+
with patch("tempfile.mkdtemp", side_effect=Exception("Mock error")):
209+
api_client._setup_proxy_certificates()
210+
211+
# Should log warning and continue
212+
api_client.app_logger.warning.assert_called_with(
213+
"Failed to setup proxy certificates", {"error": "Mock error"}
214+
)

0 commit comments

Comments
 (0)