@@ -2036,24 +2036,29 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
2036
2036
"""
2037
2037
Check if the `table_schema` is compatible with `other_schema`.
2038
2038
2039
- Two schemas are considered compatible when they are equal in terms of the Iceberg Schema type.
2039
+ The schemas are compatible if:
2040
+ - All fields in `other_schema` are present in `table_schema`. (other_schema <= table_schema)
2041
+ - All required fields in `table_schema` are present in `other_schema`.
2040
2042
2041
2043
Raises:
2042
2044
ValueError: If the schemas are not compatible.
2043
2045
"""
2046
+ from pyiceberg .io .pyarrow import _pyarrow_to_schema_without_ids , pyarrow_to_schema
2047
+
2044
2048
name_mapping = table_schema .name_mapping
2045
2049
try :
2046
- task_schema = pyarrow_to_schema (
2047
- other_schema , name_mapping = name_mapping , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us
2048
- )
2050
+ other_schema = pyarrow_to_schema (other_schema , name_mapping = name_mapping )
2049
2051
except ValueError as e :
2050
- other_schema = _pyarrow_to_schema_without_ids (other_schema , downcast_ns_timestamp_to_us = downcast_ns_timestamp_to_us )
2052
+ other_schema = _pyarrow_to_schema_without_ids (other_schema )
2051
2053
additional_names = set (other_schema .column_names ) - set (table_schema .column_names )
2052
2054
raise ValueError (
2053
2055
f"PyArrow table contains more columns: { ', ' .join (sorted (additional_names ))} . Update the schema first (hint, use union_by_name)."
2054
2056
) from e
2055
2057
2056
- if table_schema .as_struct () != task_schema .as_struct ():
2058
+ missing_table_schema_fields = {field for field in other_schema .fields if field not in table_schema .fields }
2059
+ required_table_schema_fields = {field for field in table_schema .fields if field .required }
2060
+ missing_required_fields = {field for field in required_table_schema_fields if field not in other_schema .fields }
2061
+ if missing_table_schema_fields or missing_required_fields :
2057
2062
from rich .console import Console
2058
2063
from rich .table import Table as RichTable
2059
2064
@@ -2066,7 +2071,7 @@ def _check_schema_compatible(table_schema: Schema, other_schema: pa.Schema, down
2066
2071
2067
2072
for lhs in table_schema .fields :
2068
2073
try :
2069
- rhs = task_schema .find_field (lhs .field_id )
2074
+ rhs = other_schema .find_field (lhs .field_id )
2070
2075
rich_table .add_row ("✅" if lhs == rhs else "❌" , str (lhs ), str (rhs ))
2071
2076
except ValueError :
2072
2077
rich_table .add_row ("❌" , str (lhs ), "Missing" )
0 commit comments