Skip to content

Commit 40521c8

Browse files
authored
Re-use connections when remote S3 signing (#2543)
# Rationale for this change The existing S3 remote signing hook function (`s3v4_rest_signer`) uses `requests.post` to submit `POST` requests to the REST signing endpoint. This internally creates a new `requests.Session` for every request, preventing any reuse of connections. In my profiling I saw this add overhead from repeated loading of CA certs and reestablishing of TLS connections. This change makes the signing function a callable object that wraps a `request.Session`, using this for `POST`ing, therefore achieving connection reuse. Signer callables are stored on the hook internals of the `aiobotocore` client inside the `s3fs.S3FileSystem` instance, so use and lifetime will match that of those instances. They are thread-local since: #2495. ## Are these changes tested? Tested locally. Existing unit tests updated for changes. ## Are there any user-facing changes? Yes - S3 signing requests now use connection pools (scoped to the `s3fs.S3FileSystem` object).
1 parent 8c545d9 commit 40521c8

File tree

2 files changed

+62
-37
lines changed

2 files changed

+62
-37
lines changed

pyiceberg/io/fsspec.py

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,20 @@
1616
# under the License.
1717
"""FileIO implementation for reading and writing table files that uses fsspec compatible filesystems."""
1818

19+
import abc
1920
import errno
2021
import json
2122
import logging
2223
import os
2324
import threading
2425
from copy import copy
25-
from functools import lru_cache, partial
26+
from functools import lru_cache
2627
from typing import (
2728
TYPE_CHECKING,
2829
Any,
2930
Callable,
3031
Dict,
32+
Type,
3133
Union,
3234
)
3335
from urllib.parse import urlparse
@@ -96,38 +98,58 @@
9698
from botocore.awsrequest import AWSRequest
9799

98100

99-
def s3v4_rest_signer(properties: Properties, request: "AWSRequest", **_: Any) -> "AWSRequest":
100-
signer_url = properties.get(S3_SIGNER_URI, properties[URI]).rstrip("/") # type: ignore
101-
signer_endpoint = properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)
101+
class S3RequestSigner(abc.ABC):
102+
"""Abstract base class for S3 request signers."""
102103

103-
signer_headers = {}
104-
if token := properties.get(TOKEN):
105-
signer_headers = {"Authorization": f"Bearer {token}"}
106-
signer_headers.update(get_header_properties(properties))
104+
properties: Properties
107105

108-
signer_body = {
109-
"method": request.method,
110-
"region": request.context["client_region"],
111-
"uri": request.url,
112-
"headers": {key: [val] for key, val in request.headers.items()},
113-
}
106+
def __init__(self, properties: Properties) -> None:
107+
self.properties = properties
108+
109+
@abc.abstractmethod
110+
def __call__(self, request: "AWSRequest", **_: Any) -> None:
111+
pass
112+
113+
114+
class S3V4RestSigner(S3RequestSigner):
115+
"""An S3 request signer that uses an external REST signing service to sign requests."""
116+
117+
_session: requests.Session
114118

115-
response = requests.post(f"{signer_url}/{signer_endpoint.strip()}", headers=signer_headers, json=signer_body)
116-
try:
117-
response.raise_for_status()
118-
response_json = response.json()
119-
except HTTPError as e:
120-
raise SignError(f"Failed to sign request {response.status_code}: {signer_body}") from e
119+
def __init__(self, properties: Properties) -> None:
120+
super().__init__(properties)
121+
self._session = requests.Session()
121122

122-
for key, value in response_json["headers"].items():
123-
request.headers.add_header(key, ", ".join(value))
123+
def __call__(self, request: "AWSRequest", **_: Any) -> None:
124+
signer_url = self.properties.get(S3_SIGNER_URI, self.properties[URI]).rstrip("/") # type: ignore
125+
signer_endpoint = self.properties.get(S3_SIGNER_ENDPOINT, S3_SIGNER_ENDPOINT_DEFAULT)
126+
127+
signer_headers = {}
128+
if token := self.properties.get(TOKEN):
129+
signer_headers = {"Authorization": f"Bearer {token}"}
130+
signer_headers.update(get_header_properties(self.properties))
131+
132+
signer_body = {
133+
"method": request.method,
134+
"region": request.context["client_region"],
135+
"uri": request.url,
136+
"headers": {key: [val] for key, val in request.headers.items()},
137+
}
138+
139+
response = self._session.post(f"{signer_url}/{signer_endpoint.strip()}", headers=signer_headers, json=signer_body)
140+
try:
141+
response.raise_for_status()
142+
response_json = response.json()
143+
except HTTPError as e:
144+
raise SignError(f"Failed to sign request {response.status_code}: {signer_body}") from e
124145

