Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,20 +458,24 @@ def start(
# We can now set the CONFIGS value in the flow properly. This will overwrite
# anything that may have been passed in by default and we will use exactly what
# the original flow had. Note that these are accessed through the parameter name
# We need to save the "plain-ness" flag to carry it over
config_plain_flags = {
k: v[1] for k, v in ctx.obj.flow._flow_state[FlowStateItems.CONFIGS].items()
}
ctx.obj.flow._flow_state[FlowStateItems.CONFIGS].clear()
d = ctx.obj.flow._flow_state[FlowStateItems.CONFIGS]
for param_name, var_name in zip(config_param_names, config_var_names):
val = param_ds[var_name]
debug.userconf_exec("Loaded config %s as: %s" % (param_name, val))
d[param_name] = val
d[param_name] = (val, config_plain_flags[param_name])

elif getattr(ctx.obj, "delayed_config_exception", None):
# If we are not doing a resume, any exception we had parsing configs needs to
# be raised. For resume, since we ignore those options, we ignore the error.
raise ctx.obj.delayed_config_exception

# Init all values in the flow mutators and then process them
for decorator in ctx.obj.flow._flow_state[FlowStateItems.FLOW_MUTATORS]:
for decorator in ctx.obj.flow._flow_mutators:
decorator.external_init()

new_cls = ctx.obj.flow._process_config_decorators(config_options)
Expand Down Expand Up @@ -592,8 +596,8 @@ def start(
ctx.obj.echo,
ctx.obj.flow_datastore,
{
k: ConfigValue(v) if v is not None else None
for k, v in ctx.obj.flow.__class__._flow_state[
k: v if plain_flag or v is None else ConfigValue(v)
for k, (v, plain_flag) in ctx.obj.flow.__class__._flow_state[
FlowStateItems.CONFIGS
].items()
},
Expand Down
2 changes: 1 addition & 1 deletion metaflow/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ def _init_step_decorators(
# and then the step level ones to maintain a consistent order with how
# other decorators are run.

for deco in cls._flow_state[FlowStateItems.FLOW_MUTATORS]:
for deco in cls._flow_mutators:
if isinstance(deco, FlowMutator):
inserted_by_value = [deco.decorator_name] + (deco.inserted_by or [])
mutable_flow = MutableFlow(
Expand Down
8 changes: 5 additions & 3 deletions metaflow/flowspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ def _flow_decorators(self):
# Backward compatible method to access flow decorators
return self._flow_state[FlowStateItems.FLOW_DECORATORS]

@property
def _flow_mutators(self):
return self._flow_state[FlowStateItems.FLOW_MUTATORS]

@classmethod
def _check_parameters(cls, config_parameters=False):
seen = set()
Expand Down Expand Up @@ -478,9 +482,7 @@ def _set_constants(self, graph, kwargs, config_options):
seen.add(var)
if param.IS_CONFIG_PARAMETER:
# Use computed value if already evaluated, else get from config_options
val = param._computed_value or config_options.get(
param.name.replace("-", "_").lower()
)
val = param._computed_value or config_options.get(param.name)
else:
val = kwargs[param.name.replace("-", "_").lower()]
# Support for delayed evaluation of parameters.
Expand Down
2 changes: 1 addition & 1 deletion metaflow/includefile.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ def load_parameter(self, v):
# If a parser is specified, use it to parse the content
if self._parser is not None:
try:
return ConfigInput._call_parser(self._parser, content)
return ConfigInput._call_parser(self._parser, content, True)
except Exception as e:
raise MetaflowException(
"Failed to parse content in parameter '%s' using parser: %s"
Expand Down
2 changes: 1 addition & 1 deletion metaflow/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ def __init__(
help: Optional[str] = None,
required: Optional[bool] = None,
show_default: Optional[bool] = None,
**kwargs: Dict[str, Any]
**kwargs: Dict[str, Any],
):
self.name = name
self.kwargs = kwargs
Expand Down
2 changes: 1 addition & 1 deletion metaflow/runner/click_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ def _compute_flow_parameters(self):
# We ignore any errors if we don't check the configs in the click API.

# Init all values in the flow mutators and then process them
for decorator in self._flow_cls._flow_state[FlowStateItems.FLOW_MUTATORS]:
for decorator in self._flow_cls._flow_mutators:
decorator.external_init()

new_cls = self._flow_cls._process_config_decorators(
Expand Down
72 changes: 46 additions & 26 deletions metaflow/user_configs/config_options.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import json
import os

from collections.abc import Mapping
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from metaflow._vendor import click
Expand Down Expand Up @@ -157,7 +157,7 @@ def make_key_name(name: str) -> str:
# Special mark to indicate that the configuration value is not content or a file
# name but a value that should be read in the config file (effectively where
# the value has already been materialized).
return "kv." + name.lower()
return "kv." + name

@classmethod
def set_config_file(cls, config_file: str):
Expand Down Expand Up @@ -225,13 +225,13 @@ def process_configs(
# and is clearer
if param_name == "config_value":
self._value_values = {
k.lower(): v
k: v
for k, v in param_value.items()
if v is not None and not v.startswith(_CONVERTED_DEFAULT)
}
else:
self._path_values = {
k.lower(): v
k: v
for k, v in param_value.items()
if v is not None and not v.startswith(_CONVERTED_DEFAULT)
}
Expand Down Expand Up @@ -286,7 +286,7 @@ def process_configs(
merged_configs = {}
# Now look at everything (including defaults)
for name, (val, is_path) in self._defaults.items():
n = name.lower()
n = name
if n in all_values:
# We have the value provided by the user -- use that.
merged_configs[n] = all_values[n]
Expand Down Expand Up @@ -331,7 +331,10 @@ def process_configs(
if val is None:
missing_configs.add(name)
to_return[name] = None
flow_cls._flow_state.self_data[FlowStateItems.CONFIGS][name] = None
flow_cls._flow_state.self_data[FlowStateItems.CONFIGS][name] = (
None,
True,
)
continue
if val.startswith(_CONVERTED_NO_FILE):
no_file.append(name)
Expand All @@ -340,13 +343,14 @@ def process_configs(
no_default_file.append(name)
continue

parser, is_plain = self._parsers[name]
val = val[len(_CONVERT_PREFIX) :] # Remove the _CONVERT_PREFIX
if val.startswith(_DEFAULT_PREFIX): # Remove the _DEFAULT_PREFIX if needed
val = val[len(_DEFAULT_PREFIX) :]
if val.startswith("kv."):
# This means to load it from a file
try:
read_value = self.get_config(val[3:])
read_value, read_is_plain = self.get_config(val[3:])
except KeyError as e:
exc = click.UsageError(
"Could not find configuration '%s' in INFO file" % val
Expand All @@ -355,15 +359,23 @@ def process_configs(
click_obj.delayed_config_exception = exc
return None
raise exc from e
flow_cls._flow_state.self_data[FlowStateItems.CONFIGS][
name
] = read_value
if read_is_plain != is_plain:
raise click.UsageError(
"Configuration '%s' mismatched `plain` attribute -- "
"this is a bug, please report it." % val[3:]
)
flow_cls._flow_state.self_data[FlowStateItems.CONFIGS][name] = (
read_value,
True if read_value is None else is_plain,
)
to_return[name] = (
ConfigValue(read_value) if read_value is not None else None
read_value
if read_value is None or is_plain
else ConfigValue(read_value)
)
else:
if self._parsers[name]:
read_value = self._call_parser(self._parsers[name], val)
if parser:
read_value = self._call_parser(parser, val, is_plain)
else:
try:
read_value = json.loads(val)
Expand All @@ -374,11 +386,14 @@ def process_configs(
)
continue
# TODO: Support YAML
flow_cls._flow_state.self_data[FlowStateItems.CONFIGS][
name
] = read_value
flow_cls._flow_state.self_data[FlowStateItems.CONFIGS][name] = (
read_value,
True if read_value is None else is_plain,
)
to_return[name] = (
ConfigValue(read_value) if read_value is not None else None
read_value
if read_value is None or is_plain
else ConfigValue(read_value)
)

reqs = missing_configs.intersection(self._req_configs)
Expand Down Expand Up @@ -423,7 +438,7 @@ def __repr__(self):
return "ConfigInput"

@staticmethod
def _call_parser(parser, val):
def _call_parser(parser, val, is_plain):
if isinstance(parser, str):
if len(parser) and parser[0] == ".":
parser = "metaflow" + parser
Expand All @@ -438,7 +453,13 @@ def _call_parser(parser, val):
"Parser %s is either not part of %s or not a callable"
% (func, path)
)
return parser(val)
return_value = parser(val)
if not is_plain and not isinstance(return_value, Mapping):
raise ValueError(
"Parser %s returned a value that is not a mapping (got type %s): %s"
% (str(parser), type(return_value), return_value)
)
return return_value


class LocalFileInput(click.Path):
Expand Down Expand Up @@ -474,23 +495,22 @@ def config_options_with_config_input(cmd):
# List all the configuration options
for arg in parameters[::-1]:
kwargs = arg.option_kwargs(False)
if arg.name.lower() in config_seen:
if arg.name in config_seen:
msg = (
"Multiple configurations use the same name '%s'. Note that names are "
"case-insensitive. Please change the "
"Multiple configurations use the same name '%s'. Please change the "
"names of some of your configurations" % arg.name
)
raise MetaflowException(msg)
config_seen.add(arg.name.lower())
config_seen.add(arg.name)
if kwargs["required"]:
required_names.append(arg.name)

defaults[arg.name.lower()] = (
defaults[arg.name] = (
arg.kwargs.get("default", None),
arg._default_is_file,
)
help_strs.append(" - %s: %s" % (arg.name.lower(), kwargs.get("help", "")))
parsers[arg.name.lower()] = arg.parser
help_strs.append(" - %s: %s" % (arg.name, kwargs.get("help", "")))
parsers[arg.name] = (arg.parser, arg.kwargs["plain"])

if not config_seen:
# No configurations -- don't add anything; we set it to False so that it
Expand Down
23 changes: 17 additions & 6 deletions metaflow/user_configs/config_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,10 @@ def __call__(self, ctx=None, deploy_time=False):
to_eval_expr,
self._globals or globals(),
{
k: ConfigValue(v) if v is not None else None
for k, v in flow_cls._flow_state[FlowStateItems.CONFIGS].items()
k: v if plain_flag or v is None else ConfigValue(v)
for k, (v, plain_flag) in flow_cls._flow_state[
FlowStateItems.CONFIGS
].items()
},
)
except NameError as e:
Expand Down Expand Up @@ -467,6 +469,13 @@ class Config(Parameter, collections.abc.Mapping):
If the name starts with a ".", it is assumed to be relative to "metaflow".
show_default : bool, default True
If True, show the default value in the help text.
plain : bool, default False
If True, the configuration value is just returned as is and not converted to
a ConfigValue. Use this if you just want to directly access your configuration.
Note that modifications are not persisted across steps (ie: ConfigValue prevents
modifications and raises an error -- if you have your own object, no error
is raised but no modifications are persisted). You can also use this to return
any arbitrary object (not just dictionary-like objects).
"""

IS_CONFIG_PARAMETER = True
Expand All @@ -485,6 +494,7 @@ def __init__(
help: Optional[str] = None,
required: Optional[bool] = None,
parser: Optional[Union[str, Callable[[str], Dict[Any, Any]]]] = None,
plain: Optional[bool] = False,
**kwargs: Dict[str, str]
):
if default is not None and default_value is not None:
Expand All @@ -494,6 +504,7 @@ def __init__(
)
self._default_is_file = default is not None
kwargs["default"] = default if default is not None else default_value
kwargs["plain"] = plain
super(Config, self).__init__(
name, required=required, help=help, type=str, **kwargs
)
Expand All @@ -507,20 +518,20 @@ def __init__(
self._delayed_evaluator = None

def load_parameter(self, v):
return ConfigValue(v) if v is not None else None
return v if v is None or self.kwargs["plain"] else ConfigValue(v)

def _store_value(self, v: Any) -> None:
self._computed_value = v

def _init_delayed_evaluator(self) -> None:
if self._delayed_evaluator is None:
self._delayed_evaluator = DelayEvaluator(self.name.lower())
self._delayed_evaluator = DelayEvaluator(self.name)

# Support <config>.<var> syntax
def __getattr__(self, name):
# Need to return a new DelayEvaluator every time because the evaluator will
# contain the "path" (ie: .name) and can be further accessed.
return getattr(DelayEvaluator(self.name.lower()), name)
return getattr(DelayEvaluator(self.name), name)

# Next three methods are to implement mapping to support **<config> syntax. We
# need to be careful, however, to also support a regular `config["key"]` syntax
Expand All @@ -537,7 +548,7 @@ def __getitem__(self, key):
self._init_delayed_evaluator()
if isinstance(key, str) and key.startswith(UNPACK_KEY):
return self._delayed_evaluator[key]
return DelayEvaluator(self.name.lower())[key]
return DelayEvaluator(self.name)[key]


def resolve_delayed_evaluator(
Expand Down
6 changes: 4 additions & 2 deletions metaflow/user_decorators/mutable_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ def start(self):
from metaflow.flowspec import FlowStateItems

# When configs are parsed, they are loaded in _flow_state[FlowStateItems.CONFIGS]
for name, value in self._flow_cls._flow_state[FlowStateItems.CONFIGS].items():
r = name, ConfigValue(value) if value is not None else None
for name, (value, plain_flag) in self._flow_cls._flow_state[
FlowStateItems.CONFIGS
].items():
r = name, value if plain_flag or value is None else ConfigValue(value)
debug.userconf_exec("Mutable flow yielding config: %s" % str(r))
yield r

Expand Down