From 2ff9ba1229eb8d727ae833eafd37b4844b43a460 Mon Sep 17 00:00:00 2001 From: chuckbutkus Date: Thu, 23 Jan 2025 13:10:11 -0500 Subject: [PATCH] AWS necessary changes only (#6375) Co-authored-by: Engel Nyst --- openhands/server/listen_socket.py | 9 ++- openhands/storage/__init__.py | 2 +- openhands/storage/s3.py | 126 ++++++++++++++++++++++++------ 3 files changed, 111 insertions(+), 26 deletions(-) diff --git a/openhands/server/listen_socket.py b/openhands/server/listen_socket.py index 7e0f93066914..7b5d6b63870e 100644 --- a/openhands/server/listen_socket.py +++ b/openhands/server/listen_socket.py @@ -1,6 +1,7 @@ from urllib.parse import parse_qs import jwt +from pydantic import SecretStr from socketio.exceptions import ConnectionRefusedError from openhands.core.logger import openhands_logger as logger @@ -39,9 +40,13 @@ async def connect(connection_id: str, environ, auth): raise ConnectionRefusedError('No github_auth cookie') if not config.jwt_secret: raise RuntimeError('JWT secret not found') - decoded = jwt.decode( - signed_token, config.jwt_secret.get_secret_value(), algorithms=['HS256'] + + jwt_secret = ( + config.jwt_secret.get_secret_value() + if isinstance(config.jwt_secret, SecretStr) + else config.jwt_secret ) + decoded = jwt.decode(signed_token, jwt_secret, algorithms=['HS256']) user_id = decoded['github_user_id'] logger.info(f'User {user_id} is connecting to conversation {conversation_id}') diff --git a/openhands/storage/__init__.py b/openhands/storage/__init__.py index 98ab31aa5229..4da70ddbe405 100644 --- a/openhands/storage/__init__.py +++ b/openhands/storage/__init__.py @@ -11,7 +11,7 @@ def get_file_store(file_store: str, file_store_path: str | None = None) -> FileS raise ValueError('file_store_path is required for local file store') return LocalFileStore(file_store_path) elif file_store == 's3': - return S3FileStore() + return S3FileStore(file_store_path) elif file_store == 'google_cloud': return GoogleCloudFileStore(file_store_path) return 
class S3FileStore(FileStore):
    """FileStore backed by an S3 (or S3-compatible) bucket via boto3.

    Connection settings are read from the environment:
      - ``AWS_ACCESS_KEY_ID`` / ``AWS_SECRET_ACCESS_KEY``: credentials.
      - ``AWS_S3_ENDPOINT``: optional custom endpoint; a scheme is added
        if missing (https when secure, http otherwise).
      - ``AWS_S3_SECURE``: "true"/"false" (default "true"); selects TLS.
      - ``AWS_S3_BUCKET``: bucket name, used only when none is passed in.

    All S3 failures are surfaced as ``FileNotFoundError`` so callers can
    treat missing or inaccessible objects uniformly.
    """

    def __init__(self, bucket_name: str | None) -> None:
        access_key = os.getenv('AWS_ACCESS_KEY_ID')
        secret_key = os.getenv('AWS_SECRET_ACCESS_KEY')
        secure = os.getenv('AWS_S3_SECURE', 'true').lower() == 'true'
        endpoint = self._ensure_url_scheme(secure, os.getenv('AWS_S3_ENDPOINT'))
        if bucket_name is None:
            # Fall back to the environment; a KeyError here is deliberate —
            # an S3 store with no bucket configured is a setup error.
            bucket_name = os.environ['AWS_S3_BUCKET']
        self.bucket = bucket_name
        self.client = boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            endpoint_url=endpoint,
            use_ssl=secure,
        )

    def write(self, path: str, contents: str | bytes) -> None:
        """Store *contents* under key *path* (str is UTF-8 encoded).

        Raises:
            FileNotFoundError: on access denial, missing bucket, or any
                other S3 client error.
        """
        try:
            as_bytes = (
                contents.encode('utf-8') if isinstance(contents, str) else contents
            )
            self.client.put_object(Bucket=self.bucket, Key=path, Body=as_bytes)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'AccessDenied':
                raise FileNotFoundError(
                    f"Error: Access denied to bucket '{self.bucket}'."
                )
            elif e.response['Error']['Code'] == 'NoSuchBucket':
                raise FileNotFoundError(
                    f"Error: The bucket '{self.bucket}' does not exist."
                )
            raise FileNotFoundError(
                f"Error: Failed to write to bucket '{self.bucket}' at path {path}: {e}"
            )

    def read(self, path: str) -> str:
        """Return the UTF-8 decoded contents of the object at *path*.

        Raises:
            FileNotFoundError: if the bucket or key does not exist, or on
                any other read failure.
        """
        try:
            response = self.client.get_object(Bucket=self.bucket, Key=path)
            return response['Body'].read().decode('utf-8')
        except botocore.exceptions.ClientError as e:
            # Catch all S3-related errors
            if e.response['Error']['Code'] == 'NoSuchBucket':
                raise FileNotFoundError(
                    f"Error: The bucket '{self.bucket}' does not exist."
                )
            elif e.response['Error']['Code'] == 'NoSuchKey':
                raise FileNotFoundError(
                    f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'."
                )
            else:
                raise FileNotFoundError(
                    f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
                )
        except Exception as e:
            raise FileNotFoundError(
                f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
            )

    def list(self, path: str) -> list[str]:
        """Return the keys under prefix *path* (a trailing '/' is appended
        unless *path* is empty or '/').

        Raises:
            FileNotFoundError: on missing bucket, access denial, or any
                other listing failure.
        """
        if path and path != '/' and not path.endswith('/'):
            path += '/'
        try:
            response = self.client.list_objects_v2(Bucket=self.bucket, Prefix=path)
            # 'Contents' is absent when the prefix matches nothing.
            if 'Contents' in response:
                objects = [obj['Key'] for obj in response['Contents']]
                return objects
            else:
                return []
        except botocore.exceptions.ClientError as e:
            # Catch all S3-related errors
            if e.response['Error']['Code'] == 'NoSuchBucket':
                raise FileNotFoundError(
                    f"Error: The bucket '{self.bucket}' does not exist."
                )
            elif e.response['Error']['Code'] == 'AccessDenied':
                raise FileNotFoundError(
                    f"Error: Access denied to bucket '{self.bucket}'."
                )
            else:
                raise FileNotFoundError(f"Error: {e.response['Error']['Message']}")
        except Exception as e:
            raise FileNotFoundError(
                f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}"
            )

    def delete(self, path: str) -> None:
        """Delete the single object at key *path*.

        NOTE(review): unlike the previous minio implementation, which
        removed every object under the prefix recursively, this deletes
        only the exact key — confirm no caller relies on prefix deletion.

        Raises:
            FileNotFoundError: on missing bucket/key, access denial, or
                any other deletion failure.
        """
        try:
            self.client.delete_object(Bucket=self.bucket, Key=path)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == 'NoSuchBucket':
                raise FileNotFoundError(
                    f"Error: The bucket '{self.bucket}' does not exist."
                )
            elif e.response['Error']['Code'] == 'AccessDenied':
                raise FileNotFoundError(
                    f"Error: Access denied to bucket '{self.bucket}'."
                )
            elif e.response['Error']['Code'] == 'NoSuchKey':
                raise FileNotFoundError(
                    f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'."
                )
            else:
                raise FileNotFoundError(
                    f"Error: Failed to delete key '{path}' from bucket '{self.bucket}': {e}"
                )
        except Exception as e:
            # Fixed: closing quote after the bucket name was missing,
            # making this message inconsistent with the branch above.
            raise FileNotFoundError(
                f"Error: Failed to delete key '{path}' from bucket '{self.bucket}': {e}"
            )

    def _ensure_url_scheme(self, secure: bool, url: str | None) -> str | None:
        """Normalize *url* to the scheme implied by *secure*.

        Returns None when *url* is unset/empty, so boto3 falls back to its
        default AWS endpoint.
        """
        if not url:
            return None
        if secure:
            if not url.startswith('https://'):
                url = 'https://' + url.removeprefix('http://')
        else:
            if not url.startswith('http://'):
                url = 'http://' + url.removeprefix('https://')
        return url