125-
request.url = response_json["uri"]
146+
for key, value in response_json["headers"].items():
147+
request.headers.add_header(key, ", ".join(value))
126148

127-
return request
149+
request.url = response_json["uri"]
128150

129151

130-
SIGNERS: Dict[str, Callable[[Properties, "AWSRequest"], "AWSRequest"]] = {"S3V4RestSigner": s3v4_rest_signer}
152+
SIGNERS: Dict[str, Type[S3RequestSigner]] = {"S3V4RestSigner": S3V4RestSigner}
131153

132154

133155
def _file(_: Properties) -> LocalFileSystem:
@@ -145,13 +167,13 @@ def _s3(properties: Properties) -> AbstractFileSystem:
145167
"region_name": get_first_property_value(properties, S3_REGION, AWS_REGION),
146168
}
147169
config_kwargs = {}
148-
register_events: Dict[str, Callable[[Properties], None]] = {}
170+
register_events: Dict[str, Callable[[AWSRequest], None]] = {}
149171

150172
if signer := properties.get(S3_SIGNER):
151173
logger.info("Loading signer %s", signer)
152-
if signer_func := SIGNERS.get(signer):
153-
signer_func_with_properties = partial(signer_func, properties)
154-
register_events["before-sign.s3"] = signer_func_with_properties
174+
if signer_cls := SIGNERS.get(signer):
175+
signer = signer_cls(properties)
176+
register_events["before-sign.s3"] = signer
155177

156178
# Disable the AWS Signer
157179
from botocore import UNSIGNED

tests/io/test_fsspec.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131

3232
from pyiceberg.exceptions import SignError
3333
from pyiceberg.io import fsspec
34-
from pyiceberg.io.fsspec import FsspecFileIO, s3v4_rest_signer
34+
from pyiceberg.io.fsspec import FsspecFileIO, S3V4RestSigner
3535
from pyiceberg.io.pyarrow import PyArrowFileIO
3636
from pyiceberg.typedef import Properties
3737
from tests.conftest import UNIFIED_AWS_SESSION_PROPERTIES
@@ -844,10 +844,11 @@ def test_s3v4_rest_signer(requests_mock: Mocker) -> None:
844844
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
845845
}
846846

847-
signed_request = s3v4_rest_signer({"token": "abc", "uri": TEST_URI, "header.X-Custom-Header": "value"}, request)
847+
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI, "header.X-Custom-Header": "value"})
848+
signer(request)
848849

849-
assert signed_request.url == new_uri
850-
assert dict(signed_request.headers) == {
850+
assert request.url == new_uri
851+
assert dict(request.headers) == {
851852
"Authorization": "AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02",
852853
"Host": "bucket.s3.us-west-2.amazonaws.com",
853854
"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0",
@@ -898,10 +899,11 @@ def test_s3v4_rest_signer_endpoint(requests_mock: Mocker) -> None:
898899
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
899900
}
900901

901-
signed_request = s3v4_rest_signer({"token": "abc", "uri": TEST_URI, "s3.signer.endpoint": endpoint}, request)
902+
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI, "s3.signer.endpoint": endpoint})
903+
signer(request)
902904

903-
assert signed_request.url == new_uri
904-
assert dict(signed_request.headers) == {
905+
assert request.url == new_uri
906+
assert dict(request.headers) == {
905907
"Authorization": "AWS4-HMAC-SHA256 Credential=ASIAQPRZZYGHUT57DL3I/20221017/us-west-2/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, Signature=430582a17d61ab02c272896fa59195f277af4bdf2121c441685e589f044bbe02",
906908
"Host": "bucket.s3.us-west-2.amazonaws.com",
907909
"User-Agent": "Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0",
@@ -939,8 +941,9 @@ def test_s3v4_rest_signer_forbidden(requests_mock: Mocker) -> None:
939941
"retries": {"attempt": 1, "invocation-id": "75d143fb-0219-439b-872c-18213d1c8d54"},
940942
}
941943

944+
signer = S3V4RestSigner(properties={"token": "abc", "uri": TEST_URI})
942945
with pytest.raises(SignError) as exc_info:
943-
_ = s3v4_rest_signer({"token": "abc", "uri": TEST_URI}, request)
946+
signer(request)
944947

945948
assert (
946949
"""Failed to sign request 401: {'method': 'HEAD', 'region': 'us-west-2', 'uri': 'https://bucket/metadata/snap-8048355899640248710-1-a5c8ea2d-aa1f-48e8-89f4-1fa69db8c742.avro', 'headers': {'User-Agent': ['Botocore/1.27.59 Python/3.10.7 Darwin/21.5.0']}}"""

0 commit comments

Comments
 (0)