From fcd59c818c74067b64b2a2c44fa56460df9e6151 Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Sun, 5 Mar 2023 16:10:39 -0600 Subject: [PATCH] Add support for injecting truststore.SSLContext into the ssl module --- README.md | 67 +++++++------ docs/source/index.md | 142 +++++++++++++++++---------- noxfile.py | 11 ++- src/truststore/__init__.py | 12 ++- src/truststore/_api.py | 77 ++++++++++++--- src/truststore/_macos.py | 16 ++-- src/truststore/_ssl_constants.py | 12 +++ src/truststore/_windows.py | 11 ++- tests/conftest.py | 160 +++++++++++++++++++++++++++++++ tests/test_custom_ca.py | 149 +--------------------------- tests/test_inject.py | 108 +++++++++++++++++++++ tests/test_sslcontext.py | 1 - 12 files changed, 513 insertions(+), 253 deletions(-) create mode 100644 src/truststore/_ssl_constants.py create mode 100644 tests/conftest.py create mode 100644 tests/test_inject.py diff --git a/README.md b/README.md index 390d150..fada6f1 100644 --- a/README.md +++ b/README.md @@ -3,49 +3,60 @@ [![PyPI](https://img.shields.io/pypi/v/truststore)](https://pypi.org/project/truststore) [![CI](https://github.com/sethmlarson/truststore/actions/workflows/ci.yml/badge.svg?branch=main)](https://github.com/sethmlarson/truststore/actions/workflows/ci.yml) -Verify certificates using OS trust stores. Supports macOS, Windows, and Linux (with OpenSSL). **This project should be considered experimental.** +Verify certificates using OS trust stores. This is useful when your system contains +custom certificate authorities such as when using a corporate proxy or using test certificates. +Supports macOS, Windows, and Linux (with OpenSSL). -## Usage +## Installation -```python -# The following code works on Linux, macOS, and Windows without dependencies. -import socket -import ssl -import truststore +Truststore is installed from [PyPI](https://pypi.org/project/truststore) with pip: -# Create an SSLContext for the system trust store -ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +```{code-block} shell +$ python -m pip install truststore +``` -# Connect to the peer and initiate a TLS handshake -sock = socket.create_connection(("example.com", 443)) -sock = ctx.wrap_socket(sock, server_hostname="example.com") +Truststore **requires Python 3.10 or later** and supports the following platforms: +- macOS 10.8+ via [Security framework](https://developer.apple.com/documentation/security) +- Windows via [CryptoAPI](https://docs.microsoft.com/en-us/windows/win32/seccrypto/cryptography-functions#certificate-verification-functions) +- Linux via OpenSSL -# Also works with libraries that accept an SSLContext object -import urllib3 +## User Guide -http = urllib3.PoolManager(ssl_context=ctx) -http.request("GET", "https://example.com") +You can inject `truststore` into the standard library `ssl` module so the functionality is used +by every library by default. To do so use the `truststore.inject_into_ssl()` function: -# Works with ssl.MemoryBIO objects for async I/O -import aiohttp +```python +import truststore +truststore.inject_into_ssl() +# Automatically works with urllib3, requests, aiohttp, and more: +import urllib3 +http = urllib3.PoolManager() +resp = http.request("GET", "https://example.com") + +import aiohttp http = aiohttp.ClientSession() -await http.request("GET", "https://example.com", ssl=ctx) +resp = await http.request("GET", "https://example.com") + +import requests +resp = requests.get("https://example.com") ``` -## Platforms +If you'd like finer-grained control you can create your own `truststore.SSLContext` instance +and use it anywhere you'd use an `ssl.SSLContext`: -Works in the following configurations: +```python +import ssl +import truststore -- macOS 10.8+ via [Security framework](https://developer.apple.com/documentation/security) -- Windows via [CryptoAPI](https://docs.microsoft.com/en-us/windows/win32/seccrypto/cryptography-functions#certificate-verification-functions) -- Linux via OpenSSL +ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) -## Prior art +import urllib3 +http = urllib3.PoolManager(ssl_context=ctx) +resp = http.request("GET", "https://example.com") +``` -- [The future of trust stores in Python (PyCon US 2022 lightning talk)](https://youtu.be/1IiL31tUEVk?t=698) ([slides](https://speakerdeck.com/sethmlarson/the-future-of-trust-stores-in-python)) -- [Experimental APIs in Python 3.10 and the future of trust stores](https://sethmlarson.dev/blog/2021-11-27/experimental-python-3.10-apis-and-trust-stores) -- [PEP 543: A Unified TLS API for Python](https://www.python.org/dev/peps/pep-0543) +You can read more in the [user guide in the documentation](https://truststore.readthedocs.io/en/latest/#user-guide). ## License diff --git a/docs/source/index.md b/docs/source/index.md index acc9706..0793121 100644 --- a/docs/source/index.md +++ b/docs/source/index.md @@ -5,55 +5,101 @@ :caption: Contents ``` -Verify certificates using OS trust stores. Supports macOS, Windows, and Linux (with OpenSSL). +Verify certificates using OS trust stores. This is useful when your system contains +custom certificate authorities such as when using a corporate proxy or using test certificates. +Supports macOS, Windows, and Linux (with OpenSSL). ```{warning} This project should be considered experimental so shouldn't be used in production. ``` -## Platforms +## Installation -- Requires Python 3.10 or later -- Supports macOS 10.8+ via [Security framework](https://developer.apple.com/documentation/security) -- Supports Windows via [CryptoAPI](https://docs.microsoft.com/en-us/windows/win32/seccrypto/cryptography-functions#certificate-verification-functions) -- Supports Linux via OpenSSL +Truststore can be installed from [PyPI](https://pypi.org/project/truststore) with pip: -## Usage +```{code-block} shell +$ python -m pip install truststore +``` -The `truststore` module has a single API: `truststore.SSLContext` +Truststore **requires Python 3.10 or later** and supports the following platforms: +- macOS 10.8+ via [Security framework](https://developer.apple.com/documentation/security) +- Windows via [CryptoAPI](https://docs.microsoft.com/en-us/windows/win32/seccrypto/cryptography-functions#certificate-verification-functions) +- Linux via OpenSSL -```{code-block} python - import truststore +## User Guide + +You can inject `truststore` into the standard library `ssl` module so the functionality is used +by every library by default. To do so use the `truststore.inject_into_ssl()` function. + +The call to `truststore.inject_into_ssl()` should be called as early as possible in +your program as modules that have already imported `ssl.SSLContext` won't be affected. + +```python +import truststore +truststore.inject_into_ssl() + +# Automatically works with urllib3, requests, aiohttp, and more: +import urllib3 +http = urllib3.PoolManager() +resp = http.request("GET", "https://example.com") + +import aiohttp +http = aiohttp.ClientSession() +resp = await http.request("GET", "https://example.com") + +import requests +resp = requests.get("https://example.com") +``` + +If you'd like finer-grained control you can create your own `truststore.SSLContext` instance +and use it anywhere you'd use an `ssl.SSLContext`: + +```python +import ssl +import truststore + +ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - ctx = truststore.SSLContext() +import urllib3 +http = urllib3.PoolManager(ssl_context=ctx) +resp = http.request("GET", "https://example.com") ``` ### Using truststore with pip -Pip v22.2 includes experimental support for verifying certificates with system trust stores using `truststore`. To enable the feature, use the flag `--use-feature=truststore` when installing a package like so: +[Pip v22.2](https://discuss.python.org/t/announcement-pip-22-2-release/17543) includes experimental support for verifying certificates with system trust stores using `truststore`. To enable the feature, use the flag `--use-feature=truststore` when installing a package like so: ```{code-block} bash - # Install Django using system trust stores - $ python -m pip install --use-feature=truststore Django +# Install Django using system trust stores +$ python -m pip install --use-feature=truststore Django ``` This requires `truststore` to be installed in the same environment as the one running pip and to be running Python 3.10 or later. For more information you can [read the pip documentation about the feature](https://pip.pypa.io/en/stable/user_guide/#using-system-trust-stores-for-verifying-https). ### Using truststore with urllib3 -This `SSLContext` works the same as an {py:class}`ssl.SSLContext`. -You can use it anywhere you would use an {py:class}`ssl.SSLContext` and -system trust stores are automatically used to verify peer certificates: +```{code-block} python +import urllib3 +import truststore + +truststore.inject_into_ssl() + +http = urllib3.PoolManager() +resp = http.request("GET", "https://example.com") +``` + +If you'd like to use the `truststore.SSLContext` directly you can pass +the instance via the `ssl_context` parameter: ```{code-block} python - import ssl - import urllib3 - import truststore +import ssl +import urllib3 +import truststore - ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) - http = urllib3.PoolManager(ssl_context=ctx) - resp = http.request("GET", "https://example.com") +http = urllib3.PoolManager(ssl_context=ctx) +resp = http.request("GET", "https://example.com") ``` ### Using truststore with aiohttp @@ -61,47 +107,45 @@ system trust stores are automatically used to verify peer certificates: Truststore supports wrapping either {py:class}`socket.socket` or {py:class}`ssl.MemoryBIO` which means both synchronous and asynchronous I/O can be used: ```{code-block} python - import ssl - import aiohttp - import truststore +import aiohttp +import truststore - ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +truststore.inject_into_ssl() - http = aiohttp.ClientSession(ssl=ctx) - resp = await http.request("GET", "https://example.com") +http = aiohttp.ClientSession() +resp = await http.request("GET", "https://example.com") ``` -### Using truststore with Requests - -Requests doesn't support passing an {py:class}`ssl.SSLContext` object to a `requests.Session` object directly so there's an additional class you need to inject the `truststore.SSLContext` instance to the lower-level {py:class}`urllib3.PoolManager` instance: +If you'd like to use the `truststore.SSLContext` directly you can pass +the instance via the `ssl` parameter: ```{code-block} python - import ssl - import requests - import requests.adapters - import truststore +import ssl +import aiohttp +import truststore + +ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + +http = aiohttp.ClientSession(ssl=ctx) +resp = await http.request("GET", "https://example.com") +``` - class SSLContextAdapter(requests.adapters.HTTPAdapter): - def __init__(self, *, ssl_context=None, **kwargs): - self._ssl_context = ssl_context - super().__init__(**kwargs) +### Using truststore with Requests - def init_poolmanager(self, *args, **kwargs): - if self._ssl_context is not None: - kwargs.setdefault("ssl_context", self._ssl_context) - return super().init_poolmanager(*args, **kwargs) +Just like with `urllib3` using `truststore.inject_into_ssl()` is the easiest method for using Truststore with Requests: - ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) +```{code-block} python +import requests +import truststore - http = requests.Session() - adapter = SSLContextAdapter(ssl_context=ctx) - http.mount("https://", adapter) +truststore.inject_into_ssl() - resp = http.request("GET", "https://example.com") +resp = requests.request("GET", "https://example.com") ``` ## Prior art +* [pip v22.2 with support for `--use-feature=truststore`](https://discuss.python.org/t/announcement-pip-22-2-release/17543) * [The future of trust stores in Python (PyCon US 2022 lightning talk)](https://youtu.be/1IiL31tUEVk?t=698) ([slides](https://speakerdeck.com/sethmlarson/the-future-of-trust-stores-in-python)) * [Experimental APIs in Python 3.10 and the future of trust stores](https://sethmlarson.dev/blog/2021-11-27/experimental-python-3.10-apis-and-trust-stores) * [PEP 543: A Unified TLS API for Python](https://www.python.org/dev/peps/pep-0543) diff --git a/noxfile.py b/noxfile.py index ff11337..fbcf0cf 100644 --- a/noxfile.py +++ b/noxfile.py @@ -33,7 +33,14 @@ def lint(session): session.run("flake8", "--ignore=E501,W503", *SOURCE_PATHS) session.run("black", "--check", *SOURCE_PATHS) session.run("isort", "--check", "--profile=black", *SOURCE_PATHS) - session.run("mypy", "--strict", "--show-error-codes", "src/") + session.run( + "mypy", + "--strict", + "--show-error-codes", + "--install-types", + "--non-interactive", + "src/", + ) @nox.session(python=PYTHONS) @@ -52,7 +59,7 @@ def test(session): "-rs", "--no-flaky-report", "--max-runs=3", - *(session.posargs or ("tests/",)) + *(session.posargs or ("tests/",)), ) diff --git a/src/truststore/__init__.py b/src/truststore/__init__.py index 07290a0..a00212d 100644 --- a/src/truststore/__init__.py +++ b/src/truststore/__init__.py @@ -1,12 +1,16 @@ -"""Verify certificates using OS trust stores""" +"""Verify certificates using OS trust stores. This is useful when your system contains +custom certificate authorities such as when using a corporate proxy or using test certificates. +Supports macOS, Windows, and Linux (with OpenSSL). +""" import sys as _sys if _sys.version_info < (3, 10): raise ImportError("truststore requires Python 3.10 or later") -del _sys -from ._api import SSLContext # noqa: E402 +from ._api import SSLContext, extract_from_ssl, inject_into_ssl # noqa: E402 -__all__ = ["SSLContext"] +del _api, _sys # type: ignore[name-defined] # noqa: F821 + +__all__ = ["SSLContext", "inject_into_ssl", "extract_from_ssl"] __version__ = "0.5.0" diff --git a/src/truststore/_api.py b/src/truststore/_api.py index 18f7e33..2831c3c 100644 --- a/src/truststore/_api.py +++ b/src/truststore/_api.py @@ -1,10 +1,16 @@ +import array +import ctypes +import mmap import os +import pickle import platform import socket import ssl import typing -from _ssl import ENCODING_DER # type: ignore[import] +import _ssl # type: ignore[import] + +from ._ssl_constants import _original_SSLContext, _original_super_SSLContext if platform.system() == "Windows": from ._windows import _configure_context, _verify_peercerts_impl @@ -13,16 +19,53 @@ else: from ._openssl import _configure_context, _verify_peercerts_impl - +# From typeshed/stdlib/ssl.pyi _StrOrBytesPath: typing.TypeAlias = str | bytes | os.PathLike[str] | os.PathLike[bytes] _PasswordType: typing.TypeAlias = str | bytes | typing.Callable[[], str | bytes] +# From typeshed/stdlib/_typeshed/__init__.py +_ReadableBuffer: typing.TypeAlias = typing.Union[ + bytes, + memoryview, + bytearray, + "array.array[typing.Any]", + mmap.mmap, + "ctypes._CData", + pickle.PickleBuffer, +] + + +def inject_into_ssl() -> None: + """Injects the :class:`truststore.SSLContext` into the ``ssl`` + module by replacing :class:`ssl.SSLContext`. + """ + setattr(ssl, "SSLContext", SSLContext) + # urllib3 holds on to its own reference of ssl.SSLContext + # so we need to replace that reference too. + try: + import urllib3.util.ssl_ as urllib3_ssl + + setattr(urllib3_ssl, "SSLContext", SSLContext) + except ImportError: + pass + + +def extract_from_ssl() -> None: + """Restores the :class:`ssl.SSLContext` class to its original state""" + setattr(ssl, "SSLContext", _original_SSLContext) + try: + import urllib3.util.ssl_ as urllib3_ssl + + urllib3_ssl.SSLContext = _original_SSLContext + except ImportError: + pass + class SSLContext(ssl.SSLContext): """SSLContext API that uses system certificates on all platforms""" - def __init__(self, protocol: int = ssl.PROTOCOL_TLS) -> None: - self._ctx = ssl.SSLContext(protocol) + def __init__(self, protocol: int = None) -> None: # type: ignore[assignment] + self._ctx = _original_SSLContext(protocol) class TruststoreSSLObject(ssl.SSLObject): # This object exists because wrap_bio() doesn't @@ -59,7 +102,7 @@ def wrap_socket( ) try: _verify_peercerts(ssl_sock, server_hostname=server_hostname) - except ssl.SSLError: + except Exception: ssl_sock.close() raise return ssl_sock @@ -86,7 +129,7 @@ def load_verify_locations( self, cafile: str | bytes | os.PathLike[str] | os.PathLike[bytes] | None = None, capath: str | bytes | os.PathLike[str] | os.PathLike[bytes] | None = None, - cadata: str | bytes | None = None, + cadata: str | _ReadableBuffer | None = None, ) -> None: return self._ctx.load_verify_locations( cafile=cafile, capath=capath, cadata=cadata @@ -188,7 +231,9 @@ def options(self) -> ssl.Options: @options.setter def options(self, value: ssl.Options) -> None: - self._ctx.options = value + _original_super_SSLContext.options.__set__( # type: ignore[attr-defined] + self._ctx, value + ) @property def post_handshake_auth(self) -> bool: @@ -212,7 +257,9 @@ def verify_flags(self) -> ssl.VerifyFlags: @verify_flags.setter def verify_flags(self, value: ssl.VerifyFlags) -> None: - self._ctx.verify_flags = value + _original_super_SSLContext.verify_flags.__set__( # type: ignore[attr-defined] + self._ctx, value + ) @property def verify_mode(self) -> ssl.VerifyMode: @@ -220,7 +267,9 @@ def verify_mode(self) -> ssl.VerifyMode: @verify_mode.setter def verify_mode(self, value: ssl.VerifyMode) -> None: - self._ctx.verify_mode = value + _original_super_SSLContext.verify_mode.__set__( # type: ignore[attr-defined] + self._ctx, value + ) def _verify_peercerts( @@ -237,9 +286,13 @@ def _verify_peercerts( except AttributeError: pass - cert_bytes = [ - cert.public_bytes(ENCODING_DER) for cert in sslobj.get_unverified_chain() # type: ignore[attr-defined] - ] + # SSLObject.get_unverified_chain() returns 'None' + # if the peer sends no certificates. This is common + # for the server-side scenario. + unverified_chain: typing.Sequence[_ssl.Certificate] = ( + sslobj.get_unverified_chain() or () # type: ignore[attr-defined] + ) + cert_bytes = [cert.public_bytes(_ssl.ENCODING_DER) for cert in unverified_chain] _verify_peercerts_impl( sock_or_sslobj.context, cert_bytes, server_hostname=server_hostname ) diff --git a/src/truststore/_macos.py b/src/truststore/_macos.py index b8fd57e..e78c0af 100644 --- a/src/truststore/_macos.py +++ b/src/truststore/_macos.py @@ -16,6 +16,8 @@ ) from ctypes.util import find_library +from ._ssl_constants import _set_ssl_context_verify_mode + _mac_version = platform.mac_ver()[0] _mac_version_info = tuple(map(int, _mac_version.split("."))) if _mac_version_info < (10, 8): @@ -255,9 +257,9 @@ def _handle_osstatus(result: OSStatus, _: typing.Any, args: typing.Any) -> typin raise ssl.SSLError(message) -Security.SecTrustCreateWithCertificates.errcheck = _handle_osstatus # type: ignore[assignment,misc] -Security.SecTrustSetAnchorCertificates.errcheck = _handle_osstatus # type: ignore[assignment,misc] -Security.SecTrustGetTrustResult.errcheck = _handle_osstatus # type: ignore[assignment,misc] +Security.SecTrustCreateWithCertificates.errcheck = _handle_osstatus # type: ignore[assignment] +Security.SecTrustSetAnchorCertificates.errcheck = _handle_osstatus # type: ignore[assignment] +Security.SecTrustGetTrustResult.errcheck = _handle_osstatus # type: ignore[assignment] class CFConst: @@ -346,13 +348,15 @@ def _der_certs_to_cf_cert_array(certs: list[bytes]) -> CFMutableArrayRef: # typ @contextlib.contextmanager def _configure_context(ctx: ssl.SSLContext) -> typing.Iterator[None]: - values = ctx.check_hostname, ctx.verify_mode + check_hostname = ctx.check_hostname + verify_mode = ctx.verify_mode ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE + _set_ssl_context_verify_mode(ctx, ssl.CERT_NONE) try: yield finally: - ctx.check_hostname, ctx.verify_mode = values + ctx.check_hostname = check_hostname + _set_ssl_context_verify_mode(ctx, verify_mode) def _verify_peercerts_impl( diff --git a/src/truststore/_ssl_constants.py b/src/truststore/_ssl_constants.py new file mode 100644 index 0000000..be60f83 --- /dev/null +++ b/src/truststore/_ssl_constants.py @@ -0,0 +1,12 @@ +import ssl + +# Hold on to the original class so we can create it consistently +# even if we inject our own SSLContext into the ssl module. +_original_SSLContext = ssl.SSLContext +_original_super_SSLContext = super(_original_SSLContext, _original_SSLContext) + + +def _set_ssl_context_verify_mode( + ssl_context: ssl.SSLContext, verify_mode: ssl.VerifyMode +) -> None: + _original_super_SSLContext.verify_mode.__set__(ssl_context, verify_mode) # type: ignore[attr-defined] diff --git a/src/truststore/_windows.py b/src/truststore/_windows.py index 9596570..3de4960 100644 --- a/src/truststore/_windows.py +++ b/src/truststore/_windows.py @@ -29,6 +29,8 @@ ) from typing import TYPE_CHECKING, Any +from ._ssl_constants import _set_ssl_context_verify_mode + HCERTCHAINENGINE = HANDLE HCERTSTORE = HANDLE HCRYPTPROV_LEGACY = HANDLE @@ -458,7 +460,6 @@ def _get_and_verify_cert_chain( # Check status error_code = policy_status.dwError if error_code: - # Try getting a human readable message for an error code. error_message_buf = create_unicode_buffer(1024) error_message_chars = FormatMessageW( @@ -542,10 +543,12 @@ def _verify_using_custom_ca_certs( @contextlib.contextmanager def _configure_context(ctx: ssl.SSLContext) -> typing.Iterator[None]: - values = ctx.check_hostname, ctx.verify_mode + check_hostname = ctx.check_hostname + verify_mode = ctx.verify_mode ctx.check_hostname = False - ctx.verify_mode = ssl.CERT_NONE + _set_ssl_context_verify_mode(ctx, ssl.CERT_NONE) try: yield finally: - ctx.check_hostname, ctx.verify_mode = values + ctx.check_hostname = check_hostname + _set_ssl_context_verify_mode(ctx, verify_mode) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c9b23a2 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,160 @@ +import asyncio +import logging +import pathlib +import ssl +import typing +from dataclasses import dataclass +from tempfile import TemporaryDirectory + +import pytest +import pytest_asyncio +from aiohttp import web + +MKCERT_CA_NOT_INSTALLED = b"local CA is not installed in the system trust store" +MKCERT_CA_ALREADY_INSTALLED = b"local CA is now installed in the system trust store" +SUBPROCESS_TIMEOUT = 5 + +# To avoid getting the SSLContext injected by truststore. +original_SSLContext = ssl.SSLContext + + +successful_hosts = pytest.mark.parametrize("host", ["example.com", "1.1.1.1"]) + +logger = logging.getLogger("aiohttp.web") + + +@pytest_asyncio.fixture +async def mkcert() -> typing.AsyncIterator[None]: + async def is_mkcert_available() -> bool: + try: + p = await asyncio.create_subprocess_exec( + "mkcert", + "-help", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + except FileNotFoundError: + return False + await asyncio.wait_for(p.wait(), timeout=SUBPROCESS_TIMEOUT) + return p.returncode == 0 + + # Checks to see if mkcert is available at all. + if not await is_mkcert_available(): + pytest.skip("Install mkcert to run custom CA tests") + + # Now we attempt to install the root certificate + # to the system trust store. Keep track if we should + # call mkcert -uninstall at the end. + should_mkcert_uninstall = False + try: + p = await asyncio.create_subprocess_exec( + "mkcert", + "-install", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + await p.wait() + assert p.returncode == 0 + + # See if the root cert was installed for the first + # time, if so we want to leave no trace. + stdout, _ = await p.communicate() + should_mkcert_uninstall = MKCERT_CA_ALREADY_INSTALLED in stdout + + yield + + finally: + # Only uninstall mkcert root cert if it wasn't + # installed before our attempt to install. + if should_mkcert_uninstall: + p = await asyncio.create_subprocess_exec( + "mkcert", + "-uninstall", + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.DEVNULL, + ) + await p.wait() + + +@dataclass +class CertFiles: + key_file: pathlib.Path + cert_file: pathlib.Path + + +@pytest_asyncio.fixture +async def mkcert_certs(mkcert: None) -> typing.AsyncIterator[CertFiles]: + with TemporaryDirectory() as tmp_dir: + # Create the structure we'll eventually return + # as long as mkcert succeeds in creating the certs. + tmpdir_path = pathlib.Path(tmp_dir) + certs = CertFiles( + cert_file=tmpdir_path / "localhost.pem", + key_file=tmpdir_path / "localhost-key.pem", + ) + + cmd = ( + "mkcert" + f" -cert-file {certs.cert_file}" + f" -key-file {certs.key_file}" + " localhost" + ) + p = await asyncio.create_subprocess_shell( + cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + await asyncio.wait_for(p.wait(), timeout=SUBPROCESS_TIMEOUT) + + # Check for any signs that mkcert wasn't able to issue certs + # or that the CA isn't installed + stdout, _ = await p.communicate() + if MKCERT_CA_NOT_INSTALLED in stdout or p.returncode != 0: + raise RuntimeError( + f"mkcert couldn't issue certificates " + f"(exited with {p.returncode}): {stdout.decode()}" + ) + + yield certs + + +@dataclass +class Server: + host: str + port: int + + @property + def base_url(self) -> str: + return f"https://{self.host}:{self.port}" + + +@pytest_asyncio.fixture(scope="function") +async def server(mkcert_certs: CertFiles) -> typing.AsyncIterator[Server]: + async def handler(request: web.Request) -> web.Response: + # Check the request was served over HTTPS. + assert request.scheme == "https" + + return web.Response(status=200) + + app = web.Application() + app.add_routes([web.get("/", handler)]) + + ctx = original_SSLContext(ssl.PROTOCOL_TLS_SERVER) + ctx.load_cert_chain( + certfile=mkcert_certs.cert_file, + keyfile=mkcert_certs.key_file, + ) + + # we need keepalive_timeout=0 + # see https://github.com/aio-libs/aiohttp/issues/5426 + runner = web.AppRunner(app, keepalive_timeout=0) + await runner.setup() + port = 9999 # Arbitrary choice. + site = web.TCPSite(runner, ssl_context=ctx, port=port) + + await site.start() + try: + yield Server(host="localhost", port=port) + finally: + await site.stop() + await runner.cleanup() diff --git a/tests/test_custom_ca.py b/tests/test_custom_ca.py index f6679bf..254a21e 100644 --- a/tests/test_custom_ca.py +++ b/tests/test_custom_ca.py @@ -1,159 +1,14 @@ import asyncio -import pathlib import ssl -import typing -from dataclasses import dataclass -from tempfile import TemporaryDirectory import pytest -import pytest_asyncio import requests import urllib3 -from aiohttp import ClientSession, web +from aiohttp import ClientSession import truststore from tests import SSLContextAdapter - -MKCERT_CA_NOT_INSTALLED = b"local CA is not installed in the system trust store" -MKCERT_CA_ALREADY_INSTALLED = b"local CA is now installed in the system trust store" -SUBPROCESS_TIMEOUT = 5 - - -@pytest_asyncio.fixture -async def mkcert() -> typing.AsyncIterator[None]: - async def is_mkcert_available() -> bool: - try: - p = await asyncio.create_subprocess_exec( - "mkcert", - "-help", - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.DEVNULL, - ) - except FileNotFoundError: - return False - await asyncio.wait_for(p.wait(), timeout=SUBPROCESS_TIMEOUT) - return p.returncode == 0 - - # Checks to see if mkcert is available at all. - if not await is_mkcert_available(): - pytest.skip("Install mkcert to run custom CA tests") - - # Now we attempt to install the root certificate - # to the system trust store. Keep track if we should - # call mkcert -uninstall at the end. - should_mkcert_uninstall = False - try: - p = await asyncio.create_subprocess_exec( - "mkcert", - "-install", - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - await p.wait() - assert p.returncode == 0 - - # See if the root cert was installed for the first - # time, if so we want to leave no trace. - stdout, _ = await p.communicate() - should_mkcert_uninstall = MKCERT_CA_ALREADY_INSTALLED in stdout - - yield - - finally: - # Only uninstall mkcert root cert if it wasn't - # installed before our attempt to install. - if should_mkcert_uninstall: - p = await asyncio.create_subprocess_exec( - "mkcert", - "-uninstall", - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.DEVNULL, - ) - await p.wait() - - -@dataclass -class CertFiles: - key_file: pathlib.Path - cert_file: pathlib.Path - - -@pytest_asyncio.fixture -async def mkcert_certs(mkcert: None) -> typing.AsyncIterator[CertFiles]: - with TemporaryDirectory() as tmp_dir: - - # Create the structure we'll eventually return - # as long as mkcert succeeds in creating the certs. - tmpdir_path = pathlib.Path(tmp_dir) - certs = CertFiles( - cert_file=tmpdir_path / "localhost.pem", - key_file=tmpdir_path / "localhost-key.pem", - ) - - cmd = ( - "mkcert" - f" -cert-file {certs.cert_file}" - f" -key-file {certs.key_file}" - " localhost" - ) - p = await asyncio.create_subprocess_shell( - cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - await asyncio.wait_for(p.wait(), timeout=SUBPROCESS_TIMEOUT) - - # Check for any signs that mkcert wasn't able to issue certs - # or that the CA isn't installed - stdout, _ = await p.communicate() - if MKCERT_CA_NOT_INSTALLED in stdout or p.returncode != 0: - raise RuntimeError( - f"mkcert couldn't issue certificates " - f"(exited with {p.returncode}): {stdout.decode()}" - ) - - yield certs - - -@dataclass -class Server: - host: str - port: int - - @property - def base_url(self) -> str: - return f"https://{self.host}:{self.port}" - - -@pytest_asyncio.fixture -async def server(mkcert_certs: CertFiles) -> typing.AsyncIterator[Server]: - async def handler(request: web.Request) -> web.Response: - # Check the request was served over HTTPS. - assert request.scheme == "https" - - return web.Response(status=200) - - app = web.Application() - app.add_routes([web.get("/", handler)]) - - ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) - ctx.load_cert_chain( - certfile=mkcert_certs.cert_file, - keyfile=mkcert_certs.key_file, - ) - - # we need keepalive_timeout=0 - # see https://github.com/aio-libs/aiohttp/issues/5426 - runner = web.AppRunner(app, keepalive_timeout=0) - await runner.setup() - port = 9999 # Arbitrary choice. - site = web.TCPSite(runner, ssl_context=ctx, port=port) - await site.start() - try: - yield Server(host="localhost", port=port) - finally: - await site.stop() - await runner.cleanup() +from tests.conftest import Server @pytest.mark.asyncio diff --git a/tests/test_inject.py b/tests/test_inject.py new file mode 100644 index 0000000..1a016e8 --- /dev/null +++ b/tests/test_inject.py @@ -0,0 +1,108 @@ +import asyncio +import ssl + +import pytest +import requests +import urllib3 +from aiohttp import ClientSession + +import truststore +from tests.conftest import Server, successful_hosts + + +@pytest.fixture(scope="function") +def inject_truststore(): + truststore.inject_into_ssl() + try: + yield + finally: + truststore.extract_from_ssl() + + +def test_inject_and_extract(): + assert ssl.SSLContext is not truststore.SSLContext + try: + original_SSLContext = ssl.SSLContext + + ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + assert isinstance(ctx._ctx, original_SSLContext) + + truststore.inject_into_ssl() + assert ssl.SSLContext is truststore.SSLContext + assert urllib3.util.ssl_.SSLContext is truststore.SSLContext + + ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + assert isinstance(ctx._ctx, original_SSLContext) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + assert isinstance(ctx, truststore.SSLContext) + + ctx = ssl.create_default_context() + assert isinstance(ctx, truststore.SSLContext) + + truststore.extract_from_ssl() + assert ssl.SSLContext is original_SSLContext + assert urllib3.util.ssl_.SSLContext is original_SSLContext + + ctx = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + assert isinstance(ctx._ctx, original_SSLContext) + + ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) + assert isinstance(ctx, original_SSLContext) + + ctx = ssl.create_default_context() + assert isinstance(ctx, original_SSLContext) + finally: + truststore.extract_from_ssl() + + +@successful_hosts +@pytest.mark.usefixtures("inject_truststore") +def test_success_with_inject(host): + with urllib3.PoolManager() as http: + resp = http.request("GET", f"https://{host}") + assert resp.status == 200 + + +@pytest.mark.usefixtures("inject_truststore") +def test_inject_set_values(): + ctx = ssl.create_default_context() + assert isinstance(ctx, truststore.SSLContext) + + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + + assert ctx.check_hostname is False + assert ctx.verify_mode == ssl.CERT_NONE + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("inject_truststore") +async def test_urllib3_works_with_inject(server: Server) -> None: + def test_urllib3(): + with urllib3.PoolManager() as client: + resp = client.request("GET", server.base_url) + assert resp.status == 200 + + thread = asyncio.to_thread(test_urllib3) + await thread + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("inject_truststore") +async def test_aiohttp_works_with_inject(server: Server) -> None: + async with ClientSession() as client: + resp = await client.get(server.base_url) + assert resp.status == 200 + + +@pytest.mark.asyncio +@pytest.mark.usefixtures("inject_truststore") +async def test_requests_works_with_inject(server: Server) -> None: + def test_requests(): + with requests.Session() as http: + resp = http.request("GET", server.base_url) + assert resp.status_code == 200 + + thread = asyncio.to_thread(test_requests) + await thread diff --git a/tests/test_sslcontext.py b/tests/test_sslcontext.py index af84294..ea78d28 100644 --- a/tests/test_sslcontext.py +++ b/tests/test_sslcontext.py @@ -13,7 +13,6 @@ def test_minimum_maximum_version(): ctx.maximum_version = ssl.TLSVersion.TLSv1_2 with urllib3.PoolManager(ssl_context=ctx) as http: - resp = http.request("GET", "https://howsmyssl.com/a/check") data = json.loads(resp.data) assert data["tls_version"] == "TLS 1.2"