1
+ import os
1
2
import logging
2
3
from pathlib import Path
3
4
from typing import TYPE_CHECKING , Any , Dict , Optional , Union
4
5
6
+ from botocore .exceptions import ClientError
7
+ from botocore .client import Config
5
8
import boto3
6
9
7
10
from pydantic import BaseSettings , typing
@@ -21,7 +24,12 @@ def __init__(self, ssm_prefix: Union[typing.StrPath, None]):
21
24
22
25
@property
23
26
def client (self ) -> "SSMClient" :
24
- return boto3 .client ("ssm" )
27
+ return boto3 .client ("ssm" , config = self .client_config )
28
+
29
+ @property
30
+ def client_config (self ):
31
+ timeout = float (os .environ .get ("SSM_TIMEOUT" , 0.5 ))
32
+ return Config (connect_timeout = timeout , read_timeout = timeout )
25
33
26
34
def __call__ (self , settings : BaseSettings ) -> Dict [str , Any ]:
27
35
"""
@@ -39,9 +47,13 @@ def __call__(self, settings: BaseSettings) -> Dict[str, Any]:
39
47
40
48
logger .debug (f"Building SSM settings with prefix of { secrets_path = } " )
41
49
42
- params = self .client .get_parameters_by_path (
43
- Path = str (secrets_path ), WithDecryption = True
44
- )["Parameters" ]
50
+ try :
51
+ params = self .client .get_parameters_by_path (
52
+ Path = str (secrets_path ), WithDecryption = True
53
+ )["Parameters" ]
54
+ except ClientError :
55
+ logger .exception ("Failed to get parameters from %s" , secrets_path )
56
+ return {}
45
57
46
58
return {
47
59
str (Path (param ["Name" ]).relative_to (secrets_path )): param ["Value" ]
0 commit comments