Skip to content

Commit d26d1e4

Browse files
authored
Avoid reusing shared metrics evaluator (#1664)
Hello! I have noticed the same issue as Issue #1506 where the number of results retrieved is inconsistent across reads and traced the issue to the reuse of the same metrics evaluator across threads when reading manifests. Because the metrics evaluator is stateful, this will result in the wrong results being retrieved nondeterministically, depending on the execution order of the threads. This PR addresses the issue by creating a single metrics evaluator per thread, which I have tested locally. Please let me know if there are any tests I can add, and I am happy to receive feedback. Thank you! Closes #1506
1 parent c4715c0 commit d26d1e4

File tree

1 file changed

+15
-8
lines changed

1 file changed

+15
-8
lines changed

pyiceberg/table/__init__.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1608,6 +1608,20 @@ def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]
16081608
# shared instance across multiple threads.
16091609
return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
16101610

1611+
def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
1612+
schema = self.table_metadata.schema()
1613+
include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
1614+
1615+
# The lambda created here is run in multiple threads.
1616+
# So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
1617+
# shared instance across multiple threads.
1618+
return lambda data_file: _InclusiveMetricsEvaluator(
1619+
schema,
1620+
self.row_filter,
1621+
self.case_sensitive,
1622+
include_empty_files,
1623+
).eval(data_file)
1624+
16111625
def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
16121626
spec = self.table_metadata.specs()[spec_id]
16131627

@@ -1671,13 +1685,6 @@ def plan_files(self) -> Iterable[FileScanTask]:
16711685

16721686
partition_evaluators: Dict[int, Callable[[DataFile], bool]] = KeyDefaultDict(self._build_partition_evaluator)
16731687

1674-
metrics_evaluator = _InclusiveMetricsEvaluator(
1675-
self.table_metadata.schema(),
1676-
self.row_filter,
1677-
self.case_sensitive,
1678-
strtobool(self.options.get("include_empty_files", "false")),
1679-
).eval
1680-
16811688
min_sequence_number = _min_sequence_number(manifests)
16821689

16831690
data_entries: List[ManifestEntry] = []
@@ -1693,7 +1700,7 @@ def plan_files(self) -> Iterable[FileScanTask]:
16931700
manifest,
16941701
partition_evaluators[manifest.partition_spec_id],
16951702
residual_evaluators[manifest.partition_spec_id],
1696-
metrics_evaluator,
1703+
self._build_metrics_evaluator(),
16971704
)
16981705
for manifest in manifests
16991706
if self._check_sequence_number(min_sequence_number, manifest)

0 commit comments

Comments
 (0)