@@ -2034,7 +2034,7 @@ def bin_pack_arrow_table(tbl: pa.Table, target_file_size: int) -> Iterator[List[
 
 def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, downcast_ns_timestamp_to_us: bool = False) -> None:
     """
-    Check if the `table_schema` is compatible with `other_schema`.
+    Check if the `table_schema` is compatible with `other_schema` in terms of the Iceberg Schema representation.
 
     The schemas are compatible if:
     - All fields in `other_schema` are present in `table_schema`. (other_schema <= table_schema)
@@ -2043,22 +2043,22 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
     Raises:
         ValueError: If the schemas are not compatible.
     """
-    from pyiceberg.io.pyarrow import _pyarrow_to_schema_without_ids, pyarrow_to_schema
-
     name_mapping = table_schema.name_mapping
     try:
-        other_schema = pyarrow_to_schema(other_schema, name_mapping=name_mapping)
+        other_schema = pyarrow_to_schema(
+            other_schema, name_mapping=name_mapping, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
     except ValueError as e:
-        other_schema = _pyarrow_to_schema_without_ids(other_schema)
+        other_schema = _pyarrow_to_schema_without_ids(other_schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us)
         additional_names = set(other_schema.column_names) - set(table_schema.column_names)
         raise ValueError(
             f"PyArrow table contains more columns: {', '.join(sorted(additional_names))}. Update the schema first (hint, use union_by_name)."
         ) from e
 
-    missing_table_schema_fields = {field for field in other_schema.fields if field not in table_schema.fields}
-    required_table_schema_fields = {field for field in table_schema.fields if field.required}
-    missing_required_fields = {field for field in required_table_schema_fields if field not in other_schema.fields}
-    if missing_table_schema_fields or missing_required_fields:
+    fields_missing_from_table = {field for field in other_schema.fields if field not in table_schema.fields}
+    required_fields_in_table = {field for field in table_schema.fields if field.required}
+    missing_required_fields_in_other = {field for field in required_fields_in_table if field not in other_schema.fields}
+    if fields_missing_from_table or missing_required_fields_in_other:
         from rich.console import Console
         from rich.table import Table as RichTable
 
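The two hunks above thread `downcast_ns_timestamp_to_us` through both conversion paths, so Arrow data with nanosecond timestamps can be resolved against the table schema instead of failing outright. A minimal sketch of that conversion step, assuming an illustrative table schema with made-up field names; only `pyarrow_to_schema`, its parameters, and `Schema.name_mapping` are taken from the diff:

import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, TimestamptzType

# Hypothetical Iceberg table schema; field IDs belong to the table, not to Arrow.
table_schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="ts", field_type=TimestamptzType(), required=False),
)

# Arrow schema coming from the user, with nanosecond timestamps that Iceberg does not store.
arrow_schema = pa.schema([
    pa.field("id", pa.int64(), nullable=False),
    pa.field("ts", pa.timestamp("ns", tz="UTC")),
])

# Resolve Arrow field names to Iceberg field IDs via the name mapping, downcasting ns -> us,
# mirroring what _check_schema_compatible now does before comparing the two schemas.
converted = pyarrow_to_schema(
    arrow_schema,
    name_mapping=table_schema.name_mapping,
    downcast_ns_timestamp_to_us=True,
)
print(converted)  # ts resolves to timestamptz (microsecond precision) instead of raising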
@@ -2182,17 +2182,20 @@ def _dataframe_to_data_files(
         default=TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT,
     )
 
+    # projects schema to match the pyarrow table
+    write_schema = pyarrow_to_schema(df.schema, name_mapping=table_metadata.schema().name_mapping)
+
     if table_metadata.spec().is_unpartitioned():
         yield from write_file(
             io=io,
             table_metadata=table_metadata,
             tasks=iter([
-                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=table_metadata.schema())
+                WriteTask(write_uuid=write_uuid, task_id=next(counter), record_batches=batches, schema=write_schema)
                 for batches in bin_pack_arrow_table(df, target_file_size)
             ]),
         )
     else:
-        partitions = _determine_partitions(spec=table_metadata.spec(), schema=table_metadata.schema(), arrow_table=df)
+        partitions = _determine_partitions(spec=table_metadata.spec(), schema=write_schema, arrow_table=df)
         yield from write_file(
             io=io,
             table_metadata=table_metadata,
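The last hunk derives the write schema from the incoming Arrow table rather than always using the full table schema, so `WriteTask` and partition resolution operate on exactly the fields present in the data. A rough sketch of that projection, with made-up columns and an optional field the data does not carry; the catalog and table plumbing around `_dataframe_to_data_files` is omitted:

import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

# Hypothetical table schema: `note` is optional and absent from the data below.
table_schema = Schema(
    NestedField(field_id=1, name="id", field_type=LongType(), required=True),
    NestedField(field_id=2, name="note", field_type=StringType(), required=False),
)

# Arrow table the user wants to append; it only carries `id`.
df = pa.table({"id": pa.array([1, 2, 3], type=pa.int64())})

# Same projection as the new write_schema line in _dataframe_to_data_files:
# the Arrow schema is mapped onto the table's field IDs, so the resulting schema
# describes the data being written instead of the whole table.
write_schema = pyarrow_to_schema(df.schema, name_mapping=table_schema.name_mapping)
print(write_schema)  # contains only field 1 (`id`), with the table's field ID attached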