Skip to content

feat!: adapt to pydantic 2.0 #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3,434 changes: 1,949 additions & 1,485 deletions poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pydantic_ssm_settings/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .settings import AwsSsmSourceConfig
from .settings import AwsSsmSourceConfig, SsmSettingsConfigDict
from .source import AwsSsmSettingsSource

__all__ = ("AwsSsmSourceConfig",)
__all__ = ("AwsSsmSourceConfig", "SsmSettingsConfigDict", "AwsSsmSettingsSource")
__version__ = "0.2.4"
54 changes: 38 additions & 16 deletions pydantic_ssm_settings/settings.py
Original file line number Diff line number Diff line change
@@ -1,39 +1,61 @@
import logging
from typing import Tuple
from typing import Any, Tuple, Type

from pydantic.env_settings import (
from pydantic_settings import (
BaseSettings,
EnvSettingsSource,
InitSettingsSource,
PydanticBaseSettingsSource,
SecretsSettingsSource,
SettingsSourceCallable,
SettingsConfigDict,
)

from .source import AwsSsmSettingsSource

logger = logging.getLogger(__name__)


class AwsSsmSourceConfig:
@classmethod
def customise_sources(
cls,
class SsmSettingsConfigDict(SettingsConfigDict):
ssm_prefix: str


class BaseSettingsSsmWrapper(BaseSettings):
"""
Wrapper to store the _ssm_prefix parameter as an instance attribute.
Need a direct access to the attributes dictionary to avoid raising an AttributeError:
__pydantic_private__ exception
"""

def __init__(self, *args, _ssm_prefix: str = None, **kwargs: Any) -> None:
"""
Args:
_ssm_prefix: Prefix for all ssm parameters. Must be an absolute path,
separated by "/". NB:unlike its _env_prefix counterpart, _ssm_prefix
is treated case sensitively regardless of the _case_sensitive
parameter value.
"""
self.__dict__["__ssm_prefix"] = _ssm_prefix
super().__init__(self, *args, **kwargs)


class AwsSsmSourceConfig(BaseSettingsSsmWrapper):
def settings_customise_sources(
self,
settings_cls: Type[BaseSettings],
init_settings: InitSettingsSource,
env_settings: EnvSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: SecretsSettingsSource,
) -> Tuple[SettingsSourceCallable, ...]:

) -> Tuple[PydanticBaseSettingsSource, ...]:
ssm_settings = AwsSsmSettingsSource(
ssm_prefix=file_secret_settings.secrets_dir,
env_nested_delimiter=env_settings.env_nested_delimiter,
settings_cls=settings_cls,
ssm_prefix=self.__dict__["__ssm_prefix"],
)

return (
init_settings,
env_settings,
# Usurping the `secrets_dir` arg. We can't expect any other args to
# be passed to # the Settings module because Pydantic will complain
# about unexpected arguments. `secrets_dir` comes from `_secrets_dir`,
# one of the few special kwargs that Pydantic will allow:
# https://github.com/samuelcolvin/pydantic/blob/45db4ad3aa558879824a91dd3b011d0449eb2977/pydantic/env_settings.py#L33
dotenv_settings,
file_secret_settings,
ssm_settings,
)
223 changes: 109 additions & 114 deletions pydantic_ssm_settings/source.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import os
from __future__ import annotations as _annotations

import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Mapping, Optional, Tuple
from typing import TYPE_CHECKING, Any

from botocore.exceptions import ClientError
from botocore.client import Config
import boto3

from pydantic import BaseSettings
from pydantic.typing import StrPath, get_origin, is_union
from pydantic.utils import deep_update
from pydantic.fields import ModelField
from botocore.client import Config
from botocore.exceptions import ClientError
from pydantic import BaseModel
from pydantic._internal._utils import lenient_issubclass
from pydantic.fields import FieldInfo
from pydantic_settings import BaseSettings
from pydantic_settings.sources import (
EnvSettingsSource,
)

if TYPE_CHECKING:
from mypy_boto3_ssm.client import SSMClient
Expand All @@ -23,16 +27,28 @@ class SettingsError(ValueError):
pass


class AwsSsmSettingsSource:
__slots__ = ("ssm_prefix", "env_nested_delimiter")

class AwsSsmSettingsSource(EnvSettingsSource):
def __init__(
self,
ssm_prefix: Optional[StrPath],
env_nested_delimiter: Optional[str] = None,
settings_cls: type[BaseSettings],
case_sensitive: bool = None,
ssm_prefix: str = None,
):
self.ssm_prefix: Optional[StrPath] = ssm_prefix
self.env_nested_delimiter: Optional[str] = env_nested_delimiter
# Ideally would retrieve ssm_prefix from self.config
# but need the superclass to be initialized for that
ssm_prefix_ = (
ssm_prefix
if ssm_prefix is not None
else settings_cls.model_config.get("ssm_prefix", "/")
)
super().__init__(
settings_cls,
case_sensitive=case_sensitive,
env_prefix=ssm_prefix_,
env_nested_delimiter="/", # SSM only accepts / as a delimiter
)
self.ssm_prefix = ssm_prefix_
assert self.ssm_prefix == self.env_prefix

@property
def client(self) -> "SSMClient":
Expand All @@ -43,124 +59,103 @@ def client_config(self) -> Config:
timeout = float(os.environ.get("SSM_TIMEOUT", 0.5))
return Config(connect_timeout=timeout, read_timeout=timeout)

def load_from_ssm(self, secrets_path: Path, case_sensitive: bool):

if not secrets_path.is_absolute():
def _load_env_vars(
self,
):
"""
Access env_prefix instead of ssm_prefix
"""
if not Path(self.env_prefix).is_absolute():
raise ValueError("SSM prefix must be absolute path")

logger.debug(f"Building SSM settings with prefix of {secrets_path=}")
logger.debug(f"Building SSM settings with prefix of {self.env_prefix=}")

output = {}
try:
paginator = self.client.get_paginator("get_parameters_by_path")
response_iterator = paginator.paginate(
Path=str(secrets_path), WithDecryption=True
Path=self.env_prefix, WithDecryption=True, Recursive=True
)

for page in response_iterator:
for parameter in page["Parameters"]:
key = Path(parameter["Name"]).relative_to(secrets_path).as_posix()
output[key if case_sensitive else key.lower()] = parameter["Value"]
key = (
Path(parameter["Name"]).relative_to(self.env_prefix).as_posix()
)
output[
self.env_prefix + key
if self.case_sensitive
else self.env_prefix.lower() + key.lower()
] = parameter["Value"]

except ClientError:
logger.exception("Failed to get parameters from %s", secrets_path)
logger.exception("Failed to get parameters from %s", self.env_prefix)

return output

def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
"""
Returns SSM values for all settings.
"""
d: Dict[str, Optional[Any]] = {}

if self.ssm_prefix is None:
return d

ssm_values = self.load_from_ssm(
secrets_path=Path(self.ssm_prefix),
case_sensitive=settings.__config__.case_sensitive,
)
def __repr__(self) -> str:
return f"AwsSsmSettingsSource(ssm_prefix={self.env_prefix!r})"

# The following was lifted from https://github.com/samuelcolvin/pydantic/blob/a21f0763ee877f0c86f254a5d60f70b1002faa68/pydantic/env_settings.py#L165-L237 # noqa
for field in settings.__fields__.values():
env_val: Optional[str] = None
for env_name in field.field_info.extra["env_names"]:
env_val = ssm_values.get(env_name)
if env_val is not None:
break

is_complex, allow_json_failure = self._field_is_complex(field)
if is_complex:
if env_val is None:
# field is complex but no value found so far, try explode_env_vars
env_val_built = self._explode_ssm_values(field, ssm_values)
if env_val_built:
d[field.alias] = env_val_built
else:
# field is complex and there's a value, decode that as JSON, then
# add explode_env_vars
try:
env_val = settings.__config__.json_loads(env_val)
except ValueError as e:
if not allow_json_failure:
raise SettingsError(
f'error parsing JSON for "{env_name}"'
) from e

if isinstance(env_val, dict):
d[field.alias] = deep_update(
env_val, self._explode_ssm_values(field, ssm_values)
)
else:
d[field.alias] = env_val
elif env_val is not None:
# simplest case, field is not complex, we only need to add the
# value if it was found
d[field.alias] = env_val

return d

def _field_is_complex(self, field: ModelField) -> Tuple[bool, bool]:
"""
Find out if a field is complex, and if so whether JSON errors should be ignored
def get_field_value(
self, field: FieldInfo, field_name: str
) -> tuple[Any, str, bool]:
"""
if field.is_complex():
allow_json_failure = False
elif (
is_union(get_origin(field.type_))
and field.sub_fields
and any(f.is_complex() for f in field.sub_fields)
):
allow_json_failure = True
else:
return False, False
Gets the value for field from environment variables and a flag to
determine whether value is complex.

return True, allow_json_failure
Args:
field: The field.
field_name: The field name.

def _explode_ssm_values(
self, field: ModelField, env_vars: Mapping[str, Optional[str]]
) -> Dict[str, Any]:
Returns:
A tuple contains the key, value if the file exists otherwise `None`, and
a flag to determine whether value is complex.
"""
Process env_vars and extract the values of keys containing
env_nested_delimiter into nested dictionaries.

This is applied to a single field, hence filtering by env_var prefix.
"""
prefixes = [
f"{env_name}{self.env_nested_delimiter}"
for env_name in field.field_info.extra["env_names"]
]
result: Dict[str, Any] = {}
for env_name, env_val in env_vars.items():
if not any(env_name.startswith(prefix) for prefix in prefixes):
continue
_, *keys, last_key = env_name.split(self.env_nested_delimiter)
env_var = result
for key in keys:
env_var = env_var.setdefault(key, {})
env_var[last_key] = env_val

return result
# env_name = /asdf/foo
# env_vars = {foo:xyz}
env_val: str | None = None
for field_key, env_name, value_is_complex in self._extract_field_info(
field, field_name
):
env_val = self.env_vars.get(env_name)
if env_val is not None:
break

return env_val, field_key, value_is_complex

def __call__(self) -> dict[str, Any]:
data: dict[str, Any] = {}

for field_name, field in self.settings_cls.model_fields.items():
try:
field_value, field_key, value_is_complex = self.get_field_value(
field, field_name
)
except Exception as e:
raise SettingsError(
f'error getting value for field "{field_name}" from source "{self.__class__.__name__}"' # noqa
) from e

try:
field_value = self.prepare_field_value(
field_name, field, field_value, value_is_complex
)
except ValueError as e:
raise SettingsError(
f'error parsing value for field "{field_name}" from source "{self.__class__.__name__}"' # noqa
) from e

if field_value is not None:
if (
not self.case_sensitive
and lenient_issubclass(field.annotation, BaseModel)
and isinstance(field_value, dict)
):
data[field_key] = self._replace_field_names_case_insensitively(
field, field_value
)
else:
data[field_key] = field_value

def __repr__(self) -> str:
return f"AwsSsmSettingsSource(ssm_prefix={self.ssm_prefix!r})"
return data
6 changes: 3 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ requires = ["poetry>=0.12"]
[tool.poetry]
authors = ["Anthony Lukach <[email protected]>"]
description = "Replace Pydantic's builtin Secret Support with a configuration provider that loads parameters from AWS Systems Manager Parameter Store."
homepage = ""
license = "MIT"
maintainers = ["Anthony Lukach <[email protected]>"]
name = "pydantic-ssm-settings"
Expand All @@ -15,8 +14,9 @@ version = "0.2.4"

[tool.poetry.dependencies]
"boto3" = "^1.21.45"
pydantic = "^1.6.2"
python = "^3.7"
pydantic = "^2.5.2"
python = "^3.9"
pydantic-settings = ">=2.1.0"

[tool.poetry.dev-dependencies]
black = "^22.3.0"
Expand Down
Loading