diff --git a/metaflow/cli.py b/metaflow/cli.py index 3a8dc4ecaa9..cdce10deca2 100644 --- a/metaflow/cli.py +++ b/metaflow/cli.py @@ -134,6 +134,8 @@ def config_merge_cb(ctx, param, value): "step": "metaflow.cli_components.step_cmd.step", "run": "metaflow.cli_components.run_cmds.run", "resume": "metaflow.cli_components.run_cmds.resume", + "spin": "metaflow.cli_components.run_cmds.spin", + "spin-internal": "metaflow.cli_components.step_cmd.spin_internal", }, ) def cli(ctx): @@ -384,7 +386,6 @@ def start( # second one processed will return the actual options. The order of processing # depends on what (and in what order) the user specifies on the command line. config_options = config_file or config_value - if ( hasattr(ctx, "saved_args") and ctx.saved_args @@ -462,14 +463,10 @@ def start( ctx.obj.event_logger = LOGGING_SIDECARS[event_logger]( flow=ctx.obj.flow, env=ctx.obj.environment ) - ctx.obj.event_logger.start() - _system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger) ctx.obj.monitor = MONITOR_SIDECARS[monitor]( flow=ctx.obj.flow, env=ctx.obj.environment ) - ctx.obj.monitor.start() - _system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor) ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0]( ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor @@ -485,6 +482,47 @@ def start( ctx.obj.config_options = config_options + # Override values for spin + if hasattr(ctx, "saved_args") and ctx.saved_args and ctx.saved_args[0] == "spin": + # For spin, we will only use the local metadata provider, datastore, environment + # and null event logger and monitor + ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "local"][0]( + ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor + ) + ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"]( + flow=ctx.obj.flow, env=ctx.obj.environment + ) + ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"]( + flow=ctx.obj.flow, env=ctx.obj.environment + ) + ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == "local"][0] + datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config( + ctx.obj.echo + ) + ctx.obj.datastore_impl.datastore_root = datastore_root + + FlowDataStore.default_storage_impl = ctx.obj.datastore_impl + ctx.obj.flow_datastore = FlowDataStore( + ctx.obj.flow.name, + ctx.obj.environment, + ctx.obj.metadata, + ctx.obj.event_logger, + ctx.obj.monitor, + ) + echo( + "Using local metadata provider, datastore, environment, and null event logger and monitor for spin." + ) + print(f"Using metadata provider: {ctx.obj.metadata}") + echo(f"Using Datastore root: {datastore_root}") + echo(f"Using Flow Datastore: {ctx.obj.flow_datastore}") + + # Start event logger and monitor + ctx.obj.event_logger.start() + _system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger) + + ctx.obj.monitor.start() + _system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor) + decorators._init(ctx.obj.flow) # It is important to initialize flow decorators early as some of the @@ -528,7 +566,7 @@ def start( if ( hasattr(ctx, "saved_args") and ctx.saved_args - and ctx.saved_args[0] not in ("run", "resume") + and ctx.saved_args[0] not in ("run", "resume", "spin") ): # run/resume are special cases because they can add more decorators with --with, # so they have to take care of themselves. diff --git a/metaflow/cli_components/run_cmds.py b/metaflow/cli_components/run_cmds.py index bf77d16ad1f..fc873452d47 100644 --- a/metaflow/cli_components/run_cmds.py +++ b/metaflow/cli_components/run_cmds.py @@ -9,11 +9,11 @@ from ..graph import FlowGraph from ..metaflow_current import current from ..package import MetaflowPackage -from ..runtime import NativeRuntime +from ..runtime import NativeRuntime, SpinRuntime from ..system import _system_logger from ..tagging_util import validate_tags -from ..util import get_latest_run_id, write_latest_run_id +from ..util import get_latest_run_id, write_latest_run_id, get_latest_task_pathspec def before_run(obj, tags, decospecs): @@ -70,6 +70,28 @@ def write_file(file_path, content): f.write(str(content)) +def common_runner_options(func): + @click.option( + "--run-id-file", + default=None, + show_default=True, + type=str, + help="Write the ID of this run to the file specified.", + ) + @click.option( + "--runner-attribute-file", + default=None, + show_default=True, + type=str, + help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.", + ) + @wraps(func) + def wrapper(*args, **kwargs): + return func(*args, **kwargs) + + return wrapper + + def common_run_options(func): @click.option( "--tag", @@ -110,20 +132,6 @@ def common_run_options(func): "option multiple times to attach multiple decorators " "in steps.", ) - @click.option( - "--run-id-file", - default=None, - show_default=True, - type=str, - help="Write the ID of this run to the file specified.", - ) - @click.option( - "--runner-attribute-file", - default=None, - show_default=True, - type=str, - help="Write the metadata and pathspec of this run to the file specified. Used internally for Metaflow's Runner API.", - ) @wraps(func) def wrapper(*args, **kwargs): return func(*args, **kwargs) @@ -167,6 +175,7 @@ def wrapper(*args, **kwargs): @click.argument("step-to-rerun", required=False) @click.command(help="Resume execution of a previous run of this flow.") @common_run_options +@common_runner_options @click.pass_obj def resume( obj, @@ -285,6 +294,7 @@ def resume( @click.command(help="Run the workflow locally.") @tracing.cli_entrypoint("cli/run") @common_run_options +@common_runner_options @click.option( "--namespace", "user_namespace", @@ -360,3 +370,58 @@ def run( f, ) runtime.execute() + + +@click.command(help="Spins up a step locally") +@click.argument( + "step-name", + required=True, + type=str, +) +@click.option( + "--task-pathspec", + default=None, + show_default=True, + help="Task ID to use when spinning up the step. The spinned up step will use the artifacts" + "corresponding to this task ID. If not provided, an arbitrary task ID from the latest run will be used.", +) +@common_runner_options +@click.pass_obj +def spin( + obj, + step_name, + task_pathspec=None, + run_id_file=None, + runner_attribute_file=None, + **kwargs +): + before_run(obj, [], []) + if task_pathspec is None: + task_pathspec = get_latest_task_pathspec(obj.flow.name, step_name) + + obj.echo( + f"Spinning up step *{step_name}* locally with task pathspec *{task_pathspec}*" + ) + obj.flow._set_constants(obj.graph, kwargs, obj.config_options) + step_func = getattr(obj.flow, step_name) + + spin_runtime = SpinRuntime( + obj.flow, + obj.graph, + obj.flow_datastore, + obj.metadata, + obj.environment, + obj.package, + obj.logger, + obj.entrypoint, + obj.event_logger, + obj.monitor, + step_func, + task_pathspec, + ) + + # write_latest_run_id(obj, runtime.run_id) + # write_file(run_id_file, runtime.run_id) + + spin_runtime.execute() + pass diff --git a/metaflow/cli_components/step_cmd.py b/metaflow/cli_components/step_cmd.py index 4b40c9e5e54..376937a4298 100644 --- a/metaflow/cli_components/step_cmd.py +++ b/metaflow/cli_components/step_cmd.py @@ -174,3 +174,78 @@ def step( ) echo("Success", fg="green", bold=True, indent=True) + + +@click.command(help="Internal command to spin a single task.", hidden=True) +@click.argument("step-name") +@click.option( + "--run-id", + default=None, + required=True, + help="Run ID for the step that's about to be spun", +) +@click.option( + "--task-id", + default=None, + required=True, + help="Task ID for the step that's about to be spun", +) +@click.option( + "--input-paths", + help="A comma-separated list of pathspecs specifying inputs for this step.", +) +@click.option( + "--split-index", + type=int, + default=None, + show_default=True, + help="Index of this foreach split.", +) +@click.option( + "--retry-count", + default=0, + help="How many times we have attempted to run this task.", +) +@click.option( + "--max-user-code-retries", + default=0, + help="How many times we should attempt running the user code.", +) +@click.option( + "--namespace", + "namespace", + default=None, + help="Change namespace from the default (your username) to the specified tag.", +) +@click.pass_context +def spin_internal( + ctx, + step_name, + run_id=None, + task_id=None, + input_paths=None, + split_index=None, + retry_count=None, + max_user_code_retries=None, + namespace=None, +): + if ctx.obj.is_quiet: + echo = echo_dev_null + else: + echo = echo_always + print("I am here 1") + print("I am here 2") + # echo("Spinning a task, *%s*" % step_name, fg="magenta", bold=False) + + task = MetaflowTask( + ctx.obj.flow, + ctx.obj.flow_datastore, # local datastore + ctx.obj.metadata, # local metadata provider + ctx.obj.environment, # local environment + ctx.obj.echo, + ctx.obj.event_logger, # null logger + ctx.obj.monitor, # null monitor + None, # no unbounded foreach context + ) + # echo("Task is: ", task) + # pass diff --git a/metaflow/metaflow_config.py b/metaflow/metaflow_config.py index 415a934cbe4..56be74c9592 100644 --- a/metaflow/metaflow_config.py +++ b/metaflow/metaflow_config.py @@ -47,6 +47,14 @@ "DEFAULT_FROM_DEPLOYMENT_IMPL", "argo-workflows" ) +### +# Spin configuration +### +SPIN_ALLOWED_DECORATORS = from_conf( + "SPIN_ALLOWED_DECORATORS", ["conda", "pypi", "environment"] +) + + ### # User configuration ### diff --git a/metaflow/plugins/pypi/conda_decorator.py b/metaflow/plugins/pypi/conda_decorator.py index b1b7ee833d9..f43a68425dc 100644 --- a/metaflow/plugins/pypi/conda_decorator.py +++ b/metaflow/plugins/pypi/conda_decorator.py @@ -287,6 +287,7 @@ def task_pre_step( def runtime_step_cli( self, cli_args, retry_count, max_user_code_retries, ubf_context ): + print("Let's go - I am here") if self.disabled: return # Ensure local installation of Metaflow is visible to user code diff --git a/metaflow/runtime.py b/metaflow/runtime.py index 7e9269841fb..c446266be32 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -73,6 +73,220 @@ # TODO option: output dot graph periodically about execution +class SpinRuntime(object): + def __init__( + self, + flow, + graph, + flow_datastore, + metadata, + environment, + package, + logger, + entrypoint, + event_logger, + monitor, + step_func, + task_pathspec, + max_log_size=MAX_LOG_SIZE, + ): + from metaflow import Task + + self._flow = flow + self._graph = graph + self._flow_datastore = flow_datastore + self._metadata = metadata + self._environment = environment + self._package = package + self._logger = logger + self._entrypoint = entrypoint + self._event_logger = event_logger + self._monitor = monitor + self._args = args or [] + self._kwargs = kwargs or {} + + self._step_func = step_func + self._task_pathspec = task_pathspec + self._prev_task = Task(self._task_pathspec, _namespace_check=False) + self._input_paths = None + self._split_index = None + self._whitelist_decorators = None + self._config_file_name = None + self._max_log_size = max_log_size + self._run_queue = [] + self._poll = procpoll.make_poll() + self._workers = {} # fd -> subprocess mapping + self._finished = {} + + # Create a new run_id for the spin task + self._run_id = self._metadata.new_run_id() + print( + f"New run_id for spin task: {self._run_id} and step func: {self._step_func.name}" + ) + + for deco in self._step_func.decorators: + deco.runtime_init(flow, graph, package, self._run_id) + + print(f"Input paths: {self.input_paths}") + + @property + def split_index(self): + if self._split_index: + return self._split_index + foreach_indices = self._prev_task.metadata_dict.get("foreach-indices", []) + self._split_index = foreach_indices[-1] if foreach_indices else None + return self._split_index + + @property + def input_paths(self): + def _format_input_paths(task_id): + _, run_id, step_name, task_id = task_id.split("/") + return f"{run_id}/{step_name}/{task_id}" + + if self._input_paths: + return self._input_paths + + if self._step_func.name == "start": + from metaflow import Step + + flow_name, run_id, _, _ = self._task_pathspec.split("/") + + step = Step(f"{flow_name}/{run_id}/_parameters", _namespace_check=False) + task = next(iter(step.tasks()), None) + if not task: + raise MetaflowException( + f"Task not found for {step} in the metadata store" + ) + self._input_paths = [f"{run_id}/_parameters/{task.id}"] + else: + ancestors = self._prev_task.immediate_ancestors + self._input_paths = [ + _format_input_paths(ancestor) + for i, ancestor in enumerate(chain.from_iterable(ancestors.values())) + ] + return self._input_paths + + @property + def whitelist_decorators(self): + if self._whitelist_decorators: + return self._whitelist_decorators + self._whitelist_decorators = [ + deco + for deco in self._step_func.decorators + if any(deco.name.startswith(prefix) for prefix in SPIN_ALLOWED_DECORATORS) + ] + return self._whitelist_decorators + + def _new_task(self, step, input_paths=None, **kwargs): + return Task( + self._flow_datastore, + self._flow, + step, + self._run_id, + self._metadata, + self._environment, + self._entrypoint, + self._event_logger, + self._monitor, + input_paths=self.input_paths, + decos=self.whitelist_decorators, + logger=self._logger, + split_index=self.split_index, + **kwargs, + ) + + def execute(self): + with tempfile.NamedTemporaryFile(mode="w", encoding="utf-8") as config_file: + # Configurations are passed through a file to avoid overloading the + # command-line. We only need to create this file once and it can be reused + # for any task launch + config_value = dump_config_values(self._flow) + if config_value: + json.dump(config_value, config_file) + config_file.flush() + self._config_file_name = config_file.name + else: + self._config_file_name = None + + task = self._new_task(self._step_func.name, {}) + _ds = self._flow_datastore.get_task_datastore( + self._run_id, self._step_func.name, task.task_id, attempt=0, mode="w" + ) + + for deco in self.whitelist_decorators: + deco.runtime_task_created( + _ds, + task.task_id, + self.split_index, + self.input_paths, + is_cloned=False, + ubf_context=None, + ) + + # Start a new worker to spin a step + worker = Worker(task, self._max_log_size, self._config_file_name, spin=True) + for fd in worker.fds(): + self._workers[fd] = worker + self._poll.add(fd) + + finished_tasks = list(self._poll_workers()) + try: + pass + except KeyboardInterrupt as ex: + self._logger("Workflow interrupted.", system_msg=True, bad=True) + self._killall() + exception = ex + raise + + except Exception as ex: + self._logger("Workflow failed.", system_msg=True, bad=True) + self._killall() + exception = ex + raise + finally: + # on finish clean tasks + for step in self._flow: + for deco in step.decorators: + deco.runtime_finished(exception) + + def _launch_spin(self): + args = CLIArgs(self.task, self.spin) + env = dict(os.environ) + + for deco in self.task.decos: + deco.runtime_step_cli( + args, + self.task.retries, + self.task.user_code_retries, + self.task.ubf_context, + ) + + # Add user configurations using a file to avoid using up too much space on the + # command line + if self._config_file_name: + args.top_level_options["local-config-file"] = self._config_file_name + + print(f"Args Entrypoint updated is {args.entrypoint}") + env.update(args.get_env()) + env["PYTHONUNBUFFERED"] = "x" + cmdline = args.get_args() + print(f"Command line is: {cmdline}") + + process = subprocess.Popen( + cmdline, + env=env, + bufsize=1, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + # Read and print subprocess output + stdout, stderr = process.communicate() + print(f"stdout: {stdout.decode()}") + print(f"stderr: {stderr.decode()}") + + class NativeRuntime(object): def __init__( self, @@ -1508,8 +1722,9 @@ class CLIArgs(object): for step execution in StepDecorator.runtime_step_cli(). """ - def __init__(self, task): + def __init__(self, task, spin=False): self.task = task + self.spin = spin self.entrypoint = list(task.entrypoint) self.top_level_options = { "quiet": True, @@ -1542,18 +1757,36 @@ def __init__(self, task): (k, ConfigInput.make_key_name(k)) for k in configs ] + if spin: + self.spin_args() + else: + self.default_args() + + def default_args(self): self.commands = ["step"] self.command_args = [self.task.step] self.command_options = { - "run-id": task.run_id, - "task-id": task.task_id, - "input-paths": compress_list(task.input_paths), - "split-index": task.split_index, - "retry-count": task.retries, - "max-user-code-retries": task.user_code_retries, - "tag": task.tags, + "run-id": self.task.run_id, + "task-id": self.task.task_id, + "input-paths": compress_list(self.task.input_paths), + "split-index": self.task.split_index, + "retry-count": self.task.retries, + "max-user-code-retries": self.task.user_code_retries, + "tag": self.task.tags, "namespace": get_namespace() or "", - "ubf-context": task.ubf_context, + "ubf-context": self.task.ubf_context, + } + self.env = {} + + def spin_args(self): + self.commands = ["spin-internal"] + self.command_args = [self.task.step] + + self.command_options = { + "run-id": self.task.run_id, + "task-id": self.task.task_id, + "input-paths": self.task.input_paths, + "split-index": self.task.split_index, } self.env = {} diff --git a/metaflow/util.py b/metaflow/util.py index cd3447d0e48..e9355e31ae1 100644 --- a/metaflow/util.py +++ b/metaflow/util.py @@ -9,6 +9,7 @@ from itertools import takewhile import re +from typing import Callable from metaflow.exception import MetaflowUnknownUser, MetaflowInternalError try: @@ -193,6 +194,49 @@ def get_latest_run_id(echo, flow_name): return None +def get_latest_task_pathspec(flow_name: str, step_name: str) -> str: + """ + Returns a task pathspec from the latest run of the flow for the queried step. + If the queried step has several tasks, the task pathspec of the first task is returned. + + Parameters + ---------- + flow_name : str + The name of the flow. + step_name : str + The name of the step. + + Returns + ------- + str + The task pathspec of the first task of the queried step. + + Raises + ------ + MetaflowNotFound + If no task or run is found for the queried step. + """ + from metaflow import Flow, Step + from metaflow.exception import MetaflowNotFound + + run = Flow(flow_name, _namespace_check=False).latest_run + + if run is None: + raise MetaflowNotFound(f"No run found for the flow {flow_name}") + + try: + step = Step(f"{flow_name}/{run.id}/{step_name}", _namespace_check=False) + except Exception: + raise MetaflowNotFound( + f"No step *{step_name}* found in run *{run.id}* for flow *{flow_name}*" + ) + + task = next(iter(step.tasks()), None) + if task: + return f"{flow_name}/{run.id}/{step_name}/{task.id}" + raise MetaflowNotFound(f"No task found for the queried step {query_step}") + + def write_latest_run_id(obj, run_id): from metaflow.plugins.datastores.local_storage import LocalStorage