Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cli-refactor] Type and validate workspace_opts at entry point #27540

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
from dagster_webserver.app import create_app_from_workspace_process_context
from starlette.testclient import TestClient

from dagster._cli.workspace import get_workspace_process_context_from_kwargs
from dagster._cli.workspace.cli_target import WorkspaceOpts
from dagster._core.test_utils import instance_for_test
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._utils import check_script, pushd, script_relative_path

PIPELINES_OR_ERROR_QUERY = """
Expand Down Expand Up @@ -99,10 +100,15 @@ def path_to_tutorial_file(path):
)


def load_dagster_webserver_for_workspace_cli_args(n_pipelines=1, **kwargs):
def load_dagster_webserver_for_workspace_cli_args(
n_pipelines=1, *, workspace_opts: WorkspaceOpts
):
with instance_for_test() as instance:
with get_workspace_process_context_from_kwargs(
instance, version="", read_only=False, kwargs=kwargs
with WorkspaceProcessContext(
instance,
version="",
read_only=False,
workspace_load_target=workspace_opts.to_load_target(),
) as workspace_process_context:
client = TestClient(
create_app_from_workspace_process_context(workspace_process_context)
Expand Down Expand Up @@ -132,7 +138,7 @@ def test_load_pipeline(
with pushd(path_to_tutorial_file(dirname)):
filepath = path_to_tutorial_file(os.path.join(dirname, filename))
load_dagster_webserver_for_workspace_cli_args(
python_file=(filepath,), fn_name=fn_name
workspace_opts=WorkspaceOpts(python_file=(filepath,), attribute=fn_name),
)


Expand Down
36 changes: 27 additions & 9 deletions python_modules/dagster-graphql/dagster_graphql/cli.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import asyncio
from collections.abc import Mapping
from io import TextIOWrapper
from typing import Optional
from urllib.parse import urljoin, urlparse

import click
import dagster._check as check
import dagster._seven as seven
import requests
from dagster._cli.utils import get_instance_for_cli, get_temporary_instance_for_cli
from dagster._cli.workspace import workspace_options
from dagster._cli.workspace.cli_target import (
WORKSPACE_TARGET_WARNING,
get_workspace_process_context_from_kwargs,
from dagster._cli.utils import (
assert_no_remaining_opts,
get_instance_for_cli,
get_temporary_instance_for_cli,
)
from dagster._cli.workspace import workspace_options
from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING, WorkspaceOpts
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME
from dagster._utils.log import get_stack_trace_array
Expand Down Expand Up @@ -127,7 +129,6 @@ def execute_query_against_remote(host, query, variables):
}


@workspace_options
@click.command(
name="ui",
help=(
Expand Down Expand Up @@ -180,9 +181,23 @@ def execute_query_against_remote(host, query, variables):
@click.option(
"--ephemeral-instance",
is_flag=True,
default=False,
help="Use an ephemeral DagsterInstance instead of resolving via DAGSTER_HOME",
)
def ui(text, file, predefined, variables, remote, output, ephemeral_instance, **kwargs):
@workspace_options
def ui(
text: Optional[str],
file: Optional[TextIOWrapper],
predefined: Optional[str],
variables: Optional[str],
remote: Optional[str],
output: Optional[str],
ephemeral_instance: bool,
**other_opts,
):
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

query = None
if text is not None and file is None and predefined is None:
query = text.strip("'\" \n\t")
Expand All @@ -203,8 +218,11 @@ def ui(text, file, predefined, variables, remote, output, ephemeral_instance, **
with (
get_temporary_instance_for_cli() if ephemeral_instance else get_instance_for_cli()
) as instance:
with get_workspace_process_context_from_kwargs(
instance, version=__version__, read_only=False, kwargs=kwargs
with WorkspaceProcessContext(
instance=instance,
version=__version__,
read_only=False,
workspace_load_target=workspace_opts.to_load_target(),
) as workspace_process_context:
execute_query_from_cli(
workspace_process_context,
Expand Down
19 changes: 11 additions & 8 deletions python_modules/dagster-webserver/dagster_webserver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
import dagster._check as check
import uvicorn
from dagster._annotations import deprecated
from dagster._cli.utils import ClickArgValue, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace import get_workspace_process_context_from_kwargs, workspace_options
from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING
from dagster._cli.utils import assert_no_remaining_opts, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace import workspace_options
from dagster._cli.workspace.cli_target import WORKSPACE_TARGET_WARNING, WorkspaceOpts
from dagster._core.instance import InstanceRef
from dagster._core.telemetry import START_DAGSTER_WEBSERVER, log_action
from dagster._core.telemetry_upload import uploading_logging_thread
from dagster._core.workspace.context import IWorkspaceProcessContext
from dagster._core.workspace.context import IWorkspaceProcessContext, WorkspaceProcessContext
from dagster._serdes import deserialize_value
from dagster._utils import DEFAULT_WORKSPACE_YAML_FILENAME, find_free_port, is_port_in_use
from dagster._utils.log import configure_loggers
Expand Down Expand Up @@ -72,7 +72,6 @@ def create_dagster_webserver_cli():
"""
),
)
@workspace_options
@click.option(
"--host",
"-h",
Expand Down Expand Up @@ -178,6 +177,7 @@ def create_dagster_webserver_cli():
show_default=True,
)
@click.version_option(version=__version__, prog_name="dagster-webserver")
@workspace_options
def dagster_webserver(
host: str,
port: int,
Expand All @@ -192,8 +192,11 @@ def dagster_webserver(
code_server_log_level: str,
instance_ref: Optional[str],
live_data_poll_rate: int,
**kwargs: ClickArgValue,
**other_opts: object,
):
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

if suppress_warnings:
os.environ["PYTHONWARNINGS"] = "ignore"

Expand All @@ -214,11 +217,11 @@ def dagster_webserver(
# Allow the instance components to change behavior in the context of a long running server process
instance.optimize_for_webserver(db_statement_timeout, db_pool_recycle)

with get_workspace_process_context_from_kwargs(
with WorkspaceProcessContext(
instance,
version=__version__,
read_only=read_only,
kwargs=kwargs,
workspace_load_target=workspace_opts.to_load_target(),
code_server_log_level=code_server_log_level,
) as workspace_process_context:
host_dagster_ui_with_workspace_process_context(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from dagster._cli.workspace import get_workspace_process_context_from_kwargs
from dagster._cli.workspace.cli_target import WorkspaceOpts
from dagster._core.test_utils import instance_for_test
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver import app
from starlette.testclient import TestClient

Expand Down Expand Up @@ -29,11 +30,13 @@
)
def test_smoke_app(gen_instance):
with gen_instance() as instance:
with get_workspace_process_context_from_kwargs(
with WorkspaceProcessContext(
instance,
version="",
read_only=False,
kwargs=dict(module_name=("dagster_webserver_tests.toy.bar_repo",), definition="bar"),
workspace_load_target=WorkspaceOpts(
module_name=("dagster_webserver_tests.toy.bar_repo",)
).to_load_target(),
) as workspace_process_context:
asgi_app = app.create_app_from_workspace_process_context(workspace_process_context)
client = TestClient(asgi_app)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
from dagster import DagsterInstance, __version__
from dagster._cli.workspace.cli_target import get_workspace_process_context_from_kwargs
from dagster._cli.workspace.cli_target import WorkspaceOpts
from dagster._core.workspace.context import WorkspaceProcessContext
from dagster_webserver.webserver import DagsterWebserver
from starlette.requests import Request
from starlette.responses import JSONResponse
Expand Down Expand Up @@ -29,11 +30,11 @@ def build_routes(self):

@pytest.fixture(scope="session")
def test_client(instance):
process_context = get_workspace_process_context_from_kwargs(
process_context = WorkspaceProcessContext(
instance=instance,
version=__version__,
read_only=False,
kwargs={"empty_workspace": True}, # pyright: ignore[reportArgumentType]
workspace_load_target=WorkspaceOpts(empty_workspace=True).to_load_target(),
)

app = TestDagsterWebserver(process_context).create_asgi_app(debug=True)
Expand Down
49 changes: 28 additions & 21 deletions python_modules/dagster/dagster/_cli/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,12 @@
import click

from dagster import __version__ as dagster_version
from dagster._cli.utils import ClickArgValue, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace.cli_target import get_workspace_from_kwargs, workspace_options
from dagster._cli.utils import assert_no_remaining_opts, get_possibly_temporary_instance_for_cli
from dagster._cli.workspace.cli_target import (
WorkspaceOpts,
get_workspace_from_cli_opts,
workspace_options,
)
from dagster._utils.log import configure_loggers


Expand All @@ -15,22 +19,6 @@ def definitions_cli():
"""Commands for working with Dagster definitions."""


@workspace_options
@click.option(
"--log-level",
help="Set the log level for dagster services.",
show_default=True,
default="info",
type=click.Choice(["critical", "error", "warning", "info", "debug"], case_sensitive=False),
)
@click.option(
"--log-format",
type=click.Choice(["colored", "json", "rich"], case_sensitive=False),
show_default=True,
required=False,
default="colored",
help="Format of the logs for dagster services",
)
@definitions_cli.command(
name="validate",
help="""
Expand All @@ -47,7 +35,26 @@ def definitions_cli():
This command should be run in a Python environment where the `dagster` package is installed.
""",
)
def definitions_validate_command(log_level: str, log_format: str, **kwargs: ClickArgValue):
@click.option(
"--log-level",
help="Set the log level for dagster services.",
show_default=True,
default="info",
type=click.Choice(["critical", "error", "warning", "info", "debug"], case_sensitive=False),
)
@click.option(
"--log-format",
type=click.Choice(["colored", "json", "rich"], case_sensitive=False),
show_default=True,
required=False,
default="colored",
help="Format of the logs for dagster services",
)
@workspace_options
def definitions_validate_command(log_level: str, log_format: str, **other_opts: object):
workspace_opts = WorkspaceOpts.extract_from_cli_options(other_opts)
assert_no_remaining_opts(other_opts)

os.environ["DAGSTER_IS_DEFS_VALIDATION_CLI"] = "1"

configure_loggers(formatter=log_format, log_level=log_level.upper())
Expand All @@ -57,8 +64,8 @@ def definitions_validate_command(log_level: str, log_format: str, **kwargs: Clic
with get_possibly_temporary_instance_for_cli(
"dagster definitions validate", logger=logger
) as instance:
with get_workspace_from_kwargs(
instance=instance, version=dagster_version, kwargs=kwargs
with get_workspace_from_cli_opts(
instance=instance, version=dagster_version, workspace_opts=workspace_opts
) as workspace:
invalid = any(
entry
Expand Down
Loading