
Commit c969cf0

dry run

1 parent 417599d commit c969cf0

10 files changed: 245 additions, 95 deletions

docs/declarative-pipelines-programming-guide.md

Lines changed: 8 additions & 1 deletion
@@ -94,7 +94,7 @@ The `spark-pipelines init` command, described below, makes it easy to generate a

 ## The `spark-pipelines` Command Line Interface

-The `spark-pipelines` command line interface (CLI) is the primary way to execute a pipeline. It also contains an `init` subcommand for generating a pipeline project.
+The `spark-pipelines` command line interface (CLI) is the primary way to execute a pipeline. It also contains an `init` subcommand for generating a pipeline project and a `dry-run` subcommand for validating a pipeline.

 `spark-pipelines` is built on top of `spark-submit`, meaning that it supports all cluster managers supported by `spark-submit`. It supports all `spark-submit` arguments except for `--class`.

@@ -106,6 +106,13 @@ The `spark-pipelines` command line interface (CLI) is the primary way to execute

 `spark-pipelines run` launches an execution of a pipeline and monitors its progress until it completes. The `--spec` parameter allows selecting the pipeline spec file. If not provided, the CLI will look in the current directory and parent directories for a file named `pipeline.yml` or `pipeline.yaml`.

+### `spark-pipelines dry-run`
+
+`spark-pipelines dry-run` launches an execution of a pipeline that doesn't write or read any data, but catches many kinds of errors that would surface if the pipeline actually ran. For example:
+- Syntax errors – e.g. invalid Python or SQL code
+- Analysis errors – e.g. selecting from a table that doesn't exist or selecting a column that doesn't exist
+- Graph validation errors – e.g. cyclic dependencies
+
 ## Programming with SDP in Python

 SDP Python functions are defined in the `pyspark.pipelines` module. Your pipelines implemented with the Python API must import this module. It's common to alias the module to `sdp` to limit the number of characters you need to type when using its APIs.
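
To make the new dry-run section concrete, here is a hypothetical pipeline definition (all table and column names are invented) using the `@sdp.materialized_view` decorator from the Python API described in the guide. It is valid Python, so it passes the syntax check, but a `spark-pipelines dry-run` would fail with an analysis error because the aggregation references a column that doesn't exist:

```python
# transformations.py — illustrative sketch only; `raw_orders` and `amont` are invented names.
from pyspark import pipelines as sdp
from pyspark.sql import DataFrame, SparkSession, functions as F

spark = SparkSession.getActiveSession()


@sdp.materialized_view
def daily_totals() -> DataFrame:
    # `amont` is a typo for `amount`: the code parses fine, but analysis fails,
    # so a dry run reports the error without reading or writing any data.
    return (
        spark.read.table("raw_orders")
        .groupBy("order_date")
        .agg(F.sum("amont").alias("total"))
    )
```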

python/pyspark/pipelines/cli.py

Lines changed: 12 additions & 5 deletions
@@ -217,7 +217,7 @@ def change_dir(path: Path) -> Generator[None, None, None]:
         os.chdir(prev)


-def run(spec_path: Path) -> None:
+def run(spec_path: Path, dry: bool) -> None:
     """Run the pipeline defined with the given spec."""
     log_with_curr_timestamp(f"Loading pipeline spec from {spec_path}...")
     spec = load_pipeline_spec(spec_path)
@@ -242,7 +242,7 @@ def run(spec_path: Path) -> None:
     register_definitions(spec_path, registry, spec)

     log_with_curr_timestamp("Starting run...")
-    result_iter = start_run(spark, dataflow_graph_id)
+    result_iter = start_run(spark, dataflow_graph_id, dry=dry)
     try:
         handle_pipeline_events(result_iter)
     finally:
@@ -257,6 +257,13 @@ def run(spec_path: Path) -> None:
     run_parser = subparsers.add_parser("run", help="Run a pipeline.")
     run_parser.add_argument("--spec", help="Path to the pipeline spec.")

+    # "dry-run" subcommand
+    run_parser = subparsers.add_parser(
+        "dry-run",
+        help="Launch a run that just validates the graph and checks for errors.",
+    )
+    run_parser.add_argument("--spec", help="Path to the pipeline spec.")
+
     # "init" subcommand
     init_parser = subparsers.add_parser(
         "init",
@@ -270,9 +277,9 @@ def run(spec_path: Path) -> None:
     )

     args = parser.parse_args()
-    assert args.command in ["run", "init"]
+    assert args.command in ["run", "dry-run", "init"]

-    if args.command == "run":
+    if args.command in ["run", "dry-run"]:
         if args.spec is not None:
             spec_path = Path(args.spec)
             if not spec_path.is_file():
@@ -283,6 +290,6 @@ def run(spec_path: Path) -> None:
         else:
             spec_path = find_pipeline_spec(Path.cwd())

-        run(spec_path=spec_path)
+        run(spec_path=spec_path, dry=(args.command == "dry-run"))
     elif args.command == "init":
         init(args.name)
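
The dispatch above relies on `find_pipeline_spec`, which is not part of this diff. As the programming-guide change above notes, it searches the current directory and its parents for `pipeline.yml` or `pipeline.yaml`; a minimal sketch of that lookup (not the actual implementation, which may differ in naming, error type, and message) might look like:

```python
from pathlib import Path


def find_pipeline_spec(current: Path) -> Path:
    """Return the first pipeline.yml / pipeline.yaml found, walking upward."""
    for directory in (current, *current.parents):
        for name in ("pipeline.yml", "pipeline.yaml"):
            candidate = directory / name
            if candidate.is_file():
                return candidate
    raise FileNotFoundError(
        "No pipeline.yml or pipeline.yaml found in the current directory or its parents."
    )
```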

python/pyspark/pipelines/spark_connect_pipeline.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,12 @@ def handle_pipeline_events(iter: Iterator[Dict[str, Any]]) -> None:
6565
log_with_provided_timestamp(event.message, dt)
6666

6767

68-
def start_run(spark: SparkSession, dataflow_graph_id: str) -> Iterator[Dict[str, Any]]:
68+
def start_run(spark: SparkSession, dataflow_graph_id: str, dry: bool) -> Iterator[Dict[str, Any]]:
6969
"""Start a run of the dataflow graph in the Spark Connect server.
7070
7171
:param dataflow_graph_id: The ID of the dataflow graph to start.
7272
"""
73-
inner_command = pb2.PipelineCommand.StartRun(dataflow_graph_id=dataflow_graph_id)
73+
inner_command = pb2.PipelineCommand.StartRun(dataflow_graph_id=dataflow_graph_id, dry=dry)
7474
command = pb2.Command()
7575
command.pipeline_command.start_run.CopyFrom(inner_command)
7676
# Cast because mypy seems to think `spark`` is a function, not an object. Likely related to
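
Putting the two pieces together, a caller on the client side could trigger a validation-only run roughly as below. This is a sketch that assumes `spark` is a Spark Connect session and `graph_id` came from an earlier CreateDataflowGraph response, mirroring the pattern in cli.py's `run()`:

```python
from pyspark.pipelines.spark_connect_pipeline import handle_pipeline_events, start_run


def validate_pipeline(spark, graph_id: str) -> None:
    # dry=True asks the server to analyze and validate the dataflow graph
    # without executing any flows or writing any data.
    events = start_run(spark, graph_id, dry=True)
    handle_pipeline_events(events)  # stream server-side pipeline events to the log
```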

python/pyspark/sql/connect/proto/pipelines_pb2.py

Lines changed: 15 additions & 15 deletions
@@ -40,7 +40,7 @@


 DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
-
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\xf2\x12\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xc8\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x07\n\x05_once\x1aQ\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3'
+
b'\n\x1dspark/connect/pipelines.proto\x12\rspark.connect\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"\x91\x13\n\x0fPipelineCommand\x12h\n\x15\x63reate_dataflow_graph\x18\x01 \x01(\x0b\x32\x32.spark.connect.PipelineCommand.CreateDataflowGraphH\x00R\x13\x63reateDataflowGraph\x12U\n\x0e\x64\x65\x66ine_dataset\x18\x02 \x01(\x0b\x32,.spark.connect.PipelineCommand.DefineDatasetH\x00R\rdefineDataset\x12L\n\x0b\x64\x65\x66ine_flow\x18\x03 \x01(\x0b\x32).spark.connect.PipelineCommand.DefineFlowH\x00R\ndefineFlow\x12\x62\n\x13\x64rop_dataflow_graph\x18\x04 \x01(\x0b\x32\x30.spark.connect.PipelineCommand.DropDataflowGraphH\x00R\x11\x64ropDataflowGraph\x12\x46\n\tstart_run\x18\x05 \x01(\x0b\x32\'.spark.connect.PipelineCommand.StartRunH\x00R\x08startRun\x12r\n\x19\x64\x65\x66ine_sql_graph_elements\x18\x06 \x01(\x0b\x32\x35.spark.connect.PipelineCommand.DefineSqlGraphElementsH\x00R\x16\x64\x65\x66ineSqlGraphElements\x1a\x87\x03\n\x13\x43reateDataflowGraph\x12,\n\x0f\x64\x65\x66\x61ult_catalog\x18\x01 \x01(\tH\x00R\x0e\x64\x65\x66\x61ultCatalog\x88\x01\x01\x12.\n\x10\x64\x65\x66\x61ult_database\x18\x02 \x01(\tH\x01R\x0f\x64\x65\x66\x61ultDatabase\x88\x01\x01\x12Z\n\x08sql_conf\x18\x05 \x03(\x0b\x32?.spark.connect.PipelineCommand.CreateDataflowGraph.SqlConfEntryR\x07sqlConf\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1aQ\n\x08Response\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x12\n\x10_default_catalogB\x13\n\x11_default_database\x1aZ\n\x11\x44ropDataflowGraph\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_id\x1a\xd1\x04\n\rDefineDataset\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12&\n\x0c\x64\x61taset_name\x18\x02 \x01(\tH\x01R\x0b\x64\x61tasetName\x88\x01\x01\x12\x42\n\x0c\x64\x61taset_type\x18\x03 \x01(\x0e\x32\x1a.spark.connect.DatasetTypeH\x02R\x0b\x64\x61tasetType\x88\x01\x01\x12\x1d\n\x07\x63omment\x18\x04 \x01(\tH\x03R\x07\x63omment\x88\x01\x01\x12l\n\x10table_properties\x18\x05 \x03(\x0b\x32\x41.spark.connect.PipelineCommand.DefineDataset.TablePropertiesEntryR\x0ftableProperties\x12%\n\x0epartition_cols\x18\x06 \x03(\tR\rpartitionCols\x12\x34\n\x06schema\x18\x07 \x01(\x0b\x32\x17.spark.connect.DataTypeH\x04R\x06schema\x88\x01\x01\x12\x1b\n\x06\x66ormat\x18\x08 \x01(\tH\x05R\x06\x66ormat\x88\x01\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0f\n\r_dataset_nameB\x0f\n\r_dataset_typeB\n\n\x08_commentB\t\n\x07_schemaB\t\n\x07_format\x1a\xc8\x03\n\nDefineFlow\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12 \n\tflow_name\x18\x02 \x01(\tH\x01R\x08\x66lowName\x88\x01\x01\x12\x33\n\x13target_dataset_name\x18\x03 \x01(\tH\x02R\x11targetDatasetName\x88\x01\x01\x12\x38\n\x08relation\x18\x04 \x01(\x0b\x32\x17.spark.connect.RelationH\x03R\x08relation\x88\x01\x01\x12Q\n\x08sql_conf\x18\x05 \x03(\x0b\x32\x36.spark.connect.PipelineCommand.DefineFlow.SqlConfEntryR\x07sqlConf\x12\x17\n\x04once\x18\x06 \x01(\x08H\x04R\x04once\x88\x01\x01\x1a:\n\x0cSqlConfEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 
\x01(\tR\x05value:\x02\x38\x01\x42\x14\n\x12_dataflow_graph_idB\x0c\n\n_flow_nameB\x16\n\x14_target_dataset_nameB\x0b\n\t_relationB\x07\n\x05_once\x1ap\n\x08StartRun\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\x15\n\x03\x64ry\x18\x02 \x01(\x08H\x01R\x03\x64ry\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x06\n\x04_dry\x1a\xc7\x01\n\x16\x44\x65\x66ineSqlGraphElements\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x12\'\n\rsql_file_path\x18\x02 \x01(\tH\x01R\x0bsqlFilePath\x88\x01\x01\x12\x1e\n\x08sql_text\x18\x03 \x01(\tH\x02R\x07sqlText\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\x10\n\x0e_sql_file_pathB\x0b\n\t_sql_textB\x0e\n\x0c\x63ommand_type"\x8e\x02\n\x15PipelineCommandResult\x12\x81\x01\n\x1c\x63reate_dataflow_graph_result\x18\x01 \x01(\x0b\x32>.spark.connect.PipelineCommandResult.CreateDataflowGraphResultH\x00R\x19\x63reateDataflowGraphResult\x1a\x62\n\x19\x43reateDataflowGraphResult\x12/\n\x11\x64\x61taflow_graph_id\x18\x01 \x01(\tH\x00R\x0f\x64\x61taflowGraphId\x88\x01\x01\x42\x14\n\x12_dataflow_graph_idB\r\n\x0bresult_type"I\n\x13PipelineEventResult\x12\x32\n\x05\x65vent\x18\x01 \x01(\x0b\x32\x1c.spark.connect.PipelineEventR\x05\x65vent"t\n\rPipelineEvent\x12\x38\n\ttimestamp\x18\x01 \x01(\x0b\x32\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n\x07message\x18\x02 \x01(\tH\x00R\x07message\x88\x01\x01\x42\n\n\x08_message*a\n\x0b\x44\x61tasetType\x12\x1c\n\x18\x44\x41TASET_TYPE_UNSPECIFIED\x10\x00\x12\x15\n\x11MATERIALIZED_VIEW\x10\x01\x12\t\n\x05TABLE\x10\x02\x12\x12\n\x0eTEMPORARY_VIEW\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3'
 )

 _globals = globals()
@@ -59,10 +59,10 @@
     _globals["_PIPELINECOMMAND_DEFINEDATASET_TABLEPROPERTIESENTRY"]._serialized_options = b"8\001"
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._loaded_options = None
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_options = b"8\001"
-    _globals["_DATASETTYPE"]._serialized_start = 3026
-    _globals["_DATASETTYPE"]._serialized_end = 3123
+    _globals["_DATASETTYPE"]._serialized_start = 3057
+    _globals["_DATASETTYPE"]._serialized_end = 3154
     _globals["_PIPELINECOMMAND"]._serialized_start = 140
-    _globals["_PIPELINECOMMAND"]._serialized_end = 2558
+    _globals["_PIPELINECOMMAND"]._serialized_end = 2589
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_start = 719
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH"]._serialized_end = 1110
     _globals["_PIPELINECOMMAND_CREATEDATAFLOWGRAPH_SQLCONFENTRY"]._serialized_start = 928
@@ -80,15 +80,15 @@
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_start = 928
     _globals["_PIPELINECOMMAND_DEFINEFLOW_SQLCONFENTRY"]._serialized_end = 986
     _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_start = 2259
-    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2340
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2343
-    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2542
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2561
-    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2831
-    _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2718
-    _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2816
-    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 2833
-    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 2906
-    _globals["_PIPELINEEVENT"]._serialized_start = 2908
-    _globals["_PIPELINEEVENT"]._serialized_end = 3024
+    _globals["_PIPELINECOMMAND_STARTRUN"]._serialized_end = 2371
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_start = 2374
+    _globals["_PIPELINECOMMAND_DEFINESQLGRAPHELEMENTS"]._serialized_end = 2573
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_start = 2592
+    _globals["_PIPELINECOMMANDRESULT"]._serialized_end = 2862
+    _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_start = 2749
+    _globals["_PIPELINECOMMANDRESULT_CREATEDATAFLOWGRAPHRESULT"]._serialized_end = 2847
+    _globals["_PIPELINEEVENTRESULT"]._serialized_start = 2864
+    _globals["_PIPELINEEVENTRESULT"]._serialized_end = 2937
+    _globals["_PIPELINEEVENT"]._serialized_start = 2939
+    _globals["_PIPELINEEVENT"]._serialized_end = 3055
 # @@protoc_insertion_point(module_scope)

python/pyspark/sql/connect/proto/pipelines_pb2.pyi

Lines changed: 19 additions & 0 deletions
@@ -530,35 +530,54 @@ class PipelineCommand(google.protobuf.message.Message):
         DESCRIPTOR: google.protobuf.descriptor.Descriptor

         DATAFLOW_GRAPH_ID_FIELD_NUMBER: builtins.int
+        DRY_FIELD_NUMBER: builtins.int
         dataflow_graph_id: builtins.str
         """The graph to start."""
+        dry: builtins.bool
+        """If true, the run will not actually execute any flows, but will only validate the graph and
+        check for any errors. This is useful for testing and validation purposes.
+        """
         def __init__(
             self,
             *,
             dataflow_graph_id: builtins.str | None = ...,
+            dry: builtins.bool | None = ...,
         ) -> None: ...
         def HasField(
             self,
             field_name: typing_extensions.Literal[
                 "_dataflow_graph_id",
                 b"_dataflow_graph_id",
+                "_dry",
+                b"_dry",
                 "dataflow_graph_id",
                 b"dataflow_graph_id",
+                "dry",
+                b"dry",
             ],
         ) -> builtins.bool: ...
         def ClearField(
             self,
             field_name: typing_extensions.Literal[
                 "_dataflow_graph_id",
                 b"_dataflow_graph_id",
+                "_dry",
+                b"_dry",
                 "dataflow_graph_id",
                 b"dataflow_graph_id",
+                "dry",
+                b"dry",
             ],
         ) -> None: ...
+        @typing.overload
         def WhichOneof(
             self,
             oneof_group: typing_extensions.Literal["_dataflow_graph_id", b"_dataflow_graph_id"],
         ) -> typing_extensions.Literal["dataflow_graph_id"] | None: ...
+        @typing.overload
+        def WhichOneof(
+            self, oneof_group: typing_extensions.Literal["_dry", b"_dry"]
+        ) -> typing_extensions.Literal["dry"] | None: ...

     class DefineSqlGraphElements(google.protobuf.message.Message):
         """Parses the SQL file and registers all datasets and flows."""

sql/connect/common/src/main/protobuf/spark/connect/pipelines.proto

Lines changed: 4 additions & 0 deletions
@@ -116,6 +116,10 @@ message PipelineCommand {
   message StartRun {
     // The graph to start.
     optional string dataflow_graph_id = 1;
+
+    // If true, the run will not actually execute any flows, but will only validate the graph and
+    // check for any errors. This is useful for testing and validation purposes.
+    optional bool dry = 2;
   }

   // Parses the SQL file and registers all datasets and flows.
