@@ -695,6 +695,122 @@ def delete(
        if not delete_snapshot.files_affected and not delete_snapshot.rewrites_needed:
            warnings.warn("Delete operation did not match any records")

+    def upsert(
+        self,
+        df: pa.Table,
+        join_cols: Optional[List[str]] = None,
+        when_matched_update_all: bool = True,
+        when_not_matched_insert_all: bool = True,
+        case_sensitive: bool = True,
+    ) -> UpsertResult:
+        """Shorthand API for performing an upsert into an Iceberg table.
+
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: Columns to join on. If not provided, the table's identifier-field-ids are used.
+            when_matched_update_all: Bool indicating whether to update matched rows whose non-key column values have changed.
+            when_not_matched_insert_all: Bool indicating whether to insert rows that do not match any existing rows in the table.
+            case_sensitive: Bool indicating whether the match should be case-sensitive.
+
+        To learn more about identifier-field-ids, see https://iceberg.apache.org/spec/#identifier-field-ids
+
+        Example Use Cases:
+            Case 1: Both Parameters = True (Full Upsert)
+                Existing row found → Update it
+                New row found → Insert it
+
+            Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
+                Existing row found → Do nothing (no updates)
+                New row found → Insert it
+
+            Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
+                Existing row found → Update it
+                New row found → Do nothing (no inserts)
+
+            Case 4: Both Parameters = False (No Merge Effect)
+                Existing row found → Do nothing
+                New row found → Do nothing
+                (The function effectively does nothing)
+
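+        A minimal usage sketch (illustrative names; assumes the table's identifier field is `id`):
+
+            df = pa.Table.from_pylist([{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}])
+            result = table.upsert(df, join_cols=["id"])
+            print(result.rows_updated, result.rows_inserted)
+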
+        Returns:
+            An UpsertResult class (contains details of rows updated and inserted).
+        """
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import expression_to_pyarrow
+        from pyiceberg.table import upsert_util
+
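+        # If no join columns were given, fall back to the schema's identifier fields.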
+        if join_cols is None:
+            join_cols = []
+            for field_id in self.table_metadata.schema().identifier_field_ids:
+                col = self.table_metadata.schema().find_column_name(field_id)
+                if col is not None:
+                    join_cols.append(col)
+                else:
+                    raise ValueError(f"Field-ID could not be found: {field_id}")
+
+        if len(join_cols) == 0:
+            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")
+
+        if not when_matched_update_all and not when_not_matched_insert_all:
+            raise ValueError("No upsert options selected: when_matched_update_all and when_not_matched_insert_all are both False")
+
+        if upsert_util.has_duplicate_rows(df, join_cols):
+            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
+
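+        # DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE is a config flag; when set, nanosecond timestamps are downcast to microseconds on write.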
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # get list of rows that exist so we don't have to load the entire target table
+        matched_predicate = upsert_util.create_match_filter(df, join_cols)
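+        # The filter covers only the key columns (e.g. a disjunction of per-row key equality predicates),
+        # so the scan below reads just the candidate rows rather than the whole table.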
+
+        # We must use Transaction.table_metadata for the scan. This includes all uncommitted - but relevant - changes.
+        matched_iceberg_table = DataScan(
+            table_metadata=self.table_metadata,
+            io=self._table.io,
+            row_filter=matched_predicate,
+            case_sensitive=case_sensitive,
+        ).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        if when_matched_update_all:
+            # get_rows_to_update checks the non-key columns to see if any of the values have actually changed.
+            # We don't want to do a blanket overwrite for matched rows if the non-key column data hasn't changed;
+            # this extra step avoids unnecessary IO and writes.
+            rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)
+
+            update_row_cnt = len(rows_to_update)
+
+            if len(rows_to_update) > 0:
+                # Build the match predicate filter from the keys of the changed rows.
+                overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
+
+                self.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
+
+        if when_not_matched_insert_all:
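+            # Anti-join: keep only the source rows whose keys did not appear in the matched scan.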
+            expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
+            expr_match_bound = bind(self.table_metadata.schema(), expr_match, case_sensitive=case_sensitive)
+            expr_match_arrow = expression_to_pyarrow(expr_match_bound)
+            rows_to_insert = df.filter(~expr_match_arrow)
+
+            insert_row_cnt = len(rows_to_insert)
+
+            if insert_row_cnt > 0:
+                self.append(rows_to_insert)
+
+        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
+
    def add_files(
        self, file_paths: List[str], snapshot_properties: Dict[str, str] = EMPTY_DICT, check_duplicate_files: bool = True
    ) -> None:
@@ -1159,73 +1275,14 @@ def upsert(
        Returns:
            An UpsertResult class (contains details of rows updated and inserted)
        """
-        try:
-            import pyarrow as pa  # noqa: F401
-        except ModuleNotFoundError as e:
-            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
-
-        from pyiceberg.io.pyarrow import expression_to_pyarrow
-        from pyiceberg.table import upsert_util
-
-        if join_cols is None:
-            join_cols = []
-            for field_id in self.schema().identifier_field_ids:
-                col = self.schema().find_column_name(field_id)
-                if col is not None:
-                    join_cols.append(col)
-                else:
-                    raise ValueError(f"Field-ID could not be found: {join_cols}")
-
-        if len(join_cols) == 0:
-            raise ValueError("Join columns could not be found, please set identifier-field-ids or pass in explicitly.")
-
-        if not when_matched_update_all and not when_not_matched_insert_all:
-            raise ValueError("no upsert options selected...exiting")
-
-        if upsert_util.has_duplicate_rows(df, join_cols):
-            raise ValueError("Duplicate rows found in source dataset based on the key columns. No upsert executed")
-
-        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
-
-        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
-        _check_pyarrow_schema_compatible(
-            self.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
-        )
-
-        # get list of rows that exist so we don't have to load the entire target table
-        matched_predicate = upsert_util.create_match_filter(df, join_cols)
-        matched_iceberg_table = self.scan(row_filter=matched_predicate, case_sensitive=case_sensitive).to_arrow()
-
-        update_row_cnt = 0
-        insert_row_cnt = 0
-
        with self.transaction() as tx:
-            if when_matched_update_all:
-                # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
-                # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
-                # this extra step avoids unnecessary IO and writes
-                rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)
-
-                update_row_cnt = len(rows_to_update)
-
-                if len(rows_to_update) > 0:
-                    # build the match predicate filter
-                    overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)
-
-                    tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)
-
-            if when_not_matched_insert_all:
-                expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
-                expr_match_bound = bind(self.schema(), expr_match, case_sensitive=case_sensitive)
-                expr_match_arrow = expression_to_pyarrow(expr_match_bound)
-                rows_to_insert = df.filter(~expr_match_arrow)
-
-                insert_row_cnt = len(rows_to_insert)
-
-                if insert_row_cnt > 0:
-                    tx.append(rows_to_insert)
-
-        return UpsertResult(rows_updated=update_row_cnt, rows_inserted=insert_row_cnt)
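+            # Delegate to Transaction.upsert; the transaction context manager commits all staged changes on exit.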
+            return tx.upsert(
+                df=df,
+                join_cols=join_cols,
+                when_matched_update_all=when_matched_update_all,
+                when_not_matched_insert_all=when_not_matched_insert_all,
+                case_sensitive=case_sensitive,
+            )

    def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """