-
Notifications
You must be signed in to change notification settings - Fork 48
Feature/multi model artifact handler #869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 21 commits
6d9fa4a
6a930f1
5f3b316
802e438
35e8464
e625107
74a235e
848972e
d35b917
3ac5338
822c7e8
3d4d950
d4ef023
c40ea8b
d0309f4
19ec921
9089028
464cb37
49be8f8
a2894a5
5d327cd
b7e7d74
4418bf8
e2e7099
4cc8823
7f3cc7f
32f4826
9f69e35
ad0ce2e
eec2b4d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,7 @@ | |
ModelProvenanceNotFoundError, | ||
OCIDataScienceModel, | ||
) | ||
from ads.common import oci_client as oc | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
@@ -1466,3 +1467,183 @@ def _download_file_description_artifact(self) -> Tuple[Union[str, List[str]], in | |
bucket_uri.append(uri) | ||
|
||
return bucket_uri[0] if len(bucket_uri) == 1 else bucket_uri, artifact_size | ||
|
||
def add_artifact( | ||
self, | ||
uri: str, | ||
files: Optional[List[str]] = None, | ||
): | ||
""" | ||
Adds information about objects in a specified bucket to the model description JSON. | ||
|
||
Parameters | ||
---------- | ||
uri (str): The URI representing the location of the artifact in OCI object storage. | ||
files (list of str, optional): A list of file names to include in the model description. | ||
If provided, only objects with matching file names will be included. Defaults to None. | ||
|
||
Returns | ||
------- | ||
None | ||
|
||
Raises | ||
------ | ||
ValueError: If no files are found to add to the model description. | ||
|
||
Note | ||
---- | ||
- If `files` is not provided, it retrieves information about all objects in the bucket. | ||
If `files` is provided, it only retrieves information about objects with matching file names. | ||
- If no objects are found to add to the model description, a ValueError is raised. | ||
""" | ||
|
||
bucket, namespace, prefix = self._extract_oci_uri_components(uri) | ||
|
||
if self.model_file_description == None: | ||
self.empty_json = { | ||
"version": "1.0", | ||
"type": "modelOSSReferenceDescription", | ||
"models": [], | ||
} | ||
self.set_spec(self.CONST_MODEL_FILE_DESCRIPTION, self.empty_json) | ||
|
||
# Get object storage client | ||
self.object_storage_client = oc.OCIClientFactory(**(self.dsc_model.auth)).object_storage | ||
|
||
# Remove if the model already exists | ||
self.remove_artifact(uri=uri) | ||
|
||
def check_if_file_exists(fileName): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: I would not recommend to create nested functions, it would be hard to write unit tests for such methods. I would rather recommend to move such methods into utils or outside the class. |
||
isExists = False | ||
try: | ||
headResponse = self.object_storage_client.head_object( | ||
namespace, bucket, object_name=fileName | ||
) | ||
if headResponse.status == 200: | ||
isExists = True | ||
except Exception as e: | ||
if hasattr(e, "status") and e.status == 404: | ||
logger.error(f"File not found in bucket: {fileName}") | ||
else: | ||
logger.error(f"An error occured: {e}") | ||
return isExists | ||
|
||
# Function to un-paginate the api call with while loop | ||
def list_obj_versions_unpaginated(): | ||
objectStorageList = [] | ||
has_next_page, opc_next_page = True, None | ||
while has_next_page: | ||
response = self.object_storage_client.list_object_versions( | ||
namespace_name=namespace, | ||
bucket_name=bucket, | ||
prefix=prefix, | ||
fields="name,size", | ||
page=opc_next_page, | ||
) | ||
objectStorageList.extend(response.data.items) | ||
has_next_page = response.has_next_page | ||
opc_next_page = response.next_page | ||
return objectStorageList | ||
|
||
# Fetch object details and put it into the objects variable | ||
objectStorageList = [] | ||
if files == None: | ||
objectStorageList = list_obj_versions_unpaginated() | ||
else: | ||
for fileName in files: | ||
if check_if_file_exists(fileName=fileName): | ||
objectStorageList.append( | ||
self.object_storage_client.list_object_versions( | ||
namespace_name=namespace, | ||
bucket_name=bucket, | ||
prefix=fileName, | ||
fields="name,size", | ||
).data.items[0] | ||
) | ||
|
||
objects = [ | ||
{"name": obj.name, "version": obj.version_id, "sizeInBytes": obj.size} | ||
for obj in objectStorageList | ||
if obj.size > 0 | ||
] | ||
|
||
if len(objects) == 0: | ||
error_message = ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT:
|
||
f"No files to add in the bucket: {bucket} with namespace: {namespace} " | ||
f"and prefix: {prefix}. File names: {files}" | ||
) | ||
logger.error(error_message) | ||
raise ValueError(error_message) | ||
|
||
tmp_model_file_description = self.model_file_description | ||
tmp_model_file_description["models"].append( | ||
{ | ||
"namespace": namespace, | ||
"bucketName": bucket, | ||
"prefix": prefix, | ||
"objects": objects, | ||
} | ||
) | ||
self.set_spec(self.CONST_MODEL_FILE_DESCRIPTION, tmp_model_file_description) | ||
|
||
def remove_artifact(self, uri: str): | ||
""" | ||
Removes information about objects in a specified bucket from the model description JSON. | ||
|
||
Parameters | ||
---------- | ||
uri (str): The URI representing the location of the artifact in OCI object storage. | ||
|
||
Returns | ||
------- | ||
None | ||
|
||
Raises | ||
------ | ||
ValueError: If the model description JSON is None. | ||
""" | ||
|
||
bucket, namespace, prefix = self._extract_oci_uri_components(uri) | ||
|
||
def findModelIdx(): | ||
for idx, model in enumerate(self.model_file_description["models"]): | ||
if ( | ||
model["namespace"], | ||
model["bucketName"], | ||
(model["prefix"] if ("prefix" in model) else None), | ||
) == (namespace, bucket, prefix): | ||
return idx | ||
return -1 | ||
|
||
if self.model_file_description == None: | ||
return | ||
|
||
modelSearchIdx = findModelIdx() | ||
if modelSearchIdx == -1: | ||
return | ||
else: | ||
# model found case | ||
self.model_file_description["models"].pop(modelSearchIdx) | ||
|
||
def _extract_oci_uri_components(self, uri: str): | ||
if not uri.startswith("oci://"): | ||
raise ValueError("Invalid URI format") | ||
|
||
# Remove the "oci://" prefix | ||
uri = uri[len("oci://"):] | ||
|
||
# Split by "@" to get bucket_name and the rest | ||
bucket_and_rest = uri.split("@", 1) | ||
if len(bucket_and_rest) != 2: | ||
raise ValueError("Invalid URI format") | ||
|
||
bucket_name = bucket_and_rest[0] | ||
|
||
# Split the rest by "/" to get namespace and prefix | ||
namespace_and_prefix = bucket_and_rest[1].split("/", 1) | ||
if len(namespace_and_prefix) == 1: | ||
namespace, prefix = namespace_and_prefix[0], "" | ||
else: | ||
namespace, prefix = namespace_and_prefix | ||
|
||
return bucket_name, namespace, None if prefix == '' else prefix |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
NIT: Could you please add an examples section for the add and delete artifacts feature?