
Commit 87bdda4

[core]Python: fix blob write when blob_as_descriptor is true (#6404)
1 parent 032fee4 commit 87bdda4

10 files changed: +582 -99 lines changed

paimon-python/pypaimon/common/config.py

Lines changed: 1 addition & 0 deletions

@@ -47,6 +47,7 @@ class CatalogOptions:
     DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
     PREFIX = 'prefix'
     HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT'
+    BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1


 class PVFSOptions:
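
The new constant is consumed in the new uri_reader.py module below as the maxsize of the UriReaderFactory reader cache, so LRU eviction is effectively never triggered. A minimal sketch of what that bound means, using cachetools (the same library the new module imports); the key and value here are hypothetical:

from cachetools import LRUCache

# Same bound UriReaderFactory passes to its cache: with maxsize 2**31 - 1,
# the cache behaves as practically unbounded.
cache = LRUCache(maxsize=2**31 - 1)
cache["oss://bucket"] = "reader"
print(len(cache))  # 1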

paimon-python/pypaimon/common/core_options.py

Lines changed: 1 addition & 0 deletions

@@ -43,6 +43,7 @@ def __str__(self):
     FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
     FILE_FORMAT_PER_LEVEL = "file.format.per.level"
     FILE_BLOCK_SIZE = "file.block-size"
+    FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
     # Scan options
     SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
     INCREMENTAL_BETWEEN_TIMESTAMP = "incremental-between-timestamp"

paimon-python/pypaimon/common/file_io.py

Lines changed: 9 additions & 4 deletions

@@ -15,7 +15,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
 import logging
 import os
 import subprocess
@@ -28,8 +27,9 @@
 from pyarrow._fs import FileSystem

 from pypaimon.common.config import OssOptions, S3Options
+from pypaimon.common.uri_reader import UriReaderFactory
 from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
-from pypaimon.table.row.blob import BlobData
+from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
 from pypaimon.table.row.generic_row import GenericRow
 from pypaimon.table.row.row_kind import RowKind
 from pypaimon.write.blob_format_writer import BlobFormatWriter
@@ -40,6 +40,7 @@ def __init__(self, path: str, catalog_options: dict):
         self.properties = catalog_options
         self.logger = logging.getLogger(__name__)
         scheme, netloc, _ = self.parse_location(path)
+        self.uri_reader_factory = UriReaderFactory(catalog_options)
         if scheme in {"oss"}:
             self.filesystem = self._initialize_oss_fs(path)
         elif scheme in {"s3", "s3a", "s3n"}:
@@ -370,7 +371,7 @@ def record_generator():
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)

-    def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
+    def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
         try:
             # Validate input constraints
             if data.num_columns != 1:
@@ -399,7 +400,11 @@ def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
                     col_data = records_dict[field_name][i]
                     # Convert to appropriate type based on field type
                     if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
-                        if isinstance(col_data, bytes):
+                        if blob_as_descriptor:
+                            blob_descriptor = BlobDescriptor.deserialize(col_data)
+                            uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
+                            blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
+                        elif isinstance(col_data, bytes):
                             blob_data = BlobData(col_data)
                         else:
                             # Convert to bytes if needed
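
What the fix changes: previously every blob cell was treated as raw payload, so with blob_as_descriptor enabled the serialized BlobDescriptor bytes would pass the isinstance(col_data, bytes) check and be written out verbatim instead of the blob content they point to. The new branch, restated as a sketch with explanatory comments (names are exactly those in the hunk above):

if blob_as_descriptor:
    # The cell holds a serialized pointer, not the payload itself:
    # recover the descriptor, then resolve its URI to the real bytes.
    blob_descriptor = BlobDescriptor.deserialize(col_data)
    uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
    blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
elif isinstance(col_data, bytes):
    # Inline payload: wrap the raw bytes directly.
    blob_data = BlobData(col_data)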
paimon-python/pypaimon/common/uri_reader.py

Lines changed: 171 additions & 0 deletions

@@ -0,0 +1,171 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import io
+from abc import ABC, abstractmethod
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import urlparse, ParseResult
+
+import requests
+from cachetools import LRUCache
+from readerwriterlock import rwlock
+
+from pypaimon.common.config import CatalogOptions
+
+
+class UriReader(ABC):
+    @classmethod
+    def from_http(cls) -> 'HttpUriReader':
+        return HttpUriReader()
+
+    @classmethod
+    def from_file(cls, file_io: Any) -> 'FileUriReader':
+        return FileUriReader(file_io)
+
+    @classmethod
+    def get_file_path(cls, uri: str):
+        parsed_uri = urlparse(uri)
+        if parsed_uri.scheme == 'file':
+            path = Path(parsed_uri.path)
+        elif parsed_uri.scheme and parsed_uri.scheme != '':
+            path = Path(parsed_uri.netloc + parsed_uri.path)
+        else:
+            path = Path(uri)
+        return path
+
+    @abstractmethod
+    def new_input_stream(self, uri: str):
+        pass
+
+
+class FileUriReader(UriReader):
+
+    def __init__(self, file_io: Any):
+        self._file_io = file_io
+
+    def new_input_stream(self, uri: str):
+        try:
+            path = self.get_file_path(uri)
+            return self._file_io.new_input_stream(path)
+        except Exception as e:
+            raise IOError(f"Failed to read file {uri}: {e}")
+
+
+class HttpUriReader(UriReader):
+
+    def new_input_stream(self, uri: str):
+        try:
+            response = requests.get(uri)
+            if response.status_code != 200:
+                raise RuntimeError(f"Failed to read HTTP URI {uri} status code {response.status_code}")
+            return io.BytesIO(response.content)
+        except Exception as e:
+            raise RuntimeError(f"Failed to read HTTP URI {uri}: {e}")
+
+
+class UriKey:
+
+    def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None:
+        self._scheme = scheme
+        self._authority = authority
+        self._hash = hash((self._scheme, self._authority))
+
+    @property
+    def scheme(self) -> Optional[str]:
+        return self._scheme
+
+    @property
+    def authority(self) -> Optional[str]:
+        return self._authority
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, UriKey):
+            return False
+
+        return (self._scheme == other._scheme and
+                self._authority == other._authority)
+
+    def __hash__(self) -> int:
+        return self._hash
+
+    def __repr__(self) -> str:
+        return f"UriKey(scheme='{self._scheme}', authority='{self._authority}')"
+
+
+class UriReaderFactory:
+
+    def __init__(self, catalog_options: dict) -> None:
+        self.catalog_options = catalog_options
+        self._readers = LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE)
+        self._readers_lock = rwlock.RWLockFair()
+
+    def create(self, input_uri: str) -> UriReader:
+        try:
+            parsed_uri = urlparse(input_uri)
+        except Exception as e:
+            raise ValueError(f"Invalid URI: {input_uri}") from e
+
+        key = UriKey(parsed_uri.scheme, parsed_uri.netloc or None)
+        rlock = self._readers_lock.gen_rlock()
+        rlock.acquire()
+        try:
+            reader = self._readers.get(key)
+            if reader is not None:
+                return reader
+        finally:
+            rlock.release()
+        wlock = self._readers_lock.gen_wlock()
+        wlock.acquire()
+        try:
+            reader = self._readers.get(key)
+            if reader is not None:
+                return reader
+            reader = self._new_reader(key, parsed_uri)
+            self._readers[key] = reader
+            return reader
+        finally:
+            wlock.release()
+
+    def _new_reader(self, key: UriKey, parsed_uri: ParseResult) -> UriReader:
+        scheme = key.scheme
+        if scheme in ('http', 'https'):
+            return UriReader.from_http()
+        try:
+            # Import FileIO here to avoid circular imports
+            from pypaimon.common.file_io import FileIO
+            uri_string = parsed_uri.geturl()
+            file_io = FileIO(uri_string, self.catalog_options)
+            return UriReader.from_file(file_io)
+        except Exception as e:
+            raise RuntimeError(f"Failed to create reader for URI {parsed_uri.geturl()}") from e
+
+    def clear_cache(self) -> None:
+        self._readers.clear()
+
+    def get_cache_size(self) -> int:
+        return len(self._readers)
+
+    def __getstate__(self):
+        state = self.__dict__.copy()
+        del state['_readers_lock']
+        return state
+
+    def __setstate__(self, state):
+        self.__dict__.update(state)
+        self._readers_lock = rwlock.RWLockFair()
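
A usage sketch of the new factory, based on the code above: readers are cached per (scheme, authority) key, http/https URIs get an HttpUriReader, and every other scheme falls back to a FileIO-backed FileUriReader. The sample URIs are hypothetical:

from pypaimon.common.uri_reader import UriReader, UriReaderFactory

factory = UriReaderFactory({})

r1 = factory.create("https://example.com/blobs/1.bin")
r2 = factory.create("https://example.com/blobs/2.bin")
assert r1 is r2                  # same (scheme, authority) -> cached reader
print(factory.get_cache_size())  # 1

# Path resolution helper used by FileUriReader:
print(UriReader.get_file_path("file:///tmp/blob.bin"))  # /tmp/blob.bin
print(UriReader.get_file_path("oss://bucket/key.bin"))  # bucket/key.bin

The __getstate__/__setstate__ pair drops the non-picklable lock before serialization and recreates it afterwards, which matters when the factory travels inside a FileIO instance to worker processes.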

paimon-python/pypaimon/read/reader/format_blob_reader.py

Lines changed: 19 additions & 29 deletions

@@ -26,21 +26,23 @@
 from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
 from pypaimon.common.file_io import FileIO
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
-from pypaimon.schema.data_types import DataField, PyarrowFieldParser
-from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser, AtomicType
+from pypaimon.table.row.blob import Blob
 from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.row.row_kind import RowKind


 class FormatBlobReader(RecordBatchReader):

     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
-                 full_fields: List[DataField], push_down_predicate: Any):
+                 full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
         self._file_io = file_io
         self._file_path = file_path
         self._push_down_predicate = push_down_predicate
+        self._blob_as_descriptor = blob_as_descriptor

         # Get file size
-        self._file_size = file_io.get_file_size(file_path)
+        self._file_size = file_io.get_file_size(Path(file_path))

         # Initialize the low-level blob format reader
         self.file_path = file_path
@@ -66,7 +68,10 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
         if self.returned:
             return None
         self.returned = True
-        batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
+        batch_iterator = BlobRecordIterator(
+            self._file_io, self.file_path, self.blob_lengths,
+            self.blob_offsets, self._fields[0]
+        )
         self._blob_iterator = iter(batch_iterator)

         # Collect records for this batch
@@ -75,22 +80,16 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:

         try:
             while True:
-                # Get next blob record
                 blob_row = next(self._blob_iterator)
-                # Check if first read returns None, stop immediately
                 if blob_row is None:
                     break
-
-                # Extract blob data from the row
-                blob = blob_row.values[0]  # Blob files have single blob field
-
-                # Convert blob to appropriate format for each requested field
+                blob = blob_row.values[0]
                 for field_name in self._fields:
-                    # For blob files, all fields should contain blob data
-                    if isinstance(blob, Blob):
-                        blob_data = blob.to_data()
+                    blob_descriptor = blob.to_descriptor()
+                    if self._blob_as_descriptor:
+                        blob_data = blob_descriptor.serialize()
                     else:
-                        blob_data = bytes(blob) if blob is not None else None
+                        blob_data = blob.to_data()
                     pydict_data[field_name].append(blob_data)

                 records_in_batch += 1
@@ -162,7 +161,9 @@ class BlobRecordIterator:
     MAGIC_NUMBER_SIZE = 4
     METADATA_OVERHEAD = 16

-    def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
+    def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int],
+                 blob_offsets: List[int], field_name: str):
+        self.file_io = file_io
         self.file_path = file_path
         self.field_name = field_name
         self.blob_lengths = blob_lengths
@@ -175,25 +176,14 @@ def __iter__(self) -> Iterator[GenericRow]:
     def __next__(self) -> GenericRow:
         if self.current_position >= len(self.blob_lengths):
             raise StopIteration
-
         # Create blob reference for the current blob
         # Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
         blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE  # Skip magic number
         blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD
-
-        # Create BlobDescriptor for this blob
-        descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
-        blob = BlobRef(descriptor)
-
+        blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length)
         self.current_position += 1
-
-        # Return as GenericRow with single blob field
-        from pypaimon.schema.data_types import DataField, AtomicType
-        from pypaimon.table.row.row_kind import RowKind
-
         fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
         return GenericRow([blob], fields, RowKind.INSERT)

     def returned_position(self) -> int:
-        """Get current position in the iterator."""
         return self.current_position
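
A worked example of the offset arithmetic in __next__ above: each stored blob is framed as magic (4 bytes) + payload + length (8 bytes) + CRC (4 bytes), so METADATA_OVERHEAD covers all 16 framing bytes while the offset only needs to skip the leading magic. The offsets and lengths below are hypothetical:

MAGIC_NUMBER_SIZE = 4    # leading magic number
METADATA_OVERHEAD = 16   # magic (4) + trailing length (8) + CRC (4)

blob_offsets = [0, 120]    # hypothetical start positions of two stored blobs
blob_lengths = [120, 200]  # stored sizes, framing included

payload_offset = blob_offsets[0] + MAGIC_NUMBER_SIZE   # 4: first payload byte
payload_length = blob_lengths[0] - METADATA_OVERHEAD   # 104 payload bytes
print(payload_offset, payload_length)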

paimon-python/pypaimon/read/split_read.py

Lines changed: 2 additions & 1 deletion

@@ -82,8 +82,9 @@ def file_reader_supplier(self, file_path: str, for_merge_read: bool):
             format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
                                              self.read_fields, self.push_down_predicate)
         elif file_format == CoreOptions.FILE_FORMAT_BLOB:
+            blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
             format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
-                                             self.read_fields, self.push_down_predicate)
+                                             self.read_fields, self.push_down_predicate, blob_as_descriptor)
         elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
             format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
                                                 self._get_final_read_data_fields(), self.push_down_predicate)
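
Note that the read side defaults to False when the option is absent, so existing tables keep returning raw blob bytes. A sketch of the lookup semantics, mirroring the line added above (the options dicts here are hypothetical):

from pypaimon.common.core_options import CoreOptions

options = {}  # option unset
assert options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) is False  # raw-bytes path

options = {CoreOptions.FILE_BLOB_AS_DESCRIPTOR: True}
assert options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False) is True   # descriptor path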
