Skip to content

Commit

Permalink
chore: make streamed_exec default
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Feb 14, 2025
1 parent fd3b6a3 commit a4a6c23
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
5 changes: 3 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -976,7 +976,7 @@ def merge(
error_on_type_mismatch: bool = True,
writer_properties: Optional[WriterProperties] = None,
large_dtypes: Optional[bool] = None,
streamed_exec: bool = False,
streamed_exec: bool = True,
custom_metadata: Optional[Dict[str, str]] = None,
post_commithook_properties: Optional[PostCommitHookProperties] = None,
commit_properties: Optional[CommitProperties] = None,
Expand All @@ -994,7 +994,8 @@ def merge(
error_on_type_mismatch: specify if merge will return an error if data types are mismatched :default = True
writer_properties: Pass writer properties to the Rust parquet writer
large_dtypes: Deprecated, will be removed in 1.0
streamed_exec: Will execute MERGE using a LazyMemoryExec plan
streamed_exec: Will execute MERGE using a LazyMemoryExec plan; this reduces memory pressure for large source tables. Enabling streamed_exec
implicitly disables using source table stats to derive an early-pruning predicate
arrow_schema_conversion_mode: Large converts all types of data schema into Large Arrow types, passthrough keeps string/binary/list types untouched
custom_metadata: Deprecated and will be removed in future versions. Use commit_properties instead.
post_commithook_properties: properties for the post commit hook. If None, default values are used.
Expand Down
4 changes: 3 additions & 1 deletion python/tests/test_generated_columns.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,9 @@ def test_merge_with_g_during_schema_evolution(
expected_data = pa.Table.from_pydict(
{"id": [1, 2], "gc": [5, 5]}, schema=pa.schema([id_col, gc])
)
assert table_with_gc.to_pyarrow_table() == expected_data
assert (
table_with_gc.to_pyarrow_table().sort_by([("id", "ascending")]) == expected_data
)


def test_raise_when_gc_passed_merge_statement_during_schema_evolution(
Expand Down
3 changes: 3 additions & 0 deletions python/tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -1116,6 +1116,7 @@ def test_merge_timestamps_partitioned_2344(tmp_path: pathlib.Path, timezone, pre
predicate="s.datetime = t.datetime",
source_alias="s",
target_alias="t",
streamed_exec=False, # only with streamed execution off can we use stats to create a pruning predicate
).when_matched_update_all().when_not_matched_insert_all().execute()

result = dt.to_pyarrow_table()
Expand Down Expand Up @@ -1441,13 +1442,15 @@ def test_merge_on_decimal_3033(tmp_path):
predicate="target.timestamp = source.timestamp",
source_alias="source",
target_alias="target",
streamed_exec=False,
).when_matched_update_all().when_not_matched_insert_all().execute()

dt.merge(
source=table,
predicate="target.timestamp = source.timestamp AND target.altitude = source.altitude",
source_alias="source",
target_alias="target",
streamed_exec=False, # only with streamed execution off can we use stats to create a pruning predicate
).when_matched_update_all().when_not_matched_insert_all().execute()

string_predicate = dt.history(1)[0]["operationParameters"]["predicate"]
Expand Down

0 comments on commit a4a6c23

Please sign in to comment.