
Move manifest-based planning logic into separate class #2195

Closed
292 changes: 160 additions & 132 deletions pyiceberg/table/__init__.py
@@ -1632,6 +1632,9 @@ class TableScan(ABC):
options: Properties
limit: Optional[int]

# TODO: This is likely problematic with https://github.com/apache/iceberg-python/issues/2179
_manifest_planner: ManifestGroupPlanner

def __init__(
self,
table_metadata: TableMetadata,
@@ -1651,6 +1654,9 @@ def __init__(
self.snapshot_id = snapshot_id
self.options = options
self.limit = limit
self._manifest_planner = ManifestGroupPlanner(
self.io, self.table_metadata, self.row_filter, self.case_sensitive, self.options
)

def snapshot(self) -> Optional[Snapshot]:
if self.snapshot_id:
@@ -1801,77 +1807,9 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent


class DataScan(TableScan):
def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
return project(self.row_filter)

@cached_property
@property
Contributor Author:
It's now memoized on the ManifestGroupPlanner
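
For illustration only (not part of this diff): a minimal sketch of what cached_property gives the planner — the value is built on first access and stored on the instance, so later accesses (and the delegating DataScan property) reuse the same object. Example is a stand-in class, not pyiceberg code.

    from functools import cached_property

    class Example:
        @cached_property
        def filters(self) -> dict:
            print("building")  # runs only once per instance
            return {}

    e = Example()
    assert e.filters is e.filters  # second access reuses the cached dict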

def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
schema = self.table_metadata.schema()
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))

# The lambda created here is run in multiple threads.
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
# shared instance across multiple threads.
return lambda data_file: _InclusiveMetricsEvaluator(
schema,
self.row_filter,
self.case_sensitive,
include_empty_files,
).eval(data_file)

def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec = self.table_metadata.specs()[spec_id]

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
# return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
from pyiceberg.expressions.visitors import residual_evaluator_of

# assert self.row_filter == False
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
)

def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.

Args:
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.

Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)
return self._manifest_planner.partition_filters

def plan_files(self) -> Iterable[FileScanTask]:
"""Plans the relevant files by filtering on the PartitionSpecs.
@@ -1883,68 +1821,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
if not snapshot:
return iter([])

# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

manifests = [
manifest_file
for manifest_file in snapshot.manifests(self.io)
if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
*executor.map(
lambda args: _open_manifest(*args),
[
(
self.io,
manifest,
partition_evaluators[manifest.partition_spec_id],
self._build_metrics_evaluator(),
)
for manifest in manifests
if self._check_sequence_number(min_sequence_number, manifest)
],
)
):
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition
),
)
for data_entry in data_entries
]
return self._manifest_planner.plan_files(manifests=snapshot.manifests(self.io))

def to_arrow(self) -> pa.Table:
"""Read an Arrow table eagerly from this DataScan.
@@ -2058,6 +1935,157 @@ def count(self) -> int:
return res


class ManifestGroupPlanner:
io: FileIO
table_metadata: TableMetadata
row_filter: BooleanExpression
case_sensitive: bool
options: Properties

def __init__(
self, io: FileIO, table_metadata: TableMetadata, row_filter: BooleanExpression, case_sensitive: bool, options: Properties
Contributor Author:
Ah actually I'm seeing https://github.com/apache/iceberg-python/pull/1388/files#r1913363904 so maybe we can't just do this
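
One possible reading of that concern, sketched with a hypothetical Scan class (not pyiceberg code, and only an assumption about what the linked comment refers to): if scan copies are rebuilt from the instance __dict__, as copy/update-style helpers often do, an eagerly constructed helper stored on the instance leaks into the constructor kwargs.

    from typing import Optional

    class Scan:
        def __init__(self, row_filter: str, limit: Optional[int] = None) -> None:
            self.row_filter = row_filter
            self.limit = limit
            self._planner = object()  # eagerly built helper kept on the instance

        def update(self, **overrides):
            # rebuilds the scan from its current attributes plus overrides
            return type(self)(**{**self.__dict__, **overrides})

    scan = Scan("id > 1")
    try:
        scan.update(limit=10)
    except TypeError as exc:
        print(exc)  # __init__() got an unexpected keyword argument '_planner'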

):
self.io = io
self.table_metadata = table_metadata
self.row_filter = row_filter
self.case_sensitive = case_sensitive
self.options = options

def plan_files(self, manifests: List[ManifestFile]) -> Iterable[FileScanTask]:
# step 1: filter manifests using partition summaries
# the filter depends on the partition spec used to write the manifest file, so create a cache of filters for each spec id

manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
manifests = [
manifest_file for manifest_file in manifests if manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
]

residual_evaluators: Dict[int, Callable[[DataFile], ResidualEvaluator]] = KeyDefaultDict(self._build_residual_evaluator)

# step 2: filter the data files in each manifest
# this filter depends on the partition spec used to write the manifest file

partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)

min_sequence_number = _min_sequence_number(manifests)

data_entries: List[ManifestEntry] = []
positional_delete_entries = SortedList(key=lambda entry: entry.sequence_number or INITIAL_SEQUENCE_NUMBER)

executor = ExecutorFactory.get_or_create()
for manifest_entry in chain(
*executor.map(
lambda args: _open_manifest(*args),
[
(
self.io,
manifest,
partition_evaluators[manifest.partition_spec_id],
self._build_metrics_evaluator(),
)
for manifest in manifests
if self._check_sequence_number(min_sequence_number, manifest)
],
)
):
data_file = manifest_entry.data_file
if data_file.content == DataFileContent.DATA:
data_entries.append(manifest_entry)
elif data_file.content == DataFileContent.POSITION_DELETES:
positional_delete_entries.add(manifest_entry)
elif data_file.content == DataFileContent.EQUALITY_DELETES:
raise ValueError("PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568")
else:
raise ValueError(f"Unknown DataFileContent ({data_file.content}): {manifest_entry}")

return [
FileScanTask(
data_entry.data_file,
delete_files=_match_deletes_to_data_file(
data_entry,
positional_delete_entries,
),
residual=residual_evaluators[data_entry.data_file.spec_id](data_entry.data_file).residual_for(
data_entry.data_file.partition
),
)
for data_entry in data_entries
]

@cached_property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
return KeyDefaultDict(self._build_partition_projection)

def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
return project(self.row_filter)

def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
spec = self.table_metadata.specs()[spec_id]
return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)

def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
spec = self.table_metadata.specs()[spec_id]
partition_type = spec.partition_type(self.table_metadata.schema())
partition_schema = Schema(*partition_type.fields)
partition_expr = self.partition_filters[spec_id]

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)

def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
schema = self.table_metadata.schema()
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))

# The lambda created here is run in multiple threads.
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
# shared instance across multiple threads.
return lambda data_file: _InclusiveMetricsEvaluator(
schema,
self.row_filter,
self.case_sensitive,
include_empty_files,
).eval(data_file)

def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
spec = self.table_metadata.specs()[spec_id]

# The lambda created here is run in multiple threads.
# So we avoid creating _EvaluatorExpression methods bound to a single
# shared instance across multiple threads.
# return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
from pyiceberg.expressions.visitors import residual_evaluator_of

# assert self.row_filter == False
return lambda datafile: (
residual_evaluator_of(
spec=spec,
expr=self.row_filter,
case_sensitive=self.case_sensitive,
schema=self.table_metadata.schema(),
)
)

@staticmethod
Contributor Author:
Only change in the move - made it static
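
Since it no longer reads instance state, the check can be called on the class itself. A tiny sketch with hypothetical names (Planner, check), not the actual pyiceberg method:

    class Planner:
        @staticmethod
        def check(min_seq: int, seq: int) -> bool:
            # no self: the result depends only on the arguments
            return seq >= min_seq

    assert Planner.check(3, 5)    # callable on the class
    assert Planner().check(3, 5)  # and on instances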

def _check_sequence_number(min_sequence_number: int, manifest: ManifestFile) -> bool:
"""Ensure that no manifests are loaded that contain deletes that are older than the data.

Args:
min_sequence_number (int): The minimal sequence number.
manifest (ManifestFile): A ManifestFile that can be either data or deletes.

Returns:
Boolean indicating if it is either a data file, or a relevant delete file.
"""
return manifest.content == ManifestContent.DATA or (
# Not interested in deletes that are older than the data
manifest.content == ManifestContent.DELETES
and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
)


@dataclass(frozen=True)
class WriteTask:
"""Task with the parameters for writing a DataFile."""