88 changes: 88 additions & 0 deletions paimon-python/pypaimon/read/reader/filter_record_batch_reader.py
@@ -0,0 +1,88 @@
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

import logging
from typing import List, Optional

import pyarrow as pa
import pyarrow.dataset as ds

from pypaimon.common.predicate import Predicate
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField

logger = logging.getLogger(__name__)


class FilterRecordBatchReader(RecordBatchReader):
    """
    Wraps a RecordBatchReader and filters each batch by a predicate.

    Used for data evolution reads, where the predicate cannot be pushed down to
    individual file readers. Only used when all predicate columns are in the read schema.
    """

    def __init__(
        self,
        reader: RecordBatchReader,
        predicate: Predicate,
        field_names: Optional[List[str]] = None,
        schema_fields: Optional[List[DataField]] = None,
    ):
        self.reader = reader
        self.predicate = predicate
        self.field_names = field_names
        self.schema_fields = schema_fields

    def read_arrow_batch(self) -> Optional[pa.RecordBatch]:
        while True:
            batch = self.reader.read_arrow_batch()
            if batch is None:
                return None
            if batch.num_rows == 0:
                return batch
            filtered = self._filter_batch(batch)
            if filtered is not None and filtered.num_rows > 0:
                return filtered
            # Otherwise the whole batch was filtered out; read the next one.

    def _filter_batch(self, batch: pa.RecordBatch) -> Optional[pa.RecordBatch]:
        expr = self.predicate.to_arrow()
        result = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(
            filter=expr
        ).to_table()
        if result.num_rows == 0:
            return None
        batches = result.to_batches()
        if not batches:
            return None
        if len(batches) == 1:
            return batches[0]
        concat_batches = getattr(pa, "concat_batches", None)
        if concat_batches is not None:
            return concat_batches(batches)
        # Fallback for older pyarrow without pa.concat_batches: merge the table
        # chunks so the result can be exposed as a single batch.
        return result.combine_chunks().to_batches()[0]

    def return_batch_pos(self) -> int:
        pos = getattr(self.reader, 'return_batch_pos', lambda: 0)()
        return pos if pos is not None else 0

    def close(self) -> None:
        self.reader.close()
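For context, here is a minimal standalone sketch of the filtering mechanism that `_filter_batch` relies on, using plain pyarrow and no pypaimon types; the toy batch and the expression are made up for illustration, standing in for the wrapped reader's output and for whatever `Predicate.to_arrow()` produces:

```python
import pyarrow as pa
import pyarrow.dataset as ds

# A toy batch standing in for the output of the wrapped reader.
batch = pa.RecordBatch.from_pydict({"id": [1, 2, 3, 4], "v": [10.0, 3.5, 12.0, 0.5]})

# An expression of the kind Predicate.to_arrow() is expected to produce.
expr = (ds.field("id") > 1) & (ds.field("v") < 9.0)

# Same mechanism as _filter_batch: wrap the batch in an in-memory dataset,
# scan it with the filter, and read back only the surviving rows.
filtered = ds.InMemoryDataset(pa.Table.from_batches([batch])).scanner(filter=expr).to_table()
print(filtered.to_pydict())  # {'id': [2, 4], 'v': [3.5, 0.5]}
```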
7 changes: 5 additions & 2 deletions paimon-python/pypaimon/read/scanner/file_scanner.py
@@ -418,13 +418,13 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
             return False
         if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
             return False
-        if self.deletion_vectors_enabled and entry.file.level == 0:  # do not read level 0 file
-            return False
         # Get SimpleStatsEvolution for this schema
         evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)

         # Apply evolution to stats
         if self.table.is_primary_key_table:
+            if self.deletion_vectors_enabled and entry.file.level == 0:  # do not read level 0 file
+                return False
             if not self.primary_key_predicate:
                 return True
             return self.primary_key_predicate.test_by_simple_stats(
@@ -436,6 +436,9 @@ def _filter_manifest_entry(self, entry: ManifestEntry) -> bool:
             return True
         if self.predicate_for_stats is None:
             return True
+        # Data evolution: file stats may be from another schema, skip stats filter and filter in reader.
+        if self.data_evolution:
+            return True
         if entry.file.value_stats_cols is None and entry.file.write_cols is not None:
             stats_fields = entry.file.write_cols
         else:
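Why the stats filter is skipped for data evolution: file-level min/max statistics are recorded against the schema the file was written with, so under a different read schema they cannot safely prune the file. A simplified sketch of that reasoning, with a hypothetical helper that is not part of the pypaimon API:

```python
def might_match(col: str, value, file_stats: dict, data_evolution: bool) -> bool:
    """Return True if the file may contain rows where col == value."""
    if data_evolution or col not in file_stats:
        # Cannot prune safely; keep the file and filter rows later in the reader.
        return True
    lo, hi = file_stats[col]  # (min, max) recorded when the file was written
    return lo <= value <= hi

# With trustworthy stats the file can be pruned...
assert might_match("id", 5, {"id": (10, 20)}, data_evolution=False) is False
# ...but under data evolution it must be kept and filtered in the reader.
assert might_match("id", 5, {"id": (10, 20)}, data_evolution=True) is True
```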
32 changes: 27 additions & 5 deletions paimon-python/pypaimon/read/split_read.py
@@ -40,6 +40,7 @@
 from pypaimon.read.reader.field_bunch import BlobBunch, DataBunch, FieldBunch
 from pypaimon.read.reader.filter_record_reader import FilterRecordReader
 from pypaimon.read.reader.format_avro_reader import FormatAvroReader
+from pypaimon.read.reader.filter_record_batch_reader import FilterRecordBatchReader
 from pypaimon.read.reader.row_range_filter_record_reader import RowIdFilterRecordBatchReader
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
@@ -52,6 +53,7 @@
 from pypaimon.read.reader.key_value_wrap_reader import KeyValueWrapReader
 from pypaimon.read.reader.shard_batch_reader import ShardBatchReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
+from pypaimon.read.push_down_utils import _get_all_fields
 from pypaimon.read.split import Split
 from pypaimon.read.sliced_split import SlicedSplit
 from pypaimon.schema.data_types import DataField, PyarrowFieldParser
@@ -88,6 +90,13 @@ def __init__(
         self.read_fields = self._create_key_value_fields(read_type)
         self.schema_id_2_fields = {}
         self.deletion_file_readers = {}
+        # Only apply filter when all predicate columns are in read schema.
+        read_names = {f.name for f in self.read_fields}
+        self.predicate_for_reader = (
+            self.predicate
+            if self.predicate is not None and _get_all_fields(self.predicate).issubset(read_names)
+            else None
+        )

     def _push_down_predicate(self) -> Optional[Predicate]:
         if self.predicate is None:
@@ -382,8 +391,8 @@ def create_reader(self) -> RecordReader:

         concat_reader = ConcatBatchReader(data_readers)
         # if the table is appendonly table, we don't need extra filter, all predicates has pushed down
-        if self.table.is_primary_key_table and self.predicate:
-            return FilterRecordReader(concat_reader, self.predicate)
+        if self.table.is_primary_key_table and self.predicate_for_reader:
+            return FilterRecordReader(concat_reader, self.predicate_for_reader)
         else:
             return concat_reader

@@ -424,8 +433,8 @@ def create_reader(self) -> RecordReader:
             section_readers.append(supplier)
         concat_reader = ConcatRecordReader(section_readers)
         kv_unwrap_reader = KeyValueUnwrapRecordReader(DropDeleteRecordReader(concat_reader))
-        if self.predicate:
-            return FilterRecordReader(kv_unwrap_reader, self.predicate)
+        if self.predicate_for_reader:
+            return FilterRecordReader(kv_unwrap_reader, self.predicate_for_reader)
         else:
             return kv_unwrap_reader

@@ -449,6 +458,11 @@ def __init__(
         actual_split = split.data_split()
         super().__init__(table, predicate, read_type, actual_split, row_tracking_enabled)

+    def _push_down_predicate(self) -> Optional[Predicate]:
+        # Data evolution: files may have different schemas, so we don't push predicate
+        # to file readers; filtering is done in FilterRecordBatchReader after merge.
+        return None
+
     def create_reader(self) -> RecordReader:
         files = self.split.files
         suppliers = []
@@ -467,7 +481,15 @@ def create_reader(self) -> RecordReader:
                 lambda files=need_merge_files: self._create_union_reader(files)
             )

-        return ConcatBatchReader(suppliers)
+        merge_reader = ConcatBatchReader(suppliers)
+        if self.predicate_for_reader is not None:
+            return FilterRecordBatchReader(
+                merge_reader,
+                self.predicate_for_reader,
+                field_names=[f.name for f in self.read_fields],
+                schema_fields=self.read_fields,
+            )
+        return merge_reader

     def _split_by_row_id(self, files: List[DataFileMeta]) -> List[List[DataFileMeta]]:
         """Split files by firstRowId for data evolution."""
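The `predicate_for_reader` guard introduced above only applies the reader-side filter when every column the predicate touches is part of the read schema; otherwise the projected batches could not evaluate it. A sketch of that decision, where `fields_of` is a hypothetical stand-in for `_get_all_fields` (the real helper walks the Predicate tree):

```python
from typing import Optional, Set


def fields_of(predicate) -> Set[str]:
    # Stand-in for _get_all_fields: collect every field name the predicate references.
    return set(getattr(predicate, "fields", []))


def reader_filter(predicate, read_field_names: Set[str]) -> Optional[object]:
    """Return the predicate only if it can be evaluated on the projected read schema."""
    if predicate is not None and fields_of(predicate).issubset(read_field_names):
        return predicate
    return None  # some predicate column is projected away; skip reader-side filtering
```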
4 changes: 2 additions & 2 deletions paimon-python/pypaimon/read/table_read.py
@@ -66,7 +66,7 @@ def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
     num_rows = batch.num_rows

     for field in target_schema:
-        if field.name in batch.column_names:
+        if field.name in batch.schema.names:
             col = batch.column(field.name)
         else:
             col = pyarrow.nulls(num_rows, type=field.type)
@@ -198,7 +198,7 @@ def _create_split_read(self, split: Split) -> SplitRead:
         elif self.table.options.data_evolution_enabled():
             return DataEvolutionSplitRead(
                 table=self.table,
-                predicate=None,  # Never push predicate to split read
+                predicate=self.predicate,
                 read_type=self.read_type,
                 split=split,
                 row_tracking_enabled=True
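The switch to `batch.schema.names` presumably targets older pyarrow releases where `RecordBatch` lacks the `column_names` property that `Table` has. A minimal standalone sketch of the padding behavior `_try_to_pad_batch_by_schema` implements (hypothetical function name, plain pyarrow):

```python
import pyarrow as pa


def pad_batch(batch: pa.RecordBatch, target_schema: pa.Schema) -> pa.RecordBatch:
    """Reorder columns to target_schema, filling any missing column with nulls."""
    existing = set(batch.schema.names)  # available on every pyarrow version
    cols = [
        batch.column(field.name) if field.name in existing
        else pa.nulls(batch.num_rows, type=field.type)
        for field in target_schema
    ]
    return pa.RecordBatch.from_arrays(cols, schema=target_schema)


padded = pad_batch(
    pa.RecordBatch.from_pydict({"a": [1, 2]}),
    pa.schema([("a", pa.int64()), ("b", pa.string())]),
)
print(padded.to_pydict())  # {'a': [1, 2], 'b': [None, None]}
```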