Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: fast initializer for conda environments #2226

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions metaflow/metaflow_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@
# should result in an appreciable speedup in flow environment initialization.
CONDA_DEPENDENCY_RESOLVER = from_conf("CONDA_DEPENDENCY_RESOLVER", "conda")

# URL of the fast-initializer binary used for fast Conda environment
# initialization. Defaults to None (unset); install_fast_initializer in
# metaflow/plugins/pypi/bootstrap.py raises if it has to download the binary
# while this is still None.
# NOTE(review): the config key is "CONDA_FAST_INIT_BINARY_URL" while the
# Python name is CONDA_FAST_INIT_BIN_URL (BINARY vs BIN) - confirm the
# mismatch is intentional.
CONDA_FAST_INIT_BIN_URL = from_conf("CONDA_FAST_INIT_BINARY_URL", None)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remove this config and introduce the path to the actual binary in the install_fast_initializer method instead? Also, if you can note some latency improvements in the PR, that would be great!


# Whether the environment bootstrap should use the fast-initializer binary
# (fast_setup_environment) instead of the regular setup_environment path.
# Defaults to False, i.e. the fast init binary is not used.
CONDA_USE_FAST_INIT = from_conf("CONDA_USE_FAST_INIT", False)

###
# Escape hatch configuration
###
Expand Down
97 changes: 91 additions & 6 deletions metaflow/plugins/pypi/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import time
from urllib.error import URLError
from urllib.request import urlopen
from metaflow.metaflow_config import DATASTORE_LOCAL_DIR
from metaflow.metaflow_config import DATASTORE_LOCAL_DIR, CONDA_USE_FAST_INIT
from metaflow.plugins import DATASTORES
from metaflow.plugins.pypi.utils import MICROMAMBA_MIRROR_URL, MICROMAMBA_URL
from metaflow.util import which
Expand Down Expand Up @@ -58,16 +58,76 @@ def wrapper(*args, **kwargs):
# fi
# fi

