
feat: Add CLPRemoteHandler to handle Multipart Upload to AWS S3 bucket #50
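For context, a minimal usage sketch of the handler this PR adds, based on the constructor in the diff below. The bucket and directory names are placeholders, the bucket is assumed to already exist, and credentials fall back to boto3's default resolution chain when not passed explicitly:

import logging

from clp_logging.handlers import CLPS3Handler

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# Placeholder bucket/prefix; multipart upload streams the log in 5 MB parts by default.
handler = CLPS3Handler(
    s3_bucket="my-log-bucket",
    s3_directory="app/logs",
    use_multipart_upload=True,
)
logger.addHandler(handler)

logger.info("hello from CLPS3Handler")
handler.close()  # completes the multipart upload and closes the stream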

Open · wants to merge 31 commits into main from remote-handler

Commits (31)

• 7bc1e8a  Create class CLPRemoteHandler (IreneLime, Jan 3, 2025)
• 0ae4a2a  Create multipart upload initiation (IreneLime, Jan 3, 2025)
• 8895b37  Create function that performs multipart upload on 5mb segments (IreneLime, Jan 3, 2025)
• 7475ed5  Create function that completes multipart upload (IreneLime, Jan 3, 2025)
• 96a831b  Handle corner case: file rotation when part number exceeds 10000 (IreneLime, Jan 3, 2025)
• 055eaf0  Integrate timeout functionalities for CLPLogLevelTimeout usage and cl… (IreneLime, Jan 3, 2025)
• b32b5f2  Avoid new upload initialization when previous upload has not completed (IreneLime, Jan 3, 2025)
• ed3a57b  Ensure consistent string quotation (IreneLime, Jan 3, 2025)
• 964ae20  Remove print statements and fix aws segment limitation error (IreneLime, Jan 5, 2025)
• 1fa0163  Modify code to pass typing tests. (IreneLime, Jan 5, 2025)
• 85cac30  Code format & linting test (IreneLime, Jan 5, 2025)
• 065c28c  Redesign CLPS3Handler architecture and complete a functional draft of… (IreneLime, Feb 9, 2025)
• 828897e  Add aws credential configuration (IreneLime, Mar 17, 2025)
• 2267849  Add rotation after 10000 parts (IreneLime, Mar 17, 2025)
• ba5c3dd  Fix part limit rotation (IreneLime, Mar 18, 2025)
• a9ec06d  Fix aws credential checking (IreneLime, Mar 18, 2025)
• 0aef71d  Enable sha256 checksum on multipart upload (IreneLime, Mar 18, 2025)
• 4839528  Add error checking throughout the upload process (IreneLime, Mar 18, 2025)
• 8ab75d0  Define macros and add options to specify multipart upload size (IreneLime, Mar 21, 2025)
• c273d0c  Always define remote file index (IreneLime, Mar 21, 2025)
• b737d92  Use unix timestamp on file name (IreneLime, Mar 21, 2025)
• 8f94e68  Add configurable parameter on users's s3 directory (IreneLime, Mar 21, 2025)
• b70df58  Updated pyproject.toml to include new dev and testing dependencies fo… (li-ruihao, Mar 21, 2025)
• 587d6cb  Modify naming of non-interface level variables (IreneLime, Mar 25, 2025)
• 9c8653a  Remove duplicate code (IreneLime, Mar 25, 2025)
• be9bcba  Add PutObject option to flush to remote (IreneLime, Mar 25, 2025)
• cfc7f6d  Merge branch 'main' into remote-handler (IreneLime, Mar 25, 2025)
• 8744baa  Add comments and error checking (IreneLime, Mar 25, 2025)
• 238d625  Add comments to explain parameters (IreneLime, Mar 26, 2025)
• 2c0a34a  Maintain consistent naming in S3 handler (IreneLime, Mar 26, 2025)
• b448143  Fix object key naming issue (IreneLime, Mar 26, 2025)

3 changes: 3 additions & 0 deletions pyproject.toml
@@ -27,13 +27,16 @@ classifiers = [
[project.optional-dependencies]
dev = [
"black >= 24.4.0",
"boto3 >= 1.37.18",
"botocore >= 1.37.18",
"build >= 0.8.0",
"docformatter >= 1.7.5",
"mypy >= 1.9.0",
"ruff >= 0.3.7",
"types-python-dateutil >= 2.8.19.2",
]
test = [
"moto >= 5.1.1",
"smart_open == 6.4.0",
]

297 changes: 296 additions & 1 deletion src/clp_logging/handlers.py
@@ -12,14 +12,23 @@
from signal import SIGINT, signal, SIGTERM
from threading import RLock, Thread, Timer
from types import FrameType
from typing import Any, Callable, ClassVar, Dict, IO, Optional, Tuple, Union
from typing import Any, Callable, ClassVar, Dict, List, IO, Optional, Tuple, Union


import tzlocal
from clp_ffi_py.ir import FourByteEncoder, Serializer
from clp_ffi_py.utils import serialize_dict_to_msgpack
from zstandard import FLUSH_FRAME, ZstdCompressionWriter, ZstdCompressor

import base64
import boto3
import botocore
import datetime
import hashlib
import io

from clp_logging.auto_generated_kv_pairs_utils import AutoGeneratedKeyValuePairsBuffer

from clp_logging.protocol import (
BYTE_ORDER,
EOF_CHAR,
@@ -39,6 +48,11 @@
AUTO_GENERATED_KV_PAIRS_KEY: str = "auto_generated_kv_pairs"
USER_GENERATED_KV_PAIRS_KEY: str = "user_generated_kv_pairs"

# Define the multipart upload size limits
MIN_UPLOAD_PART_SIZE = 5 * 1024 * 1024 # 5 MB
MAX_UPLOAD_PART_SIZE = 5 * 1024 * 1024 * 1024 # 5 GB
MAX_PART_NUM_PER_UPLOAD = 10000


def _init_timeinfo(fmt: Optional[str], tz: Optional[str]) -> Tuple[str, str]:
"""
@@ -859,10 +873,12 @@ class ClpKeyValuePairStreamHandler(logging.Handler):
:param stream: A writable byte output stream to which the handler will write the serialized IR
byte sequences.
:param enable_compression: Whether to compress the serialized IR byte sequences using Zstandard.

"""

def __init__(
self,

stream: IO[bytes],
enable_compression: bool = True,
) -> None:
@@ -996,3 +1012,282 @@ def _serialize_kv_pair_log_event(
serialize_dict_to_msgpack(auto_gen_kv_pairs),
serialize_dict_to_msgpack(user_gen_kv_pairs),
)


class CLPS3Handler(CLPBaseHandler):
"""
Log is written to stream in CLP IR encoding, and uploaded to s3_bucket

:param s3_bucket: S3 bucket to upload CLP encoded log messages to
:param stream: Target stream to write log messages to
:param enable_compression: Option to enable/disable stream compression
Default: True
:param timestamp_format: Timestamp format written in preamble to be
used when generating the logs with a reader.
:param timezone: Timezone written in preamble to be used when
generating the timestamp from Unix epoch time.
:param aws_access_key_id: User's public access key for the S3 bucket.
:param aws_secret_access_key: User's private access key for the S3 bucket.
:param s3_directory: S3 remote directory to upload objects to.
:param use_multipart_upload: Option to use multipart upload to upload
stream segments or use PutObject to upload the entire buffer.
Default: True
:param max_part_num: Maximum number of parts allowed for a multipart upload
session before uploading to a new object. Default: 10000
:param upload_part_size: Maximum size of a part in a multipart upload
session before writing to a new part. Default: 5MB
"""

def __init__(
self,
s3_bucket: str,
stream: Optional[IO[bytes]] = None,
enable_compression: bool = True,
timestamp_format: Optional[str] = None,
timezone: Optional[str] = None,
aws_access_key_id: Optional[str] = None,
aws_secret_access_key: Optional[str] = None,
s3_directory: Optional[str] = None,
use_multipart_upload: Optional[bool] = True,
max_part_num: Optional[int] = None,
upload_part_size: Optional[int] = MIN_UPLOAD_PART_SIZE
) -> None:
super().__init__()
self.closed: bool = False
self.enable_compression: bool = enable_compression
self._local_buffer: io.BytesIO = io.BytesIO()
if stream is None:
stream = self._local_buffer
self._ostream: IO[bytes] = stream
self.timestamp_format: str
self.timezone: str
self.timestamp_format, self.timezone = _init_timeinfo(timestamp_format, timezone)
self._init_stream(stream)

# Configure s3-related variables
self.s3_bucket: str = s3_bucket
try:
self._s3_client = boto3.client(
"s3",
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key
) if aws_access_key_id and aws_secret_access_key else boto3.client("s3")
except botocore.exceptions.NoCredentialsError:
raise RuntimeError("AWS credentials not found. Please configure your credentials.")
except botocore.exceptions.ClientError as e:
raise RuntimeError(f"Failed to initialize AWS client: {e}")
self._remote_folder_path: Optional[str] = None
self._remote_file_count: int = 1
self._start_timestamp: datetime = datetime.datetime.now()
self.s3_directory: str = (s3_directory.rstrip('/') + '/') if s3_directory else ''
self._obj_key: str = self._remote_log_naming()

self.use_multipart_upload = use_multipart_upload
if self.use_multipart_upload:
# Configure size limit of a part in multipart upload
self.upload_part_size: int
if MIN_UPLOAD_PART_SIZE <= upload_part_size <= MAX_UPLOAD_PART_SIZE:
self.upload_part_size = upload_part_size
else:
raise RuntimeError(
f"Invalid upload_part_size: {upload_part_size}. "
f"It must be between {MIN_UPLOAD_PART_SIZE} and {MAX_UPLOAD_PART_SIZE}."
)
self.max_part_num: int = max_part_num if max_part_num else MAX_PART_NUM_PER_UPLOAD
self._uploaded_parts: List[Dict[str, int | str]] = []

Review comment:

⚠️ Potential issue

Fix type annotation for Python 3.7+ compatibility.

The type annotation uses the pipe operator (|) which is only supported in Python 3.10+. This needs to be fixed to maintain compatibility with Python 3.7+ as specified in the project requirements.

-            self._uploaded_parts: List[Dict[str, int | str]] = []
+            self._uploaded_parts: List[Dict[str, Union[int, str]]] = []

self._upload_index: int = 1
create_ret: Dict[str, Any] = self._s3_client.create_multipart_upload(
Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256"
)
self._upload_id: int = create_ret["UploadId"]
if not self._upload_id or not isinstance(self._upload_id, str):
raise RuntimeError("Failed to obtain a valid Upload ID from S3.")

def _init_stream(self, stream: IO[bytes]) -> None:
"""
Initialize and configure output stream

:param stream: Target stream to write log messages to
"""
self.cctx: ZstdCompressor = ZstdCompressor()
self._ostream: Union[ZstdCompressionWriter, IO[bytes]] = (
self.cctx.stream_writer(self._local_buffer) if self.enable_compression else stream
)
self.last_timestamp_ms: int = floor(time.time() * 1000) # convert to ms and truncate
self._ostream.write(
FourByteEncoder.encode_preamble(
self.last_timestamp_ms, self.timestamp_format, self.timezone
)
)

def _remote_log_naming(self) -> str:
"""
Set the name of the target S3 object key to upload to
"""
self._remote_folder_path: str = f"{self.s3_directory}{self._start_timestamp.year}/{self._start_timestamp.month}/{self._start_timestamp.day}"

new_filename: str
upload_time: str = str(int(self._start_timestamp.timestamp()))

file_count: str = f"-{self._remote_file_count}"

# Compression uses zstd format
if self.enable_compression:
new_filename = f"{self._remote_folder_path}/{upload_time}_log{file_count}.clp.zst"
else:
new_filename = f"{self._remote_folder_path}/{upload_time}_log{file_count}.clp"
return new_filename

# override
def _write(self, loglevel: int, msg: str) -> None:
"""
Write the log message stream into a local buffer.
(With use_multipart_upload) Update the part number if the local buffer
exceeds a predetermined buffer size. Then clear the local buffer.
"""
if self.closed:
raise RuntimeError("Stream already closed")
clp_msg: bytearray
clp_msg, self.last_timestamp_ms = _encode_log_event(msg, self.last_timestamp_ms)

# Write log stream to a local buffer and flush to upload
self._ostream.write(clp_msg)
if not self.use_multipart_upload:
self._ostream.write(EOF_CHAR)
self._flush()

if self.use_multipart_upload and self._local_buffer.tell() >= self.upload_part_size:
# Rotate after maximum number of parts
if self._upload_index >= self.max_part_num:
self._complete_multipart_upload()
self._ostream.close()
self._local_buffer = io.BytesIO()
self._init_stream(self._local_buffer)
self._remote_file_count += 1
self._obj_key = self._remote_log_naming()
self._uploaded_parts = []
self._upload_index = 1
create_ret = self._s3_client.create_multipart_upload(
Bucket=self.s3_bucket, Key=self._obj_key, ChecksumAlgorithm="SHA256"
)
self._upload_id = create_ret["UploadId"]
if not self._upload_id:
raise RuntimeError("Failed to initialize new upload ID.")
else:
self._upload_index += 1
self._local_buffer.seek(0)
self._local_buffer.truncate(0)


def _flush(self) -> None:
"""
Upload local buffer to the S3 bucket using upload_part if
use_multipart_upload = True, otherwise use put_object.
"""
self._ostream.flush()
data: bytes = self._local_buffer.getvalue()
sha256_checksum: str = base64.b64encode(hashlib.sha256(data).digest()).decode('utf-8')

if self.use_multipart_upload:
try:
response: Dict[str, Any] = self._s3_client.upload_part(
Bucket=self.s3_bucket,
Key=self._obj_key,
Body=data,
PartNumber=self._upload_index,
UploadId=self._upload_id,
ChecksumSHA256=sha256_checksum,
)

# Verify integrity of the uploaded part using SHA256 Checksum
if response["ChecksumSHA256"] != sha256_checksum:
raise ValueError(f"Checksum mismatch for part {self._upload_index}. Upload aborted.")

# Store both ETag and SHA256 for validation
upload_status: Dict[str, int | str] = {
"PartNumber": self._upload_index,
"ETag": response["ETag"],
"ChecksumSHA256": response["ChecksumSHA256"],
}

# Determine the part to which the new upload_status belongs
if len(self._uploaded_parts) > self._upload_index - 1:
self._uploaded_parts[self._upload_index-1] = upload_status
else:
self._uploaded_parts.append(upload_status)

except Exception as e:
self._s3_client.abort_multipart_upload(
Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id
)
raise Exception(
f'Multipart Upload on Part {self._upload_index}: {e}'
) from e
else:
self._ostream.write(EOF_CHAR)
try:
self._s3_client.put_object(
Bucket=self.s3_bucket,
Key=self._obj_key,
Body=data,
ContentEncoding='zstd' if self.enable_compression else 'binary',
ChecksumSHA256=sha256_checksum
)

# Verify integrity of the upload using SHA256 Checksum
response: Dict[str, Any] = self._s3_client.head_object(
Bucket=self.s3_bucket,
Key=self._obj_key
)
if 'ChecksumSHA256' in response:
s3_checksum: str = response['ChecksumSHA256']
if s3_checksum != sha256_checksum:
raise ValueError(f"Checksum mismatch. Upload aborted.")

except Exception as e:
raise Exception(f'Failed to upload using PutObject: {e}')

def _complete_multipart_upload(self) -> None:
"""
Complete a multipart upload session and clear the local buffer.
"""
# Flush EOF marker to the local buffer and upload
self._ostream.write(EOF_CHAR)
self._flush()
self._local_buffer.seek(0)
self._local_buffer.truncate(0)

try:
self._s3_client.complete_multipart_upload(
Bucket=self.s3_bucket,
Key=self._obj_key,
UploadId=self._upload_id,
MultipartUpload={
"Parts": [
{
"PartNumber": part["PartNumber"],
"ETag": part["ETag"],
"ChecksumSHA256": part["ChecksumSHA256"],
}
for part in self._uploaded_parts
]
},
)
except Exception as e:
self._s3_client.abort_multipart_upload(
Bucket=self.s3_bucket, Key=self._obj_key, UploadId=self._upload_id
)
raise Exception(
f'Multipart Upload on Part {self._upload_index}: {e}'
) from e

# override
def close(self) -> None:
"""
Complete the upload if needed. Close the stream and the handler.
"""
if self.use_multipart_upload:
self._complete_multipart_upload()
self._ostream.close()
self.closed = True
super().close()
Comment on lines +1285 to +1293

💡 Verification agent

🧩 Analysis chain

Add unit tests for the new S3 handler.

There are no unit tests for this new handler class. Consider adding comprehensive tests to verify its functionality, especially for the S3 integration.


🌐 Web query:

How to mock AWS S3 services for unit testing in Python?

💡 Result:

To mock AWS S3 services for Python unit testing, three robust approaches are commonly used, each with distinct advantages and use cases:


1. Moto Library (Simple Mocking)

Moto provides high-level AWS service mocking with minimal setup.
Example using the mock_aws decorator with pytest [1][4][9]:

from moto import mock_aws
import boto3
import pytest

@mock_aws  # Mocks all AWS services by default
def test_s3_upload():
    s3 = boto3.client("s3")
    s3.create_bucket(Bucket="test-bucket")
    
    # Test S3 operations
    s3.put_object(Bucket="test-bucket", Key="test.txt", Body="data")
    obj = s3.get_object(Bucket="test-bucket", Key="test.txt")
    assert obj["Body"].read().decode() == "data"

Pros:

  • Simple decorator/context manager syntax
  • Maintains state between operations (e.g., buckets persist across calls)
  • Supports 130+ AWS services beyond S3 [4][9]

2. Botocore Stubs (Granular Control)

For precise control over API responses using AWS's native testing tools [1][6]:

from io import BytesIO

import boto3
from botocore.response import StreamingBody
from botocore.stub import Stubber

def test_s3_download():
    s3 = boto3.client("s3")
    stubber = Stubber(s3)
    
    # Program expected response
    stubber.add_response(
        "get_object",
        {"Body": StreamingBody(BytesIO(b"test"), len(b"test"))},
        {"Bucket": "test-bucket", "Key": "test.txt"}
    )
    
    with stubber:
        response = s3.get_object(Bucket="test-bucket", Key="test.txt")
        assert response["Body"].read() == b"test"

Pros:

  • Exact request/response validation
  • No external dependencies
  • Ideal for testing error handling and edge cases

3. LocalStack (Realistic Environment)

For integration-style testing with a full local S3 implementation [1][10]:

# docker-compose.yml
services:
  localstack:
    image: localstack/localstack
    ports:
      - "4566:4566"  # S3 endpoint
    environment:
      - SERVICES=s3

Test setup:

import boto3

def test_with_localstack():
    s3 = boto3.client(
        "s3",
        endpoint_url="http://localhost:4566",
        aws_access_key_id="test",
        aws_secret_access_key="test"
    )
    s3.create_bucket(Bucket="test-bucket")
    # Regular S3 operations...

Pros:

  • 100% API-compatible S3 implementation
  • Supports advanced features like versioning and lifecycle rules
  • Can test multi-service workflows [3][9]

Key Considerations

Factor               Moto        Botocore   LocalStack
Speed                ⚡️ Fastest   ⚡️ Fast     🐢 Slowest
Realism              Medium      Low        High
Network Dependency   None        None       Required
Setup Complexity     Low         Medium     High

Best Practices [7][9]:

  • Use Moto for most unit tests
  • Combine with pytest.mark.parametrize for data-driven tests
  • Employ LocalStack for pre-deployment integration testing
  • Disable network access during tests using pytest-socket

For full test examples and configuration details, see the Moto documentation and LocalStack guides.


Action Required: Add Comprehensive Unit Tests for the S3 Handler

The implementation in src/clp_logging/handlers.py (lines 1279–1287) correctly finalises the handler’s close sequence. However, there are no tests validating its S3 integration behaviours—especially around multipart uploads. Please add unit tests that cover:

  • Verifying that _complete_multipart_upload() is called when use_multipart_upload is enabled.
  • Ensuring that the output stream is properly closed and the handler’s internal state is updated.
  • Testing AWS S3 interactions using a suitable mocking strategy.

Consider using the Moto library for its simple, decorator-based approach to simulating AWS S3, or Botocore Stubs for precise control over AWS API responses. Alternatively, LocalStack can be used for integration-style testing, though it typically requires a more extensive setup.
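
One possible starting point, sketched with unittest.mock rather than moto so that the SHA-256 checksum round-trip in _flush is fully controlled; the bucket name and upload ID below are placeholders, and the constructor signature is taken from the diff above:

from unittest.mock import MagicMock, patch

from clp_logging.handlers import CLPS3Handler


def test_close_completes_multipart_upload() -> None:
    # Replace the boto3 client so no real AWS calls are made.
    fake_s3 = MagicMock()
    fake_s3.create_multipart_upload.return_value = {"UploadId": "test-upload-id"}

    # _flush compares the checksum it computed against the response, so echo it back.
    def fake_upload_part(**kwargs):
        return {"ETag": "etag-1", "ChecksumSHA256": kwargs["ChecksumSHA256"]}

    fake_s3.upload_part.side_effect = fake_upload_part

    with patch("boto3.client", return_value=fake_s3):
        handler = CLPS3Handler(s3_bucket="test-bucket", use_multipart_upload=True)
        handler.close()

    # close() should drive the full multipart sequence and mark the handler closed.
    fake_s3.complete_multipart_upload.assert_called_once()
    assert handler.closed

A moto-based variant of the same test would follow the decorator pattern from the web query above, at the cost of depending on moto's checksum handling for upload_part responses.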

Comment on lines +1017 to +1293

💡 Verification agent

🧩 Analysis chain

Add unit tests for the new S3 handler.

While the implementation looks solid, there are no unit tests for this new class. Consider adding comprehensive tests to verify the functionality, especially for the S3 integration, using moto or a similar library to mock AWS services.

I can help you with creating unit tests for this class. Here's a sample structure to consider:


🏁 Script executed:

#!/bin/bash
# Search for existing test patterns in the codebase
echo "Searching for existing test files..."
find . -type f -name "test_*.py" | head -5

Length of output: 152


Action: Add Unit Tests for the New S3 Handler

The new CLPS3Handler implementation currently lacks dedicated unit tests to verify its S3 integration and error management. Although there is an existing test suite (e.g. in ./tests/test_handlers.py), it does not appear to cover this new functionality. Please consider the following:

  • Test Coverage: Add tests that cover both multipart and non-multipart upload workflows.
  • Error Handling: Verify that errors such as invalid AWS credentials and checksum mismatches are handled appropriately.
  • S3 Simulation: Implement tests using a library like moto to mock AWS S3 services, ensuring that operations like part rotations and complete/abort workflows are accurately simulated (see the sketch after this list).
  • Integration Scenarios: Ensure tests confirm that S3 interactions (like creating multipart uploads, uploading parts, and completing uploads) function as expected in various scenarios.
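
For the moto-based route, a possible sketch that exercises the non-multipart (PutObject) path end to end; the bucket and logger names are placeholders, compression is disabled to keep the assertion simple, and _obj_key is read from the handler as defined in the diff above:

import logging

import boto3
from moto import mock_aws

from clp_logging.handlers import CLPS3Handler


@mock_aws
def test_put_object_flush_uploads_log_record() -> None:
    # moto intercepts boto3, so the bucket below exists only in memory.
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="test-bucket")

    handler = CLPS3Handler(
        s3_bucket="test-bucket",
        enable_compression=False,
        use_multipart_upload=False,  # exercise the PutObject branch of _flush
    )
    logger = logging.getLogger("clp_s3_put_object_test")
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)

    logger.info("hello s3")
    handler.close()

    # Each record is flushed with put_object, so the object key should now hold data.
    body = s3.get_object(Bucket="test-bucket", Key=handler._obj_key)["Body"].read()
    assert len(body) > 0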
🧰 Tools
🪛 Ruff (0.8.2)

1127-1127: Line too long (148 > 100)

(E501)


1204-1204: Line too long (105 > 100)

(E501)


1245-1245: f-string without any placeholders

Remove extraneous f prefix

(F541)
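
For reference, the F541 finding likely refers to the checksum-mismatch ValueError in _flush, where the f prefix has no placeholders to format; the fix is simply:

# Before (F541: f-string without any placeholders)
raise ValueError(f"Checksum mismatch. Upload aborted.")
# After
raise ValueError("Checksum mismatch. Upload aborted.")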