Updated get_available_device_id logic #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 4 commits into base: main
1 change: 1 addition & 0 deletions QEfficient/utils/constants.py
@@ -12,6 +12,7 @@
QEFF_DIR = os.path.dirname(UTILS_DIR)
ROOT_DIR = os.path.dirname(QEFF_DIR)
QEFF_CACHE_DIR_NAME = "qeff_cache"
+LOCK_DIR = "/tmp/device_locks"

ONNX_EXPORT_EXAMPLE_BATCH_SIZE = 1
ONNX_EXPORT_EXAMPLE_SEQ_LEN = 32
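
The new LOCK_DIR constant gives every process on the host a common place for the device-selection lock file, which is what lets concurrent pytest workers serialize their device checks. Below is a minimal sketch of the cross-process exclusion this enables; it is illustrative only (and POSIX-specific, since it relies on fcntl), not code from this PR:

```python
import fcntl
import os

LOCK_DIR = "/tmp/device_locks"  # same value the PR adds to constants.py

os.makedirs(LOCK_DIR, exist_ok=True)
lock_path = os.path.join(LOCK_DIR, "device_check.lock")

# Only one process on the host can hold this lock at a time; any other
# process attempting LOCK_EX | LOCK_NB on the same file gets BlockingIOError.
with open(lock_path, "w") as lock_file:
    fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
    print("lock held; the device query would run here")
# Closing the file releases the lock automatically.
```
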
160 changes: 131 additions & 29 deletions QEfficient/utils/device_utils.py
@@ -5,51 +5,153 @@
#
# -----------------------------------------------------------------------------

+import fcntl
import math
+import os
import re
import subprocess
+import time
+from typing import Optional

-from QEfficient.utils.constants import Constants
+from QEfficient.utils.constants import LOCK_DIR, Constants
from QEfficient.utils.logging_utils import logger


-def is_networks_loaded(stdout):
-    # Check is the networks are loaded on the device.
-    network_loaded = re.search(r"Networks Active:(\d+)", stdout)
-    if network_loaded and int(network_loaded.group(1)) > 0:
-        return True
-    return False
+def is_device_loaded(stdout: str) -> bool:
+    try:
+        match = re.search(r"Networks Loaded:(\d+)", stdout)
+        return int(match.group(1)) > 0 if match else False
+
+    except (ValueError, AttributeError):
+        return False


-def get_available_device_id():
-    """
-    API to check available device id.
-
-    Return:
-        :int: Available device id.
-    """
-
-    device_id = 0
-    result = None
-
-    # FIXME: goes into infinite loop when user doesn't have permission and the command gives permission denied.
-    # To reproduce change the ownership of available devices.
-    while 1:
-        command = ["/opt/qti-aic/tools/qaic-util", "-q", "-d", f"{device_id}"]
-        try:
-            result = subprocess.run(command, capture_output=True, text=True)
-        except OSError:
-            logger.warning("Not a Cloud AI 100 device, Command not found", command)
-            return None
-        if result:
-            if "Status:Error" in result.stdout or is_networks_loaded(result.stdout):
-                device_id += 1
-            elif "Status:Ready" in result.stdout:
-                logger.info("device is available.")
-                return [device_id]
-            elif "Failed to find requested device ID" in result.stdout:
-                logger.warning("Failed to find requested device ID")
-                return None
+def release_device_lock(lock_file):
+    try:
+        fcntl.flock(lock_file, fcntl.LOCK_UN)
+        lock_file.close()
+
+    except Exception as e:
+        logger.error(f"Error releasing lock: {e}")
+
+
+def get_device_count():
+    command = ["/opt/qti-aic/tools/qaic-util", "-q"]
+
+    try:
+        result = subprocess.run(command, capture_output=True, text=True)
+        qids = re.findall(r"QID (\d+)", result.stdout)
+        return max(map(int, qids)) + 1 if qids else 0
+
+    except OSError:
+        logger.warning(f"ERROR while fetching the device count via {command}")
+        return 0
+
+
+def ensure_lock_dir(lock_dir: str):
+    if not os.path.exists(lock_dir):
+        os.makedirs(lock_dir)
+
+
+def acquire_device_lock(retry_interval: int = 10, retry_duration: int = 600) -> Optional[object]:
+    """
+    Attempt to acquire a non-blocking exclusive lock on the device-check lock file.
+    Retries every `retry_interval` seconds for up to `retry_duration` seconds.
+
+    Args:
+        retry_interval (int): Seconds to wait between lock attempts.
+        retry_duration (int): Total seconds to keep retrying.
+
+    Returns:
+        The open lock-file object if the lock is acquired, else None.
+    """
+    ensure_lock_dir(LOCK_DIR)
+    lock_file_path = os.path.join(LOCK_DIR, "device_check.lock")
+    start_time = time.time()
+
+    while (time.time() - start_time) < retry_duration:
+        lock_file = open(lock_file_path, "w")
+
+        try:
+            fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
+            logger.debug("Lock acquired for device check")
+            return lock_file
+
+        except BlockingIOError:
+            lock_file.close()
+            logger.debug(f"Device check is locked. Retrying in {retry_interval} seconds...")
+            time.sleep(retry_interval)
+
+        except Exception as e:
+            logger.error(f"Unexpected error acquiring lock for device check: {e}")
+            return None
+
+    logger.warning(f"Failed to acquire lock for device check after {retry_duration // 60} minutes.")
+    return None
+
+
+def __fetch_device_id(device_count):
+    for device_id in range(device_count):
+        try:
+            device_query_cmd = ["/opt/qti-aic/tools/qaic-util", "-q", "-d", str(device_id)]
+            # A timeout is required for the TimeoutExpired handler below to be reachable.
+            result = subprocess.run(device_query_cmd, capture_output=True, text=True, timeout=60)
+
+            if "Failed to find requested device ID" in result.stdout:
+                logger.warning(f"Device ID {device_id} not found.")
+                continue
+
+            # A device in an error state, or with networks already loaded, is busy.
+            if "Status:Error" in result.stdout or is_device_loaded(result.stdout):
+                logger.debug(f"Device {device_id} is not available.")
+                continue
+
+            logger.info(f"Device ID {device_id} is available.")
+            return [device_id]
+
+        except subprocess.TimeoutExpired:
+            logger.error(f"Timeout while querying device {device_id}.")
+        except OSError as e:
+            logger.error(f"OSError while querying device {device_id}: {e}")
+            return None
+        except Exception as e:
+            logger.exception(f"Unexpected error while checking device {device_id}: {e}")
+            return None
+
+
+def get_available_device_id(retry_duration: int = 600, wait_time: int = 5) -> Optional[list[int]]:
+    """
+    Find an available Cloud AI 100 device ID using file-based locking.
+
+    Args:
+        retry_duration (int): Total seconds to keep polling for a free device.
+        wait_time (int): Seconds to wait between retries.
+
+    Returns:
+        list[int] | None: List containing the available device ID, or None if not found.
+    """
+    device_count = get_device_count()
+
+    if device_count == 0:
+        logger.warning("No Cloud AI 100 devices found or platform SDK not installed.")
+        return None
+
+    lock_file = acquire_device_lock()
+
+    if lock_file:
+        start_time = time.time()
+
+        while (time.time() - start_time) < retry_duration:
+            device_id = __fetch_device_id(device_count)
+
+            if device_id:
+                release_device_lock(lock_file)
+                return device_id
+
+            time.sleep(wait_time)
+
+        release_device_lock(lock_file)
+
+    logger.warning("No available device found after all retries.")
+    return None


def is_qpc_size_gt_32gb(params: int, mxfp6: bool) -> bool:
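
Taken together, the new flow is: count the devices once with get_device_count, take the host-wide lock with acquire_device_lock, poll each device via __fetch_device_id until one is free, then release the lock. Below is a hedged usage sketch from a test's point of view; run_on_free_device is a hypothetical helper, and the explicit skip merely mirrors the pytest.skip calls this PR removes from individual tests:

```python
import pytest

from QEfficient.utils.device_utils import get_available_device_id


def run_on_free_device(qeff_model, tokenizer, prompts):
    # Blocks for up to retry_duration (default 600s) waiting for a free device.
    device_ids = get_available_device_id()
    if not device_ids:
        # Callers that want the old skip-if-busy behaviour can still opt in.
        pytest.skip("No available devices to run model on Cloud AI 100")
    return qeff_model.generate(tokenizer, prompts=prompts, device_ids=device_ids)
```
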
10 changes: 5 additions & 5 deletions scripts/Jenkinsfile
@@ -58,7 +58,7 @@ pipeline {
mkdir -p $PWD/Non_qaic &&
export TOKENIZERS_PARALLELISM=false &&
export QEFF_HOME=$PWD/Non_qaic &&
-pytest tests -m '(not cli) and (on_qaic) and (not multimodal) and (not qnn)' --ignore tests/vllm -n 4 --junitxml=tests/tests_log2.xml &&
+pytest tests -m '(not cli) and (on_qaic) and (not multimodal) and (not qnn)' --ignore tests/vllm -n auto --junitxml=tests/tests_log2.xml &&
junitparser merge tests/tests_log2.xml tests/tests_log.xml &&
deactivate"
'''
@@ -77,7 +77,7 @@
mkdir -p $PWD/Non_cli_qaic_multimodal &&
export TOKENIZERS_PARALLELISM=false &&
export QEFF_HOME=$PWD/Non_cli_qaic_multimodal &&
-pytest tests -m '(not cli) and (on_qaic) and (multimodal) and (not qnn)' --ignore tests/vllm -n 4 --junitxml=tests/tests_log6.xml &&
+pytest tests -m '(not cli) and (on_qaic) and (multimodal) and (not qnn)' --ignore tests/vllm -n auto --junitxml=tests/tests_log6.xml &&
junitparser merge tests/tests_log6.xml tests/tests_log.xml &&
deactivate"
'''
@@ -96,7 +96,7 @@
mkdir -p $PWD/cli &&
export TOKENIZERS_PARALLELISM=false &&
export QEFF_HOME=$PWD/cli &&
-pytest tests -m '(cli and not qnn)' --ignore tests/vllm --junitxml=tests/tests_log3.xml &&
+pytest tests -m '(cli and not qnn)' --ignore tests/vllm -n auto --junitxml=tests/tests_log3.xml &&
junitparser merge tests/tests_log3.xml tests/tests_log.xml &&
deactivate"
'''
@@ -125,7 +125,7 @@
mkdir -p $PWD/Qnn_cli &&
export TOKENIZERS_PARALLELISM=false &&
export QEFF_HOME=$PWD/Qnn_cli &&
-pytest tests -m '(cli and qnn)' --ignore tests/vllm --junitxml=tests/tests_log4.xml &&
+pytest tests -m '(cli and qnn)' --ignore tests/vllm -n auto --junitxml=tests/tests_log4.xml &&
junitparser merge tests/tests_log4.xml tests/tests_log.xml &&
deactivate"
'''
@@ -144,7 +144,7 @@
mkdir -p $PWD/Qnn_non_cli &&
export TOKENIZERS_PARALLELISM=false &&
export QEFF_HOME=$PWD/Qnn_non_cli &&
-pytest tests -m '(not cli) and (qnn) and (on_qaic) and (not multimodal)' --ignore tests/vllm --junitxml=tests/tests_log5.xml &&
+pytest tests -m '(not cli) and (qnn) and (on_qaic) and (not multimodal)' -n auto --ignore tests/vllm --junitxml=tests/tests_log5.xml &&
junitparser merge tests/tests_log5.xml tests/tests_log.xml &&
deactivate"
'''
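
Switching the Jenkins stages from `-n 4` to `-n auto` lets pytest-xdist size the worker pool to the node's CPU count; the file lock above is what makes it safe for that many workers to compete for devices at once. A rough local equivalent of the first stage, assuming pytest-xdist is installed (the invocation below is illustrative, not part of this PR):

```python
import pytest

# Mirrors the first Jenkins stage; "-n auto" requires the pytest-xdist plugin.
pytest.main(
    ["tests", "-m", "(not cli) and (on_qaic) and (not multimodal) and (not qnn)",
     "--ignore", "tests/vllm", "-n", "auto"]
)
```
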
3 changes: 3 additions & 0 deletions tests/peft/lora/test_lora_model.py
@@ -17,6 +17,7 @@
from QEfficient import QEffAutoPeftModelForCausalLM
from QEfficient.peft.lora import QEffAutoLoraModelForCausalLM
from QEfficient.utils import load_hf_tokenizer
+from QEfficient.utils.device_utils import get_available_device_id

configs = [
    pytest.param(
@@ -235,6 +236,7 @@ def test_auto_lora_model_for_causal_lm_noncb_export_compile_generate(
        tokenizer=load_hf_tokenizer(pretrained_model_name_or_path=base_model_name),
        prompts=prompts,
        prompt_to_adapter_mapping=["adapter_0", "adapter_1", "adapter_0", "base"],
+        device_ids=get_available_device_id(),
    )


@@ -260,4 +262,5 @@ def test_auto_lora_model_for_causal_lm_cb_compile_generate(base_model_name, adap
        tokenizer=load_hf_tokenizer(pretrained_model_name_or_path=base_model_name),
        prompts=prompts,
        prompt_to_adapter_mapping=["adapter_0", "adapter_1", "adapter_0", "base"],
+        device_ids=get_available_device_id(),
    )
2 changes: 2 additions & 0 deletions tests/peft/test_peft_model.py
@@ -16,6 +16,7 @@
from transformers import AutoConfig, AutoModelForCausalLM

from QEfficient import QEffAutoPeftModelForCausalLM
+from QEfficient.utils.device_utils import get_available_device_id

configs = [
    pytest.param(
@@ -181,6 +182,7 @@ def test_auto_peft_model_for_causal_lm_compile_generate(base_config, adapter_con
            axis=1,
        ),
        max_new_tokens=10,
+        device_ids=get_available_device_id(),
    )

    start = perf_counter()
14 changes: 6 additions & 8 deletions tests/text_generation/test_text_generation.py
@@ -72,10 +72,6 @@ def test_generate_text_stream(
    qeff_model = QEFFAutoModelForCausalLM(model_hf)

    qeff_model.export()
-    device_id = get_available_device_id()
-
-    if not device_id:
-        pytest.skip("No available devices to run model on Cloud AI 100")

    qpc_path = qeff_model.compile(
        prefill_seq_len=prompt_len,
@@ -86,7 +82,9 @@
        full_batch_size=full_batch_size,
    )

-    exec_info = qeff_model.generate(tokenizer, prompts=Constants.INPUT_STR, generation_len=max_gen_len)
+    exec_info = qeff_model.generate(
+        tokenizer, prompts=Constants.INPUT_STR, generation_len=max_gen_len, device_ids=get_available_device_id()
+    )
    cloud_ai_100_tokens = exec_info.generated_ids[0]  # Because we always run for single input and single batch size
    cloud_ai_100_output = [tokenizer.decode(token, skip_special_tokens=True) for token in cloud_ai_100_tokens[0]]

@@ -100,7 +98,7 @@
    for decoded_tokens in text_generator.generate_stream_tokens(Constants.INPUT_STR, generation_len=max_gen_len):
        stream_tokens.extend(decoded_tokens)

-    assert cloud_ai_100_output == stream_tokens, (
-        f"Deviation in output observed while comparing regular execution and streamed output: {cloud_ai_100_output} != {stream_tokens}"
-    )
+    assert (
+        cloud_ai_100_output == stream_tokens
+    ), f"Deviation in output observed while comparing regular execution and streamed output: {cloud_ai_100_output} != {stream_tokens}"
    assert os.path.isfile(os.path.join(os.path.dirname(qpc_path), "qconfig.json"))
34 changes: 14 additions & 20 deletions tests/transformers/models/test_causal_lm_models.py
@@ -135,19 +135,16 @@ def check_causal_lm_pytorch_vs_kv_vs_ort_vs_ai100(

    pytorch_kv_tokens = api_runner.run_kv_model_on_pytorch(qeff_model.model)

-    assert (pytorch_hf_tokens == pytorch_kv_tokens).all(), (
-        "Tokens don't match for HF PyTorch model output and KV PyTorch model output"
-    )
+    assert (
+        pytorch_hf_tokens == pytorch_kv_tokens
+    ).all(), "Tokens don't match for HF PyTorch model output and KV PyTorch model output"

    onnx_model_path = qeff_model.export()
    ort_tokens = api_runner.run_kv_model_on_ort(onnx_model_path, is_tlm=is_tlm)
    gen_len = ort_tokens.shape[-1]

    assert (pytorch_kv_tokens == ort_tokens).all(), "Tokens don't match for ONNXRT output and PyTorch output."

-    if not get_available_device_id():
-        pytest.skip("No available devices to run model on Cloud AI 100")
-
    qpc_path = qeff_model.compile(
        prefill_seq_len=prompt_len,
        ctx_len=ctx_len,
@@ -159,18 +156,18 @@
        enable_qnn=enable_qnn,
        qnn_config=qnn_config,
    )
-    exec_info = qeff_model.generate(tokenizer, prompts=Constants.INPUT_STR)
+    exec_info = qeff_model.generate(tokenizer, prompts=Constants.INPUT_STR, device_ids=get_available_device_id())
    cloud_ai_100_tokens = exec_info.generated_ids[0][
        :, :gen_len
    ]  # Because we always run for single input and single batch size
    if prefill_only:
-        assert (ort_tokens[0][0] == cloud_ai_100_tokens[0][0]).all(), (
-            "prefill run output tokens don't match for ONNXRT output and Cloud AI 100 output."
-        )
+        assert (
+            ort_tokens[0][0] == cloud_ai_100_tokens[0][0]
+        ).all(), "prefill run output tokens don't match for ONNXRT output and Cloud AI 100 output."
    else:
-        assert (ort_tokens == cloud_ai_100_tokens).all(), (
-            "Tokens don't match for ONNXRT output and Cloud AI 100 output."
-        )
+        assert (
+            ort_tokens == cloud_ai_100_tokens
+        ).all(), "Tokens don't match for ONNXRT output and Cloud AI 100 output."
    assert os.path.isfile(os.path.join(os.path.dirname(qpc_path), "qconfig.json"))
    if prefill_only is not None:
        return
@@ -196,9 +193,6 @@ def check_causal_lm_pytorch_vs_kv_vs_ort_vs_ai100(
    )
    onnx_model_path = qeff_model.export()

-    if not get_available_device_id():
-        pytest.skip("No available devices to run model on Cloud AI 100")
-
    # TODO: add prefill_only tests
    qpc_path = qeff_model.compile(
        prefill_seq_len=prompt_len,
@@ -211,7 +205,7 @@
        enable_qnn=enable_qnn,
        qnn_config=qnn_config,
    )
-    exec_info_fbs = qeff_model.generate(tokenizer, prompts=fbs_prompts)
+    exec_info_fbs = qeff_model.generate(tokenizer, prompts=fbs_prompts, device_ids=get_available_device_id())

    assert all(
        [
@@ -247,9 +241,9 @@ def test_causal_lm_export_with_deprecated_api(model_name):
    new_api_ort_tokens = api_runner.run_kv_model_on_ort(new_api_onnx_model_path)
    old_api_ort_tokens = api_runner.run_kv_model_on_ort(old_api_onnx_model_path)

-    assert (new_api_ort_tokens == old_api_ort_tokens).all(), (
-        "New API output does not match old API output for ONNX export function"
-    )
+    assert (
+        new_api_ort_tokens == old_api_ort_tokens
+    ).all(), "New API output does not match old API output for ONNX export function"


@pytest.mark.on_qaic