-
Notifications
You must be signed in to change notification settings - Fork 789
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feature: fast initializer for conda environments #2226
base: master
Are you sure you want to change the base?
Changes from 4 commits
9555fe8
42d400f
013016a
51dcfc0
3ad7614
b41bf87
66222f7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,7 +14,7 @@ | |
from urllib.parse import unquote, urlparse | ||
|
||
from metaflow.exception import MetaflowException | ||
from metaflow.metaflow_config import get_pinned_conda_libs | ||
from metaflow.metaflow_config import get_pinned_conda_libs, CONDA_USE_FAST_INIT | ||
from metaflow.metaflow_environment import MetaflowEnvironment | ||
|
||
from . import MAGIC_FILE, _datastore_packageroot | ||
|
@@ -421,15 +421,16 @@ def bootstrap_commands(self, step_name, datastore_type): | |
# Bootstrap conda and execution environment for step | ||
step = next(step for step in self.flow if step.name == step_name) | ||
id_ = self.get_environment(step).get("id_") | ||
bootstrap_module_name = "fast_bootstrap" if CONDA_USE_FAST_INIT else "bootstrap" | ||
if id_: | ||
return [ | ||
"echo 'Bootstrapping virtual environment...'", | ||
# We have to prevent the tracing module from loading, | ||
# as the bootstrapping process uses the internal S3 client which would fail to import tracing | ||
# due to the required dependencies being bundled into the conda environment, | ||
# which is yet to be initialized at this point. | ||
'DISABLE_TRACING=True python -m metaflow.plugins.pypi.bootstrap "%s" %s "%s" linux-64' | ||
% (self.flow.name, id_, self.datastore_type), | ||
'DISABLE_TRACING=True python -m metaflow.plugins.pypi.%s "%s" %s "%s" linux-64' | ||
% (bootstrap_module_name, self.flow.name, id_, self.datastore_type), | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. nit - can we maintain the singular - There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. done. |
||
"echo 'Environment bootstrapped.'", | ||
# To avoid having to install micromamba in the PATH in micromamba.py, we add it to the PATH here. | ||
"export PATH=$PATH:$(pwd)/micromamba/bin", | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,158 @@ | ||
import gzip | ||
import json | ||
import os | ||
import shutil | ||
import subprocess | ||
import sys | ||
import time | ||
from urllib.error import URLError | ||
from urllib.request import urlopen | ||
from metaflow.metaflow_config import DATASTORE_LOCAL_DIR, CONDA_FAST_INIT_BIN_URL | ||
from metaflow.plugins import DATASTORES | ||
from metaflow.util import which | ||
from urllib.request import Request | ||
|
||
from . import MAGIC_FILE, _datastore_packageroot | ||
|
||
# Bootstraps a valid conda virtual environment composed of conda and pypi packages | ||
|
||
|
||
def timer(func):
    """Decorate *func* so that each call's wall-clock duration is measured.

    The measurement is currently not reported anywhere (the print below is
    disabled); the wrapper is otherwise transparent: it forwards all
    positional and keyword arguments, returns whatever *func* returns and
    lets any exception raised by *func* propagate.

    Parameters
    ----------
    func : callable
        Function to wrap.

    Returns
    -------
    callable
        Wrapper carrying the name, docstring and other metadata of *func*.
    """
    # Imported locally so this block does not depend on a file-level import.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so the duration cannot go negative if
        # the system clock is adjusted while func is running.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        duration = time.perf_counter() - start_time
        # Reporting is intentionally disabled; re-enable when debugging
        # bootstrap latency:
        # print(f"Time taken for {func.__name__}: {duration:.2f} seconds")
        return result

    return wrapper
|
||
|
||
if __name__ == "__main__":
    # Script entry point. Expected invocation:
    #   fast_bootstrap.py <flow_name> <id> <datastore_type> <architecture>
    #
    # TODO: Detect architecture on the fly when dealing with arm architectures.

    def run_cmd(cmd, stdin_str):
        """Run *cmd*, feeding *stdin_str* to its stdin; exit on failure.

        Parameters
        ----------
        cmd : list of str
            Program and arguments. Executed without a shell so that paths
            containing spaces or shell metacharacters are passed verbatim.
        stdin_str : str
            Text written to the child's standard input.

        On a non-zero return code the command and its captured stdout and
        stderr are printed and the process exits with status 1.
        """
        result = subprocess.run(
            cmd,
            input=stdin_str,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
        if result.returncode != 0:
            print(f"Bootstrap failed while executing: {' '.join(cmd)}")
            print("Stdout:", result.stdout)
            print("Stderr:", result.stderr)
            sys.exit(1)

    @timer
    def install_fast_initializer(architecture):
        """Return the path of a fast-initializer binary, downloading it if needed.

        Lookup order: a binary already on PATH, then a previously downloaded
        copy under ``<cwd>/fast-initializer/bin``, then a fresh download from
        CONDA_FAST_INIT_BIN_URL. When the local copy is used (existing or
        freshly downloaded), its directory is appended to PATH.

        Parameters
        ----------
        architecture : str
            Target architecture; currently unused (see TODO below).

        Returns
        -------
        str
            Path to the fast-initializer executable.

        Raises
        ------
        Exception
            If CONDA_FAST_INIT_BIN_URL is unset or the download keeps failing.
        """
        fast_initializer_path = os.path.join(
            os.getcwd(), "fast-initializer", "bin", "fast-initializer"
        )
        bin_dir = os.path.dirname(fast_initializer_path)

        on_path = which("fast-initializer")
        if on_path:
            return on_path
        if os.path.exists(fast_initializer_path):
            os.environ["PATH"] += os.pathsep + bin_dir
            return fast_initializer_path

        # TODO: take architecture into account
        # NOTE(review): per the PR discussion, the client already has to know
        # the target architecture (it solves the packages and passes the
        # architecture to this script), so choosing an architecture-specific
        # binary URL is expected to stay client-side, via
        # CONDA_FAST_INIT_BIN_URL.
        url = CONDA_FAST_INIT_BIN_URL

        if url is None:
            raise Exception("URL for Binary is unset.")

        # Prepare directory once
        os.makedirs(bin_dir, exist_ok=True)

        def _download_and_extract(url):
            """Download the gzip-compressed binary at *url* and install it.

            The payload is streamed into a temporary file next to the final
            location, marked executable and then moved into place, so the
            final path never holds a partially written binary. Up to three
            attempts are made, sleeping 1s and then 2s between them.
            """
            # NOTE(review): only gzip is decoded below although the request
            # also advertises deflate/br; presumably the server returns the
            # .gz payload untouched - confirm.
            headers = {
                "Accept-Encoding": "gzip, deflate, br",
                "Connection": "keep-alive",
                "User-Agent": "python-urllib",
            }
            tmp_path = fast_initializer_path + ".tmp"

            max_retries = 3
            for attempt in range(max_retries):
                try:
                    req = Request(url, headers=headers)
                    with urlopen(req) as response:
                        with gzip.GzipFile(fileobj=response) as gz:
                            with open(tmp_path, "wb") as f:
                                # Stream instead of loading the whole binary
                                # into memory.
                                shutil.copyfileobj(gz, f)
                    # Set executable permission before publishing the file.
                    os.chmod(tmp_path, 0o755)
                    os.replace(tmp_path, fast_initializer_path)
                    return
                # EOFError covers a truncated gzip stream, which is not an
                # OSError and would otherwise skip the retry logic.
                except (URLError, IOError, EOFError) as e:
                    # Best-effort cleanup of the partial download.
                    try:
                        os.remove(tmp_path)
                    except OSError:
                        pass
                    if attempt == max_retries - 1:
                        raise Exception(
                            f"Failed to download fast-initializer after {max_retries} attempts: {e}"
                        ) from e
                    time.sleep(2**attempt)

        _download_and_extract(url)

        # Update PATH only once at the end
        os.environ["PATH"] += os.pathsep + bin_dir
        return fast_initializer_path

    @timer
    def setup_environment(architecture, storage, env, prefix, pkgs_dir):
        """Create the environment at *prefix* using fast-initializer.

        Parameters
        ----------
        architecture : str
            Forwarded to install_fast_initializer.
        storage : datastore instance
            Only ``storage.datastore_root`` is read; it prefixes every
            package path.
        env : dict
            Must contain ``"conda"`` and may contain ``"pypi"``; each is a
            list of dicts with a ``"path"`` key.
        prefix : str
            Passed to fast-initializer as ``--prefix``.
        pkgs_dir : str
            Passed to fast-initializer as ``--packages-dir``.
        """
        fast_initializer = install_fast_initializer(architecture)

        # Get package urls
        conda_pkgs = env["conda"]
        pypi_pkgs = env.get("pypi", [])

        # One URL per line on stdin: conda packages, a "---" separator line,
        # then pypi packages.
        conda_lines = [
            f"{storage.datastore_root}/{package['path']}\n" for package in conda_pkgs
        ]
        pypi_lines = [
            f"{storage.datastore_root}/{package['path']}\n" for package in pypi_pkgs
        ]
        all_package_urls = "".join(conda_lines + ["---\n"] + pypi_lines)

        # Initialize environment. The resolved binary path is used directly
        # rather than relying on PATH lookup.
        cmd = [fast_initializer, "--prefix", prefix, "--packages-dir", pkgs_dir]
        run_cmd(cmd, all_package_urls)

    if len(sys.argv) != 5:
        print(
            "Usage: fast_bootstrap.py <flow_name> <id> <datastore_type> <architecture>"
        )
        sys.exit(1)

    try:
        _, flow_name, id_, datastore_type, architecture = sys.argv

        prefix = os.path.join(os.getcwd(), architecture, id_)
        pkgs_dir = os.path.join(os.getcwd(), ".pkgs")
        manifest_dir = os.path.join(os.getcwd(), DATASTORE_LOCAL_DIR, flow_name)

        datastores = [d for d in DATASTORES if d.TYPE == datastore_type]
        if not datastores:
            print(f"No datastore found for type: {datastore_type}")
            sys.exit(1)

        storage = datastores[0](
            _datastore_packageroot(datastores[0], lambda *args, **kwargs: None)
        )

        # Move MAGIC_FILE inside local datastore.
        os.makedirs(manifest_dir, exist_ok=True)
        shutil.move(
            os.path.join(os.getcwd(), MAGIC_FILE),
            os.path.join(manifest_dir, MAGIC_FILE),
        )
        # The manifest is keyed by environment id, then by architecture.
        with open(os.path.join(manifest_dir, MAGIC_FILE)) as f:
            env = json.load(f)[id_][architecture]

        setup_environment(architecture, storage, env, prefix, pkgs_dir)

    except Exception as e:
        print(f"Error: {str(e)}", file=sys.stderr)
        sys.exit(1)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you remove this config and introduce the path to the actual binary in the `install_fast_initializer` method instead? Also, if you can note some latency improvements in the PR, that would be great!