Skip to content

Commit

Permalink
[components] Update component naming scheme
Browse files Browse the repository at this point in the history
  • Loading branch information
OwenKephart committed Feb 4, 2025
1 parent c814e99 commit 1f03c36
Show file tree
Hide file tree
Showing 33 changed files with 227 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ def list_component_types_command(ctx: click.Context) -> None:
registry = ComponentTypeRegistry.from_entry_point_discovery(
builtin_component_lib=builtin_component_lib
)
for key in sorted(registry.keys()):
package, name = key.rsplit(".", 1)
output[key] = ComponentTypeMetadata(
name=name,
package=package,
for key in sorted(registry.keys(), key=lambda k: k.to_string()):
output[key.to_string()] = ComponentTypeMetadata(
name=key.name,
package=key.package,
**registry.get(key).get_metadata(),
)
click.echo(json.dumps(output))
Expand All @@ -46,13 +45,11 @@ def list_local_component_types_command(component_directories: Sequence[str]) ->
output: dict = {}
for component_directory in component_directories:
output_for_directory = {}
for component_type in find_local_component_types(Path(component_directory)):
output_for_directory[f".{get_component_type_name(component_type)}"] = (
ComponentTypeMetadata(
name=get_component_type_name(component_type),
package=component_directory,
**component_type.get_metadata(),
)
for key, component_type in find_local_component_types(Path(component_directory)).items():
output_for_directory[key.to_string()] = ComponentTypeMetadata(
name=get_component_type_name(component_type),
package=component_directory,
**component_type.get_metadata(),
)
if len(output_for_directory) > 0:
output[component_directory] = output_for_directory
Expand All @@ -71,17 +68,14 @@ def list_all_components_schema_command(ctx: click.Context) -> None:
)

schemas = []
for key in sorted(registry.keys()):
component_type = registry.get(key)

for key, component_type in sorted(registry.items()):
# Create ComponentFileModel schema for each type
schema_type = component_type.get_schema()
key_string = key.to_string()
if schema_type:
schemas.append(
create_model(
key,
type=(Literal[key], key),
params=(schema_type, None),
key.name, type=(Literal[key_string], key_string), params=(schema_type, None)
)
)
union_type = Union[tuple(schemas)] # type: ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pydantic import TypeAdapter

from dagster_components import ComponentTypeRegistry
from dagster_components.core.component import ComponentTypeKey
from dagster_components.scaffold import (
ComponentScaffolderUnavailableReason,
scaffold_component_instance,
Expand Down Expand Up @@ -32,10 +33,11 @@ def scaffold_component_command(
registry = ComponentTypeRegistry.from_entry_point_discovery(
builtin_component_lib=builtin_component_lib
)
if not registry.has(component_type):
component_key = ComponentTypeKey.from_string(component_type)
if not registry.has(component_key):
exit_with_error(f"No component type `{component_type}` could be resolved.")

component_type_cls = registry.get(component_type)
component_type_cls = registry.get(component_key)
if json_params:
scaffolder = component_type_cls.get_scaffolder()
if isinstance(scaffolder, ComponentScaffolderUnavailableReason):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from dagster import _check as check
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.errors import DagsterError
from dagster._record import record
from dagster._utils import snakecase
from pydantic import BaseModel
from typing_extensions import Self
Expand Down Expand Up @@ -113,6 +114,20 @@ class ComponentTypeMetadata(ComponentTypeInternalMetadata):
package: str


@record
class ComponentTypeKey:
name: str
package: str

def to_string(self) -> str:
return f"{self.name}@{self.package}"

@staticmethod
def from_string(s: str) -> "ComponentTypeKey":
name, package = s.split("@")
return ComponentTypeKey(name=name, package=package)


def get_entry_points_from_python_environment(group: str) -> Sequence[importlib.metadata.EntryPoint]:
if sys.version_info >= (3, 10):
return importlib.metadata.entry_points(group=group)
Expand Down Expand Up @@ -145,7 +160,7 @@ def from_entry_point_discovery(
`dagster_components*`. Only one built-in component library can be loaded at a time.
Defaults to `dagster_components`, the standard set of published component types.
"""
component_types: dict[str, type[Component]] = {}
component_types: dict[ComponentTypeKey, type[Component]] = {}
for entry_point in get_entry_points_from_python_environment(COMPONENTS_ENTRY_POINT_GROUP):
# Skip built-in entry points that are not the specified builtin component library.
if (
Expand All @@ -161,33 +176,35 @@ def from_entry_point_discovery(
f"Value expected to be a module, got {root_module}."
)
for component_type in get_registered_component_types_in_module(root_module):
key = f"{entry_point.name}.{get_component_type_name(component_type)}"
key = ComponentTypeKey(
name=get_component_type_name(component_type), package=entry_point.name
)
component_types[key] = component_type

return cls(component_types)

def __init__(self, component_types: dict[str, type[Component]]):
self._component_types: dict[str, type[Component]] = copy.copy(component_types)
def __init__(self, component_types: dict[ComponentTypeKey, type[Component]]):
self._component_types: dict[ComponentTypeKey, type[Component]] = copy.copy(component_types)

@staticmethod
def empty() -> "ComponentTypeRegistry":
return ComponentTypeRegistry({})

def register(self, name: str, component_type: type[Component]) -> None:
if name in self._component_types:
raise DagsterError(f"There is an existing component registered under {name}")
self._component_types[name] = component_type
def register(self, key: ComponentTypeKey, component_type: type[Component]) -> None:
if key in self._component_types:
raise DagsterError(f"There is an existing component registered under {key}")
self._component_types[key] = component_type

def has(self, name: str) -> bool:
return name in self._component_types
def has(self, key: ComponentTypeKey) -> bool:
return key in self._component_types

def get(self, name: str) -> type[Component]:
return self._component_types[name]
def get(self, key: ComponentTypeKey) -> type[Component]:
return self._component_types[key]

def keys(self) -> Iterable[str]:
def keys(self) -> Iterable[ComponentTypeKey]:
return self._component_types.keys()

def items(self) -> Iterable[tuple[str, type[Component]]]:
def items(self) -> Iterable[tuple[ComponentTypeKey, type[Component]]]:
return self._component_types.items()

def __repr__(self) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
Component,
ComponentDeclNode,
ComponentLoadContext,
ComponentTypeKey,
ComponentTypeRegistry,
get_component_type_name,
is_component_loader,
Expand All @@ -34,18 +35,26 @@ class ComponentFileModel(BaseModel):
T = TypeVar("T", bound=BaseModel)


def find_local_component_types(component_path: Path) -> list[type[Component]]:
"""Find all component types defined in a component directory."""
component_types = []
def find_local_component_types(component_path: Path) -> Mapping[ComponentTypeKey, type[Component]]:
"""Find all component types defined in a component directory, and their respective paths."""
component_types = {}
for py_file in component_path.glob("*.py"):
module_name = py_file.stem
for component_type in find_component_types_in_file(py_file):
component_types[
ComponentTypeKey(name=get_component_type_name(component_type), package=py_file.name)
] = component_type
return component_types

module = load_module_from_path(module_name, component_path / f"{module_name}.py")

for _name, obj in inspect.getmembers(module, inspect.isclass):
assert isinstance(obj, type)
if is_registered_component_type(obj):
component_types.append(obj)
def find_component_types_in_file(file_path: Path) -> list[type[Component]]:
"""Find all component types defined in a specific file."""
component_types = []
for _name, obj in inspect.getmembers(
load_module_from_path(file_path.stem, file_path), inspect.isclass
):
assert isinstance(obj, type)
if is_registered_component_type(obj):
component_types.append(obj)
return component_types


Expand Down Expand Up @@ -115,18 +124,16 @@ def from_path(path: Path) -> "YamlComponentDecl":

def get_component_type(self, registry: ComponentTypeRegistry) -> type[Component]:
parsed_defs = self.component_file_model
if parsed_defs.type.startswith("."):
component_registry_key = parsed_defs.type[1:]

for component_type in find_local_component_types(self.path):
if get_component_type_name(component_type) == component_registry_key:
key = ComponentTypeKey.from_string(parsed_defs.type)
if parsed_defs.type.endswith(".py"):
file = self.path / key.package
for component_type in find_component_types_in_file(file):
if get_component_type_name(component_type) == key.name:
return component_type

raise Exception(
f"Could not find component type {component_registry_key} in {self.path}"
)
raise Exception(f"Could not find component type {key.name} in {file}")

return registry.get(parsed_defs.type)
return registry.get(key)

def get_params(self, context: ComponentLoadContext, params_schema: type[T]) -> T:
with pushd(str(self.path)):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ def test_list_component_types_command():
result = json.loads(result.output)

assert list(result.keys()) == [
"dagster_components.test.all_metadata_empty_asset",
"dagster_components.test.complex_schema_asset",
"dagster_components.test.simple_asset",
"dagster_components.test.simple_pipes_script_asset",
"all_metadata_empty_asset@dagster_components.test",
"complex_schema_asset@dagster_components.test",
"simple_asset@dagster_components.test",
"simple_pipes_script_asset@dagster_components.test",
]

assert result["dagster_components.test.simple_asset"] == {
assert result["simple_asset@dagster_components.test"] == {
"name": "simple_asset",
"package": "dagster_components.test",
"summary": "A simple asset that returns a constant string value.",
Expand All @@ -77,7 +77,7 @@ def test_list_component_types_command():
"type": "object",
}

assert result["dagster_components.test.simple_pipes_script_asset"] == {
assert result["simple_pipes_script_asset@dagster_components.test"] == {
"name": "simple_pipes_script_asset",
"package": "dagster_components.test",
"summary": "A simple asset that runs a Python script with the Pipes subprocess client.",
Expand Down Expand Up @@ -114,7 +114,7 @@ def test_list_local_components_types() -> None:
assert len(result) == 1
assert set(result.keys()) == {"my_location/components/local_component_sample"}
assert set(result["my_location/components/local_component_sample"].keys()) == {
".my_component"
"my_component@__init__.py"
}

# Add a second directory and local component
Expand Down Expand Up @@ -181,11 +181,11 @@ def test_all_components_schema_command():
assert "type" in component_type_schema_def["properties"]
assert (
component_type_schema_def["properties"]["type"]["default"]
== f"dagster_components.test.{component_type_key}"
== f"{component_type_key}@dagster_components.test"
)
assert (
component_type_schema_def["properties"]["type"]["const"]
== f"dagster_components.test.{component_type_key}"
== f"{component_type_key}@dagster_components.test"
)


Expand All @@ -200,7 +200,7 @@ def test_scaffold_component_command():
"dagster_components.test",
"scaffold",
"component",
"dagster_components.test.simple_pipes_script_asset",
"simple_pipes_script_asset@dagster_components.test",
"bar/components/qux",
"--json-params",
'{"asset_key": "my_asset", "filename": "my_asset.py"}',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dagster_components.dbt_project
type: dbt_project@dagster_components

params:
dbt:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dagster_components.dbt_project
type: dbt_project@dagster_components

params:
dbt:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dagster_components.pipes_subprocess_script_collection
type: pipes_subprocess_script_collection@dagster_components

params:
scripts:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dagster_components.sling_replication_collection
type: sling_replication_collection@dagster_components

params:
replications:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dagster_components.dbt_project
type: dbt_project@dagster_components

params:
dbt:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from dagster._core.definitions.definitions_class import Definitions
from dagster_components.core.component import (
ComponentTypeKey,
ComponentTypeRegistry,
get_component_type_name,
get_registered_component_types_in_module,
Expand Down Expand Up @@ -30,6 +31,9 @@ def load_test_component_project_registry(include_test: bool = False) -> Componen
dc_module = importlib.import_module(package_name)

for component in get_registered_component_types_in_module(dc_module):
key = f"dagster_components.{'test.' if package_name.endswith('test') else ''}{get_component_type_name(component)}"
key = ComponentTypeKey(
name=get_component_type_name(component),
package=f"dagster_components{'.test' if package_name.endswith('test') else ''}",
)
components[key] = component
return ComponentTypeRegistry(components)
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
type: dagster_components.definitions
type: definitions@dagster_components

params: {}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dagster_components.definitions
type: definitions@dagster_components

params:
definitions_path: some_file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: .my_component
type: my_component@__init__.py

params:
a_string: "a string"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: dagster_components.definitions
type: definitions@dagster_components

params:
definitions_path: {}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: .my_component
type: my_component@__init__.py

params:
a_string: "a string"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: .my_component
type: my_component@__init__.py

params:
a_string: "test"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: .my_component_does_not_exist
type: my_component_does_not_exist@__init__.py

params:
a_string: "test"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: .my_component
type: my_component@__init__.py

params:
a_string: "test"
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
type: .my_component
type: my_component@__init__.py

params:
a_string: "a string"
Expand Down
Loading

0 comments on commit 1f03c36

Please sign in to comment.