diff --git a/metaflow/runtime.py b/metaflow/runtime.py index c446266be32..6f32aba499c 100644 --- a/metaflow/runtime.py +++ b/metaflow/runtime.py @@ -19,12 +19,15 @@ from functools import partial from concurrent import futures + from metaflow.datastore.exceptions import DataException +from itertools import chain from contextlib import contextmanager from . import get_namespace from .metadata_provider import MetaDatum from .metaflow_config import MAX_ATTEMPTS, UI_URL +from .metaflow_config import SPIN_ALLOWED_DECORATORS from .exception import ( MetaflowException, MetaflowInternalError, @@ -102,8 +105,6 @@ def __init__( self._entrypoint = entrypoint self._event_logger = event_logger self._monitor = monitor - self._args = args or [] - self._kwargs = kwargs or {} self._step_func = step_func self._task_pathspec = task_pathspec @@ -113,10 +114,6 @@ def __init__( self._whitelist_decorators = None self._config_file_name = None self._max_log_size = max_log_size - self._run_queue = [] - self._poll = procpoll.make_poll() - self._workers = {} # fd -> subprocess mapping - self._finished = {} # Create a new run_id for the spin task self._run_id = self._metadata.new_run_id() @@ -124,8 +121,18 @@ def __init__( f"New run_id for spin task: {self._run_id} and step func: {self._step_func.name}" ) + print( + f"Decorators for {self._step_func.name}: {list(self._step_func.decorators)}" + ) + for deco in self._step_func.decorators: + print( + f"Running runtime_init for {deco.__class__.__name__} at {self._step_func.name}" + ) + print("-" * 100) deco.runtime_init(flow, graph, package, self._run_id) + if hasattr(deco, "_metaflow_home"): + print(f"Metaflow home is {deco._metaflow_home}") print(f"Input paths: {self.input_paths}") @@ -208,83 +215,68 @@ def execute(self): else: self._config_file_name = None - task = self._new_task(self._step_func.name, {}) + self.task = self._new_task(self._step_func.name, {}) _ds = self._flow_datastore.get_task_datastore( - self._run_id, self._step_func.name, task.task_id, attempt=0, mode="w" + self._run_id, self._step_func.name, self.task.task_id, attempt=0, mode="w" ) for deco in self.whitelist_decorators: deco.runtime_task_created( _ds, - task.task_id, + self.task.task_id, self.split_index, self.input_paths, is_cloned=False, ubf_context=None, ) - # Start a new worker to spin a step - worker = Worker(task, self._max_log_size, self._config_file_name, spin=True) - for fd in worker.fds(): - self._workers[fd] = worker - self._poll.add(fd) + self.launch_spin() - finished_tasks = list(self._poll_workers()) - try: - pass - except KeyboardInterrupt as ex: - self._logger("Workflow interrupted.", system_msg=True, bad=True) - self._killall() - exception = ex - raise - - except Exception as ex: - self._logger("Workflow failed.", system_msg=True, bad=True) - self._killall() - exception = ex - raise - finally: - # on finish clean tasks - for step in self._flow: - for deco in step.decorators: - deco.runtime_finished(exception) - - def _launch_spin(self): - args = CLIArgs(self.task, self.spin) - env = dict(os.environ) + # Start a new worker to spin a step + # on finish clean tasks + exception = None + for deco in self.whitelist_decorators: + deco.runtime_finished(exception) - for deco in self.task.decos: - deco.runtime_step_cli( - args, - self.task.retries, - self.task.user_code_retries, - self.task.ubf_context, - ) + def launch_spin(self): + args = CLIArgs(self.task, spin=True) + env = dict(os.environ) - # Add user configurations using a file to avoid using up too much space on the - # command line - if self._config_file_name: - args.top_level_options["local-config-file"] = self._config_file_name - - print(f"Args Entrypoint updated is {args.entrypoint}") - env.update(args.get_env()) - env["PYTHONUNBUFFERED"] = "x" - cmdline = args.get_args() - print(f"Command line is: {cmdline}") - - process = subprocess.Popen( - cmdline, - env=env, - bufsize=1, - stdin=subprocess.PIPE, - stderr=subprocess.PIPE, - stdout=subprocess.PIPE, + for deco in self.task.decos: + deco.runtime_step_cli( + args, + self.task.retries, + self.task.user_code_retries, + self.task.ubf_context, ) - # Read and print subprocess output - stdout, stderr = process.communicate() - print(f"stdout: {stdout.decode()}") - print(f"stderr: {stderr.decode()}") + # Add user configurations using a file to avoid using up too much space on the + # command line + if self._config_file_name: + args.top_level_options["local-config-file"] = self._config_file_name + + print(f"Args Entrypoint updated is {args.entrypoint}") + env.update(args.get_env()) + env["PYTHONUNBUFFERED"] = "x" + cmdline = args.get_args() + print(f"Command line is: {cmdline}") + + process = subprocess.Popen( + cmdline, + env=env, + bufsize=1, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + # Read and print subprocess output + stdout, stderr = process.communicate() + print("STDOUT:\n") + print(f"{stdout.decode()}") + print("-" * 100) + print("STDERR:\n") + print(f"stderr: {stderr.decode()}") class NativeRuntime(object):