-
Notifications
You must be signed in to change notification settings - Fork 4.9k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: Engel Nyst <[email protected]>
- Loading branch information
1 parent a7e6068, commit 2ff9ba1
Showing 3 changed files with 111 additions and 26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,50 +1,130 @@ | ||
import io | ||
import os | ||
|
||
from minio import Minio | ||
import boto3 | ||
import botocore | ||
|
||
from openhands.storage.files import FileStore | ||
|
||
|
||
class S3FileStore(FileStore): | ||
def __init__(self) -> None: | ||
def __init__(self, bucket_name: str | None) -> None: | ||
access_key = os.getenv('AWS_ACCESS_KEY_ID') | ||
secret_key = os.getenv('AWS_SECRET_ACCESS_KEY') | ||
endpoint = os.getenv('AWS_S3_ENDPOINT', 's3.amazonaws.com') | ||
secure = os.getenv('AWS_S3_SECURE', 'true').lower() == 'true' | ||
self.bucket = os.getenv('AWS_S3_BUCKET') | ||
self.client = Minio(endpoint, access_key, secret_key, secure=secure) | ||
endpoint = self._ensure_url_scheme(secure, os.getenv('AWS_S3_ENDPOINT')) | ||
if bucket_name is None: | ||
bucket_name = os.environ['AWS_S3_BUCKET'] | ||
self.bucket = bucket_name | ||
self.client = boto3.client( | ||
's3', | ||
aws_access_key_id=access_key, | ||
aws_secret_access_key=secret_key, | ||
endpoint_url=endpoint, | ||
use_ssl=secure, | ||
) | ||
|
||
def write(self, path: str, contents: str | bytes) -> None: | ||
as_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents | ||
stream = io.BytesIO(as_bytes) | ||
try: | ||
self.client.put_object(self.bucket, path, stream, len(as_bytes)) | ||
except Exception as e: | ||
raise FileNotFoundError(f'Failed to write to S3 at path {path}: {e}') | ||
as_bytes = ( | ||
contents.encode('utf-8') if isinstance(contents, str) else contents | ||
) | ||
self.client.put_object(Bucket=self.bucket, Key=path, Body=as_bytes) | ||
except botocore.exceptions.ClientError as e: | ||
if e.response['Error']['Code'] == 'AccessDenied': | ||
raise FileNotFoundError( | ||
f"Error: Access denied to bucket '{self.bucket}'." | ||
) | ||
elif e.response['Error']['Code'] == 'NoSuchBucket': | ||
raise FileNotFoundError( | ||
f"Error: The bucket '{self.bucket}' does not exist." | ||
) | ||
raise FileNotFoundError( | ||
f"Error: Failed to write to bucket '{self.bucket}' at path {path}: {e}" | ||
) | ||
|
||
def read(self, path: str) -> str: | ||
try: | ||
return self.client.get_object(self.bucket, path).data.decode('utf-8') | ||
response = self.client.get_object(Bucket=self.bucket, Key=path) | ||
return response['Body'].read().decode('utf-8') | ||
except botocore.exceptions.ClientError as e: | ||
# Catch all S3-related errors | ||
if e.response['Error']['Code'] == 'NoSuchBucket': | ||
raise FileNotFoundError( | ||
f"Error: The bucket '{self.bucket}' does not exist." | ||
) | ||
elif e.response['Error']['Code'] == 'NoSuchKey': | ||
raise FileNotFoundError( | ||
f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'." | ||
) | ||
else: | ||
raise FileNotFoundError( | ||
f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}" | ||
) | ||
except Exception as e: | ||
raise FileNotFoundError(f'Failed to read from S3 at path {path}: {e}') | ||
raise FileNotFoundError( | ||
f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}" | ||
) | ||
|
||
def list(self, path: str) -> list[str]: | ||
if path and path != '/' and not path.endswith('/'): | ||
path += '/' | ||
try: | ||
return [ | ||
obj.object_name for obj in self.client.list_objects(self.bucket, path) | ||
] | ||
response = self.client.list_objects_v2(Bucket=self.bucket, Prefix=path) | ||
# Check if 'Contents' exists in the response | ||
if 'Contents' in response: | ||
objects = [obj['Key'] for obj in response['Contents']] | ||
return objects | ||
else: | ||
return list() | ||
except botocore.exceptions.ClientError as e: | ||
# Catch all S3-related errors | ||
if e.response['Error']['Code'] == 'NoSuchBucket': | ||
raise FileNotFoundError( | ||
f"Error: The bucket '{self.bucket}' does not exist." | ||
) | ||
elif e.response['Error']['Code'] == 'AccessDenied': | ||
raise FileNotFoundError( | ||
f"Error: Access denied to bucket '{self.bucket}'." | ||
) | ||
else: | ||
raise FileNotFoundError(f"Error: {e.response['Error']['Message']}") | ||
except Exception as e: | ||
raise FileNotFoundError(f'Failed to list S3 objects at path {path}: {e}') | ||
raise FileNotFoundError( | ||
f"Error: Failed to read from bucket '{self.bucket}' at path {path}: {e}" | ||
) | ||
|
||
def delete(self, path: str) -> None: | ||
try: | ||
client = self.client | ||
bucket = self.bucket | ||
objects_to_delete = client.list_objects(bucket, prefix=path, recursive=True) | ||
for obj in objects_to_delete: | ||
client.remove_object(bucket, obj.object_name) | ||
self.client.delete_object(Bucket=self.bucket, Key=path) | ||
except botocore.exceptions.ClientError as e: | ||
if e.response['Error']['Code'] == 'NoSuchBucket': | ||
raise FileNotFoundError( | ||
f"Error: The bucket '{self.bucket}' does not exist." | ||
) | ||
elif e.response['Error']['Code'] == 'AccessDenied': | ||
raise FileNotFoundError( | ||
f"Error: Access denied to bucket '{self.bucket}'." | ||
) | ||
elif e.response['Error']['Code'] == 'NoSuchKey': | ||
raise FileNotFoundError( | ||
f"Error: The object key '{path}' does not exist in bucket '{self.bucket}'." | ||
) | ||
else: | ||
raise FileNotFoundError( | ||
f"Error: Failed to delete key '{path}' from bucket '{self.bucket}': {e}" | ||
) | ||
except Exception as e: | ||
raise FileNotFoundError(f'Failed to delete S3 object at path {path}: {e}') | ||
raise FileNotFoundError( | ||
f"Error: Failed to delete key '{path}' from bucket '{self.bucket}: {e}" | ||
) | ||
|
||
def _ensure_url_scheme(self, secure: bool, url: str | None) -> str | None: | ||
if not url: | ||
return None | ||
if secure: | ||
if not url.startswith('https://'): | ||
url = 'https://' + url.removeprefix('http://') | ||
else: | ||
if not url.startswith('http://'): | ||
url = 'http://' + url.removeprefix('https://') | ||
return url |