Skip to content

Commit

Permalink
DMI SM updates to use status endpoint w/ database records; run on CPU…
Browse files Browse the repository at this point in the history
… if no GPU enabled
  • Loading branch information
dale-wahl committed Jul 16, 2024
1 parent d2a787e commit eb76937
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 55 deletions.
147 changes: 106 additions & 41 deletions common/lib/dmi_service_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""
import datetime
import os
import json
import time
from json import JSONDecodeError
from werkzeug.utils import secure_filename
Expand Down Expand Up @@ -122,6 +123,21 @@ def check_progress(self):
self.processor.dataset.update_progress(current_completed / self.num_files_to_process)
self.processed_files = current_completed

def check_service_exists(self):
""""
Check for services created with the current dataset key.
Returns None, if none found else the related service jobs and their details
"""
# Check to see if service already created
resp = requests.get(self.server_address + "jobs/details_query/", json={"details_key": "$.request_json.4CAT_dataset_key", "details_value": self.processor.dataset.key}, timeout=30)
if resp.status_code == 200:
# Check if service is already running
if len(resp.json()["jobs"]) > 0:
return resp.json()["jobs"]
else:
return None

def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=60, check_process=True, callback=None):
"""
Send request and wait for results to be ready.
Expand All @@ -136,50 +152,69 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
else:
raise DmiServiceManagerException("dmi_service_manager.local_or_remote setting must be 'local' or 'remote'")

api_endpoint = self.server_address + service_endpoint
try:
resp = requests.post(api_endpoint, json=data, timeout=30)
except requests.exceptions.ConnectionError as e :
raise DmiServiceManagerException(f"Unable to connect to DMI Service Manager server: {str(e)}")

if resp.status_code == 202:
# New request successful
results_url = api_endpoint + "?key=" + resp.json()['key']
existing_service = self.check_service_exists()
if existing_service:
if len(existing_service) > 1:
raise Exception("Multiple services found with the same dataset key.")
else:
existing_service = existing_service[0]
if existing_service['status'] == 'complete':
# Service already completed
return True
elif existing_service['status'] == 'error':
results = json.loads(existing_service['results'])
raise DmiServiceManagerException(f"DMI Service Manager Error: {results['error']}")
else:
# Service already running
results_url = self.server_address + f"jobs/{existing_service.get('id')}"
self.processor.dataset.update_status(f"Waiting for results from {service_endpoint}...")
else:
# Create a new service
# Append dataset key to data
data["4CAT_dataset_key"] = self.processor.dataset.key
api_endpoint = self.server_address + service_endpoint
try:
resp_json = resp.json()
if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
# Request already exists
results_url = api_endpoint + "?key=" + resp_json['key']
else:
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
except JSONDecodeError:
# Unexpected Error
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")
resp = requests.post(api_endpoint, json=data, timeout=30)
except requests.exceptions.ConnectionError as e :
raise DmiServiceManagerException(f"Unable to connect to DMI Service Manager server: {str(e)}")

if resp.status_code == 202:
# New request successful
results_url = resp.json()['result_url']
else:
try:
resp_json = resp.json()
if resp.status_code == 400 and 'key' in resp_json and 'error' in resp_json and resp_json['error'] == f"future_key {resp_json['key']} already exists":
# Request already exists; get DMI SM database key
raise Exception(f"Request already exists; check that DMI SM is up to date")
elif resp.status_code == 404:
# Could be local vs remote not set correctly
raise DmiServiceManagerException(f"404: {resp.url} not found; DMI Service Manager may not be set up for this service")
else:
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp_json)}")
except JSONDecodeError:
# Unexpected Error
raise DmiServiceManagerException(f"DMI Service Manager error: {str(resp.status_code)}: {str(resp.text)}")

# Wait for results to be ready
self.processor.dataset.update_status(f"Generating results for {service_endpoint}...")
# Wait for results to be ready
self.processor.dataset.update_status(f"Generating results for {service_endpoint}...")

check_time = time.time()
check_time = 0
success = False
connection_error = 0
last_status = None
while True:
time.sleep(1)
# If interrupted is called, attempt to finish dataset while server still running
if self.processor.interrupted:
self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...",
is_final=True)
self.processor.dataset.update_status(f"4CAT interrupted; Processing successful {service_endpoint} results...", is_final=True)
break

# Send request to check status every wait_period seconds
if (time.time() - check_time) > wait_period:
check_time = time.time()
# Update progress
if check_process:
self.check_progress()

if callback:
callback(self)

try:
result = requests.get(results_url, timeout=30)
except requests.exceptions.ConnectionError as e:
Expand All @@ -189,28 +224,58 @@ def send_request_and_wait_for_results(self, service_endpoint, data, wait_period=
raise DmiServiceManagerException(f"Unable to connect to DMI Service Manager server: {str(e)}")
continue

if 'status' in result.json().keys() and result.json()['status'] == 'running':
# Still running
if result.status_code != 200 or (result.json and result.json().get('status') != "success"):
# Unexpected response from DMI SM
connection_error += 1
if connection_error > 3:
raise DmiServiceManagerException(f"Unable to connect to DMI Service Manager server: {str(result.status_code)}: {str(result.json()) if 'json' in result.headers.get('Content-Type', '') else str(result.text)}")
continue
elif 'report' in result.json().keys() and result.json()['returncode'] == 0:
# Complete without error
self.processor.dataset.update_status(f"Completed {service_endpoint}!")
success = True
break
service_status = result.json()["job"]

elif 'returncode' in result.json().keys() and int(result.json()['returncode']) == 1:
# Error
if 'error' in result.json().keys():
error = result.json()['error']
# Update progress
if check_process:
# Check for message
status_message = service_status.get('message', '')
current_completed = service_status.get('processed_records', False)
if status_message:
# Update status message if changed
if last_status != status_message:
last_status = service_status.get('message', '')
self.processor.dataset.update_status(last_status)
if current_completed and self.processed_files != int(current_completed):
self.processor.dataset.update_progress(int(current_completed) / self.num_files_to_process)
self.processed_files = int(current_completed)
else:
# This service does not provide status message; check progress via file count
self.check_progress()

if service_status['status'] in ["created", "running", "pending"]:
# Still running
continue
elif service_status['status'] in ["complete", "error"]:
results = json.loads(service_status['results'])
if not results:
# This should not be the case is the service was written well (unless the DMI SM crashed?)
#TODO test if timing issue?
connection_error += 1
if connection_error > 3:
raise DmiServiceManagerException(f"Unable to read DMI SM results: {service_status}")
if int(results['returncode']) == 0:
# Complete without error
self.processor.dataset.update_status(f"Completed {service_endpoint}!")
success = True
break
else:
error = results['error']
if "CUDA error: out of memory" in error:
raise DmiServiceManagerException("DMI Service Manager server ran out of memory; try reducing the number of files processed at once or waiting until the server is less busy.")
else:
raise DmiServiceManagerException(f"Error {service_endpoint}: " + error)
else:
raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
else:
# Something botched
raise DmiServiceManagerException(f"Error {service_endpoint}: " + str(result.json()))
else:
time.sleep(1)

return success

Expand Down
11 changes: 8 additions & 3 deletions processors/machine_learning/blip2_image_caption.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,13 @@ def process(self):
try:
gpu_response = dmi_service_manager.check_gpu_memory_available("blip2")
except DmiServiceManagerException as e:
return self.dataset.finish_with_error(str(e))
if "GPU not enabled on this instance of DMI Service Manager" in str(e):
self.dataset.update_status("GPU not enabled on this instance of DMI Service Manager; this may be a minute...")
gpu_response = None
else:
return self.dataset.finish_with_error(str(e))

if int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
if gpu_response and int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
self.dataset.finish_with_error(
"DMI Service Manager currently busy; no GPU memory available. Please try again later.")
return
Expand All @@ -162,7 +166,8 @@ def process(self):
"--image-folder", f"data/{path_to_files}",
"--max_new_tokens", str(self.parameters.get("max_new_tokens", 20)),
"--dataset-name", f"{self.dataset.key}"
]
],
"pass_key": True, # This tells the DMI SM there is a status update endpoint in the blip2 image
}

# If prompt, add to args
Expand Down
14 changes: 10 additions & 4 deletions processors/machine_learning/clip_categorize_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,14 @@ def process(self):
try:
gpu_response = dmi_service_manager.check_gpu_memory_available("clip")
except DmiServiceManagerException as e:
return self.dataset.finish_with_error(str(e))

if int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
if "GPU not enabled on this instance of DMI Service Manager" in str(e):
self.dataset.update_status(
"GPU not enabled on this instance of DMI Service Manager; this may be a minute...")
gpu_response = None
else:
return self.dataset.finish_with_error(str(e))

if gpu_response and int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
self.dataset.finish_with_error(
"DMI Service Manager currently busy; no GPU memory available. Please try again later.")
return
Expand All @@ -171,7 +176,8 @@ def process(self):
data = {"args": ['--output_dir', f"data/{path_to_results}",
"--model", model,
"--categories", f"{','.join(categories)}",
"--images"]
"--images"],
"pass_key": True, # This tells the DMI SM there is a status update endpoint in the clip image
}

# Finally, add image files to args
Expand Down
15 changes: 11 additions & 4 deletions processors/machine_learning/generate_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,14 @@ def process(self):
try:
gpu_response = dmi_service_manager.check_gpu_memory_available("stable_diffusion")
except DmiServiceManagerException as e:
return self.dataset.finish_with_error(str(e))

if int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
if "GPU not enabled on this instance of DMI Service Manager" in str(e):
self.dataset.update_status(
"GPU not enabled on this instance of DMI Service Manager; this may be a minute...")
gpu_response = None
else:
return self.dataset.finish_with_error(str(e))

if gpu_response and int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
self.dataset.finish_with_error(
"DMI Service Manager currently busy; no GPU memory available. Please try again later.")
return
Expand All @@ -219,7 +224,9 @@ def process(self):
# interface.py args
data = {"timeout": (86400 * 7), "args": ['--output-dir', f"data/{path_to_results}",
"--prompts-file",
f"data/{path_to_files.joinpath(dmi_service_manager.sanitize_filenames(prompts_file.name))}"]
f"data/{path_to_files.joinpath(dmi_service_manager.sanitize_filenames(prompts_file.name))}"],
"pass_key": True, # This tells the DMI SM there is a status update endpoint in the clip image

}

# Send request to DMI Service Manager
Expand Down
11 changes: 8 additions & 3 deletions processors/machine_learning/whisper_speech_to_text.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,16 @@ def process(self):

# Check connection and GPU memory available
try:
gpu_response = dmi_service_manager.check_gpu_memory_available("whisper")
gpu_response = dmi_service_manager.check_gpu_memory_available("blip2")
except DmiServiceManagerException as e:
return self.dataset.finish_with_error(str(e))
if "GPU not enabled on this instance of DMI Service Manager" in str(e):
self.dataset.update_status(
"GPU not enabled on this instance of DMI Service Manager; this may be a minute...")
gpu_response = None
else:
return self.dataset.finish_with_error(str(e))

if int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
if gpu_response and int(gpu_response.get("memory", {}).get("gpu_free_mem", 0)) < 1000000:
self.dataset.finish_with_error("DMI Service Manager currently busy; no GPU memory available. Please try again later.")
return

Expand Down

0 comments on commit eb76937

Please sign in to comment.