Skip to content

feat: Add HTTP proxy configuration modules to CDK #630

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
ConfiguredAirbyteStreamSerializer,
ConnectorSpecificationSerializer,
)
from .http_proxy_config import HttpProxyConfig
from .well_known_types import (
BinaryData,
Boolean,
Expand Down
27 changes: 27 additions & 0 deletions airbyte_cdk/models/http_proxy_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""HTTP proxy configuration models."""

from typing import Optional

from pydantic.v1 import BaseModel, Field


class HttpProxyConfig(BaseModel):
    """Configuration model for HTTP proxy settings."""

    # NOTE(review): the class docstring above is intentionally left verbatim;
    # pydantic v1 presumably copies it into the generated schema as the model
    # description, so rewording it would change runtime output -- confirm.

    # Required. An empty string is accepted: no format validation is applied.
    proxy_url: str = Field(
        default=...,
        title="Proxy URL",
        description="The URL of the HTTP proxy server to use for requests",
        examples=["http://proxy.example.com:8080", "https://proxy.example.com:8080"],
    )

    # Optional PEM text. `airbyte_secret` is an extra keyword that ends up on
    # the property in the generated JSON schema.
    proxy_ca_certificate: Optional[str] = Field(
        default=None,
        title="Proxy CA Certificate",
        description="Custom CA certificate for the proxy server in PEM format",
        airbyte_secret=True,
    )

    class Config:
        # `title` overrides the schema title that would otherwise be the class name.
        title = "HTTP Proxy Configuration"
        # NOTE(review): `description` does not look like a recognised pydantic v1
        # Config option; it is kept as a plain attribute -- confirm it is needed.
        description = "Configuration for routing HTTP requests through a proxy server"
9 changes: 8 additions & 1 deletion airbyte_cdk/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,16 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from .http_proxy import configure_custom_http_proxy
from .is_cloud_environment import is_cloud_environment
from .print_buffer import PrintBuffer
from .schema_inferrer import SchemaInferrer
from .traced_exception import AirbyteTracedException

__all__ = ["AirbyteTracedException", "SchemaInferrer", "is_cloud_environment", "PrintBuffer"]
# Names re-exported by `from airbyte_cdk.utils import *`; each one is imported
# at the top of this package module.
__all__ = [
    "AirbyteTracedException",
    "SchemaInferrer",
    "is_cloud_environment",
    "PrintBuffer",
    "configure_custom_http_proxy",
]
107 changes: 107 additions & 0 deletions airbyte_cdk/utils/http_proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""HTTP proxy configuration utilities."""

import os
import tempfile
from logging import Logger
from pathlib import Path
from typing import Optional

# Keys used to read proxy settings out of a configuration mapping.
# NOTE(review): PROXY_PARENT_CONFIG_KEY is not referenced by the functions
# visible here; presumably callers use it to locate the nested proxy section
# inside a connector config -- confirm.
PROXY_PARENT_CONFIG_KEY = "http_proxy"
PROXY_URL_CONFIG_KEY = "proxy_url"
PROXY_CA_CERTIFICATE_CONFIG_KEY = "proxy_ca_certificate"


# Hosts that are always merged into the NO_PROXY environment variable so that
# requests to them bypass the custom proxy.
# NOTE(review): several entries use a leading "*." wildcard. Python's urllib and
# requests appear to match NO_PROXY entries by plain suffix, so "*.example.com"
# may never match there (".example.com" / "example.com" would) -- confirm the
# wildcard entries take effect for the HTTP clients in use.
AIRBYTE_NO_PROXY_ENTRIES = [
    "localhost",
    "127.0.0.1",
    "*.local",
    "169.254.169.254",
    "metadata.google.internal",
    "*.airbyte.io",
    "*.airbyte.com",
    "connectors.airbyte.com",
    "sentry.io",
    "api.segment.io",
    "*.sentry.io",
    "*.datadoghq.com",
    "app.datadoghq.com",
]


def _get_no_proxy_entries_from_env_var() -> list[str]:
    """Split the current ``NO_PROXY`` environment variable into host entries.

    Returns:
        The comma-separated entries with surrounding whitespace removed and
        blank entries dropped, in their original order. An empty list when
        ``NO_PROXY`` is not set.
    """
    raw_value = os.environ.get("NO_PROXY")
    if raw_value is None:
        return []

    trimmed = (piece.strip() for piece in raw_value.split(","))
    return [piece for piece in trimmed if piece]


def _get_no_proxy_string() -> str:
    """Build the value to store in the ``NO_PROXY`` environment variable.

    Entries already present in ``NO_PROXY`` come first, in their existing
    order, followed by the Airbyte defaults from ``AIRBYTE_NO_PROXY_ENTRIES``.
    Duplicates are dropped, keeping the first occurrence.

    The result is deterministic: earlier code de-duplicated through a ``set``,
    whose iteration order for strings can differ from one process to the next.

    Returns:
        A comma-separated list of hosts for which the proxy must be bypassed.
    """
    combined = _get_no_proxy_entries_from_env_var() + AIRBYTE_NO_PROXY_ENTRIES
    # dict.fromkeys removes duplicates while preserving insertion order; the
    # `if entry` filter keeps empty strings out of the joined value.
    unique_entries = dict.fromkeys(entry for entry in combined if entry)
    return ",".join(unique_entries)


def _install_ca_certificate(ca_cert_file_text: str) -> Path:
    """Write a CA certificate to disk and register it through the environment.

    The text is stored in a new temporary ``.pem`` file that is deliberately
    not deleted on close, and the ``REQUESTS_CA_BUNDLE``, ``CURL_CA_BUNDLE``
    and ``SSL_CERT_FILE`` environment variables are all set to that file name.

    Args:
        ca_cert_file_text: Certificate contents, written as UTF-8 text.

    Returns:
        Absolute path of the temporary certificate file.
    """
    cert_handle = tempfile.NamedTemporaryFile(
        mode="w",
        encoding="utf-8",
        prefix="airbyte-custom-ca-cert-",
        suffix=".pem",
        delete=False,  # the file has to outlive this function
    )
    with cert_handle:
        cert_handle.write(ca_cert_file_text)
        cert_handle.flush()

        bundle_location = cert_handle.name
        for variable in ("REQUESTS_CA_BUNDLE", "CURL_CA_BUNDLE", "SSL_CERT_FILE"):
            os.environ[variable] = bundle_location

    return Path(bundle_location).absolute()


def _redact_proxy_url_credentials(proxy_url: str) -> str:
    """Return *proxy_url* with any ``user:password@`` part replaced by ``***@``.

    URLs without an ``@`` are returned unchanged. Everything between the
    optional ``scheme://`` prefix and the last ``@`` is masked, so the helper
    errs on the side of hiding too much rather than too little.
    """
    prefix, separator, remainder = proxy_url.partition("://")
    if not separator:
        # No scheme present: the whole string is the authority (and beyond).
        prefix, remainder = "", proxy_url

    _, at_sign, host_and_rest = remainder.rpartition("@")
    if not at_sign:
        return proxy_url
    return f"{prefix}{separator}***@{host_and_rest}"


def configure_custom_http_proxy(
    http_proxy_config: Optional[dict[str, str]],
    *,
    logger: Logger,
    proxy_url: Optional[str] = None,
    ca_cert_file_text: Optional[str] = None,
) -> None:
    """Initialize the proxy environment variables.

    Args:
        http_proxy_config: Mapping that may contain the proxy URL and CA
            certificate under ``PROXY_URL_CONFIG_KEY`` and
            ``PROXY_CA_CERTIFICATE_CONFIG_KEY``. ``None`` is treated as empty.
        logger: Receives informational messages about the applied settings.
        proxy_url: When truthy, overrides the URL from ``http_proxy_config``.
        ca_cert_file_text: When truthy, overrides the certificate text from
            ``http_proxy_config``.

    The function is a no-op if neither input provides a proxy URL. Otherwise
    it optionally installs the CA certificate, then sets ``NO_PROXY``,
    ``HTTP_PROXY`` and ``HTTPS_PROXY`` in ``os.environ``.

    Credentials embedded in the proxy URL are masked in the log output; the
    environment variables still receive the full URL.
    """
    config = http_proxy_config or {}
    proxy_url = proxy_url or config.get(PROXY_URL_CONFIG_KEY)
    ca_cert_file_text = ca_cert_file_text or config.get(PROXY_CA_CERTIFICATE_CONFIG_KEY)

    if not proxy_url:
        return

    # Proxy URLs commonly carry `user:password@`; never write that to the logs.
    logger.info(f"Using custom proxy URL: {_redact_proxy_url_credentials(proxy_url)}")

    if ca_cert_file_text:
        cert_file_path = _install_ca_certificate(ca_cert_file_text)
        logger.info(f"Using custom installed CA certificate: {cert_file_path!s}")

    # NOTE(review): only the upper-case variables are set. Python's urllib
    # documents that lower-case variants win when both exist -- confirm whether
    # `http_proxy` / `https_proxy` / `no_proxy` should be mirrored as well.
    os.environ["NO_PROXY"] = _get_no_proxy_string()
    os.environ["HTTP_PROXY"] = proxy_url
    os.environ["HTTPS_PROXY"] = proxy_url
94 changes: 94 additions & 0 deletions unit_tests/models/test_http_proxy_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

import pytest
from pydantic.v1 import ValidationError

from airbyte_cdk.models.http_proxy_config import HttpProxyConfig


class TestHttpProxyConfig:
    """Construction, validation, serialization and schema checks for HttpProxyConfig."""

    # Shared fixtures; the values match what each test previously spelled inline.
    _HTTP_URL = "http://proxy.example.com:8080"
    _HTTPS_URL = "https://proxy.example.com:8080"
    _PEM = "-----BEGIN CERTIFICATE-----\ntest certificate\n-----END CERTIFICATE-----"

    def test_valid_config_with_required_fields_only(self):
        """The URL alone is sufficient; the certificate defaults to None."""
        model = HttpProxyConfig(proxy_url=self._HTTP_URL)

        assert model.proxy_url == "http://proxy.example.com:8080"
        assert model.proxy_ca_certificate is None

    def test_valid_config_with_all_fields(self):
        """Both fields are stored exactly as given."""
        model = HttpProxyConfig(proxy_url=self._HTTPS_URL, proxy_ca_certificate=self._PEM)

        assert model.proxy_url == "https://proxy.example.com:8080"
        assert model.proxy_ca_certificate == self._PEM

    def test_missing_required_proxy_url(self):
        """Omitting the URL yields exactly one 'missing' validation error."""
        with pytest.raises(ValidationError) as raised:
            HttpProxyConfig()

        problems = raised.value.errors()
        assert len(problems) == 1
        only_problem = problems[0]
        assert only_problem["loc"] == ("proxy_url",)
        assert only_problem["type"] == "value_error.missing"

    def test_empty_proxy_url(self):
        """An empty string passes validation unchanged."""
        model = HttpProxyConfig(proxy_url="")
        assert model.proxy_url == ""

    def test_serialization(self):
        """dict() returns both fields."""
        model = HttpProxyConfig(proxy_url=self._HTTPS_URL, proxy_ca_certificate=self._PEM)

        assert model.dict() == {
            "proxy_url": "https://proxy.example.com:8080",
            "proxy_ca_certificate": self._PEM,
        }

    def test_serialization_exclude_none(self):
        """exclude_none drops the unset certificate."""
        model = HttpProxyConfig(proxy_url=self._HTTP_URL)

        assert model.dict(exclude_none=True) == {"proxy_url": "http://proxy.example.com:8080"}

    def test_json_serialization(self):
        """json() contains the URL and an explicit null certificate."""
        payload = HttpProxyConfig(proxy_url=self._HTTP_URL).json()

        assert '"proxy_url": "http://proxy.example.com:8080"' in payload
        assert '"proxy_ca_certificate": null' in payload

    def test_from_dict(self):
        """The model can be built by unpacking a plain mapping."""
        raw = {"proxy_url": "https://proxy.example.com:8080", "proxy_ca_certificate": "test-cert"}

        model = HttpProxyConfig(**raw)
        assert model.proxy_url == "https://proxy.example.com:8080"
        assert model.proxy_ca_certificate == "test-cert"

    def test_schema_generation(self):
        """The generated schema exposes both properties with their metadata."""
        generated = HttpProxyConfig.schema()
        properties = generated["properties"]

        assert generated["type"] == "object"
        assert "proxy_url" in properties
        assert "proxy_ca_certificate" in properties

        url_property = properties["proxy_url"]
        assert url_property["type"] == "string"
        assert url_property["title"] == "Proxy URL"

        cert_property = properties["proxy_ca_certificate"]
        assert cert_property["type"] == "string"
        assert cert_property["title"] == "Proxy CA Certificate"
        assert cert_property.get("airbyte_secret") is True

    def test_config_class_attributes(self):
        """The inner Config carries the expected title and description."""
        inner_config = HttpProxyConfig.Config
        expected_description = "Configuration for routing HTTP requests through a proxy server"

        assert inner_config.title == "HTTP Proxy Configuration"
        assert inner_config.description == expected_description
Loading
Loading