def run_cmd(cmd, stdin_str=None):
    """Run *cmd* through the shell and abort the bootstrap if it fails.

    Parameters
    ----------
    cmd : str
        Command line handed to the shell (``shell=True``).
    stdin_str : str, optional
        Text fed to the command's standard input. ``None`` (the default)
        passes no input, which keeps existing single-argument callers working.

    On a non-zero exit status the command, its captured stdout and its
    captured stderr are printed and the process exits with status 1.
    """
    result = subprocess.run(
        cmd,
        shell=True,
        input=stdin_str,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    if result.returncode != 0:
        print(f"Bootstrap failed while executing: {cmd}")
        print("Stdout:", result.stdout)
        print("Stderr:", result.stderr)
        sys.exit(1)

@timer
def install_fast_initializer(architecture):
    """Return the path to the ``fast-initializer`` binary, downloading it if needed.

    Resolution order:

    1. whatever ``which("fast-initializer")`` returns, if truthy;
    2. ``<cwd>/fast-initializer/bin/fast-initializer`` if it already exists
       (its directory is appended to ``PATH``);
    3. otherwise the gzip stream at ``CONDA_FAST_INIT_BIN_URL`` is downloaded,
       decompressed to that location, made executable, and its directory is
       appended to ``PATH``.

    Parameters
    ----------
    architecture : str
        Currently unused by this function; kept for signature parity with the
        other install helpers.

    Returns
    -------
    str
        Path to the ``fast-initializer`` executable.

    Raises
    ------
    Exception
        If ``CONDA_FAST_INIT_BIN_URL`` is unset, or if the download still
        fails after all retry attempts.
    """
    import gzip
    import shutil

    # Only `urlopen` is imported at module level, so bring `Request` into
    # scope here; without it the download path fails with a NameError.
    from urllib.request import Request

    from metaflow.metaflow_config import CONDA_FAST_INIT_BIN_URL

    existing = which("fast-initializer")
    if existing:
        return existing

    fast_initializer_path = os.path.join(
        os.getcwd(), "fast-initializer", "bin", "fast-initializer"
    )
    bin_dir = os.path.dirname(fast_initializer_path)

    if os.path.exists(fast_initializer_path):
        os.environ["PATH"] += os.pathsep + bin_dir
        return fast_initializer_path

    url = CONDA_FAST_INIT_BIN_URL
    if url is None:
        raise Exception("URL for Binary is unset.")

    # Prepare directory once
    os.makedirs(bin_dir, exist_ok=True)

    # Download and decompress in one go
    def _download_and_extract(url):
        headers = {
            # Only gzip is decoded below, so do not advertise encodings
            # (deflate, br) that this code cannot handle.
            "Accept-Encoding": "gzip",
            "Connection": "keep-alive",
            "User-Agent": "python-urllib",
        }
        # Stream into a sibling temp file and rename it into place only on
        # success, so an interrupted download can never be mistaken for a
        # valid install by the os.path.exists() check on a later run.
        partial_path = fast_initializer_path + ".part"

        max_retries = 3
        for attempt in range(max_retries):
            try:
                req = Request(url, headers=headers)
                with urlopen(req) as response, gzip.GzipFile(
                    fileobj=response
                ) as gz:
                    with open(partial_path, "wb") as f:
                        # Copy in chunks instead of buffering the whole binary.
                        shutil.copyfileobj(gz, f)
                # Set executable permission before publishing the file.
                os.chmod(partial_path, 0o755)
                os.replace(partial_path, fast_initializer_path)
                return
            except (URLError, IOError, EOFError) as e:
                # EOFError is what gzip raises on a truncated stream.
                if attempt == max_retries - 1:
                    try:
                        os.remove(partial_path)
                    except OSError:
                        pass  # nothing was written, or it is already gone
                    raise Exception(
                        f"Failed to download fast-initializer after {max_retries} attempts: {e}"
                    ) from e
                time.sleep(2**attempt)

    _download_and_extract(url)

    # Update PATH only once at the end
    os.environ["PATH"] += os.pathsep + bin_dir
    return fast_initializer_path

@timer
def install_micromamba(architecture):
micromamba_dir = os.path.join(os.getcwd(), "micromamba")
Expand Down Expand Up @@ -268,6 +328,28 @@ def setup_environment(
# wait for conda environment to be created
futures["conda_env"].result()

@timer
def fast_setup_environment(architecture, storage, env, prefix, pkgs_dir):
    """Create the environment at *prefix* using the ``fast-initializer`` binary.

    Ensures the binary is available via ``install_fast_initializer``, then
    runs it with the package URLs on standard input: one line per Conda
    package, a ``---`` separator line, then one line per PyPI package. Each
    line is ``<storage.datastore_root>/<package["path"]>``.

    Parameters
    ----------
    architecture : str
        Forwarded to ``install_fast_initializer``.
    storage : object
        Must expose ``datastore_root``, used as the URL prefix.
    env : dict
        Must contain a ``"conda"`` list and may contain a ``"pypi"`` list;
        every entry needs a ``"path"`` key.
    prefix : str
        Passed to the binary as ``--prefix``.
    pkgs_dir : str
        Passed to the binary as ``--packages-dir``.
    """
    import shlex

    install_fast_initializer(architecture)

    # Build the stdin payload: conda URLs, separator, then pypi URLs.
    root = storage.datastore_root
    conda_lines = [f"{root}/{package['path']}\n" for package in env["conda"]]
    pypi_lines = [
        f"{root}/{package['path']}\n" for package in env.get("pypi", [])
    ]
    all_package_urls = "".join(conda_lines + ["---\n"] + pypi_lines)

    # run_cmd executes through the shell, so quote the paths to keep
    # directories containing spaces or shell metacharacters intact.
    cmd = "fast-initializer --prefix {} --packages-dir {}".format(
        shlex.quote(str(prefix)), shlex.quote(str(pkgs_dir))
    )
    run_cmd(cmd, all_package_urls)

if len(sys.argv) != 5:
print("Usage: bootstrap.py <flow_name> <id> <datastore_type> <architecture>")
sys.exit(1)
Expand Down Expand Up @@ -299,9 +381,12 @@ def setup_environment(
with open(os.path.join(manifest_dir, MAGIC_FILE)) as f:
env = json.load(f)[id_][architecture]

setup_environment(
architecture, storage, env, prefix, conda_pkgs_dir, pypi_pkgs_dir
)
if CONDA_USE_FAST_INIT:
fast_setup_environment(architecture, storage, env, prefix, pkgs_dir)
else:
setup_environment(
architecture, storage, env, prefix, conda_pkgs_dir, pypi_pkgs_dir
)

except Exception as e:
print(f"Error: {str(e)}", file=sys.stderr)
Expand Down
25 changes: 16 additions & 9 deletions metaflow/plugins/pypi/micromamba.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, logger=None):
_home = os.environ.get("METAFLOW_TOKEN_HOME")
else:
_home = os.environ.get("METAFLOW_HOME", "~/.metaflowconfig")
_path_to_hidden_micromamba = os.path.join(
self._path_to_hidden_micromamba = os.path.join(
os.path.expanduser(_home),
"micromamba",
)
Expand All @@ -47,19 +47,26 @@ def __init__(self, logger=None):
else:
self.logger = lambda *args, **kwargs: None # No-op logger if not provided

self.bin = (
self._bin = (
which(os.environ.get("METAFLOW_PATH_TO_MICROMAMBA") or "micromamba")
or which("./micromamba") # to support remote execution
or which("./bin/micromamba")
or which(os.path.join(_path_to_hidden_micromamba, "bin/micromamba"))
or which(os.path.join(self._path_to_hidden_micromamba, "bin/micromamba"))
)
if self.bin is None:
# Install Micromamba on the fly.
# TODO: Make this optional at some point.
_install_micromamba(_path_to_hidden_micromamba)
self.bin = which(os.path.join(_path_to_hidden_micromamba, "bin/micromamba"))

if self.bin is None:
@property
def bin(self):
"Defer installing Micromamba until when the binary path is actually requested"
if self._bin is not None:
return self._bin
# Install Micromamba on the fly.
# TODO: Make this optional at some point.
_install_micromamba(self._path_to_hidden_micromamba)
self._bin = which(
os.path.join(self._path_to_hidden_micromamba, "bin/micromamba")
)

if self._bin is None:
msg = "No installation for *Micromamba* found.\n"
msg += "Visit https://mamba.readthedocs.io/en/latest/micromamba-installation.html for installation instructions."
raise MetaflowException(msg)
Expand Down
Loading