1 change: 1 addition & 0 deletions paimon-python/pypaimon/common/config.py
@@ -47,6 +47,7 @@ class CatalogOptions:
DLF_TOKEN_ECS_METADATA_URL = "dlf.token-ecs-metadata-url"
PREFIX = 'prefix'
HTTP_USER_AGENT_HEADER = 'header.HTTP_USER_AGENT'
BLOB_FILE_IO_DEFAULT_CACHE_SIZE = 2**31 - 1


class PVFSOptions:
1 change: 1 addition & 0 deletions paimon-python/pypaimon/common/core_options.py
@@ -43,6 +43,7 @@ def __str__(self):
FILE_COMPRESSION_PER_LEVEL = "file.compression.per.level"
FILE_FORMAT_PER_LEVEL = "file.format.per.level"
FILE_BLOCK_SIZE = "file.block-size"
FILE_BLOB_AS_DESCRIPTOR = "blob-as-descriptor"
# Scan options
SCAN_FALLBACK_BRANCH = "scan.fallback-branch"
# Commit options
13 changes: 9 additions & 4 deletions paimon-python/pypaimon/common/file_io.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import logging
import os
import subprocess
@@ -28,8 +27,9 @@
from pyarrow._fs import FileSystem

from pypaimon.common.config import OssOptions, S3Options
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser
from pypaimon.table.row.blob import BlobData
from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind
from pypaimon.write.blob_format_writer import BlobFormatWriter
@@ -40,6 +40,7 @@ def __init__(self, path: str, catalog_options: dict):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
scheme, netloc, _ = self.parse_location(path)
self.uri_reader_factory = UriReaderFactory(catalog_options)
if scheme in {"oss"}:
self.filesystem = self._initialize_oss_fs(path)
elif scheme in {"s3", "s3a", "s3n"}:
@@ -370,7 +371,7 @@ def record_generator():
with self.new_output_stream(path) as output_stream:
fastavro.writer(output_stream, avro_schema, records, **kwargs)

def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
def write_blob(self, path: Path, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs):
try:
# Validate input constraints
if data.num_columns != 1:
@@ -399,7 +400,11 @@ def write_blob(self, path: Path, data: pyarrow.Table, **kwargs):
col_data = records_dict[field_name][i]
# Convert to appropriate type based on field type
if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB":
if isinstance(col_data, bytes):
if blob_as_descriptor:
blob_descriptor = BlobDescriptor.deserialize(col_data)
uri_reader = self.uri_reader_factory.create(blob_descriptor.uri)
blob_data = Blob.from_descriptor(uri_reader, blob_descriptor)
elif isinstance(col_data, bytes):
blob_data = BlobData(col_data)
else:
# Convert to bytes if needed
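The blob_as_descriptor branch above changes what write_blob expects in the single blob column: raw blob bytes when the flag is off, and a serialized BlobDescriptor that is resolved through the new UriReaderFactory when it is on. A minimal sketch of the two input shapes, assuming a BlobDescriptor(uri, offset, length) constructor and that a plain large_binary column maps to the BLOB field type (both assumptions, not confirmed by this diff):

# Hedged sketch; paths and URIs below are hypothetical.
from pathlib import Path

import pyarrow as pa

from pypaimon.common.file_io import FileIO
from pypaimon.table.row.blob import BlobDescriptor

file_io = FileIO("file:///tmp/warehouse", {})

# Inline mode: each cell already holds the raw blob bytes.
inline = pa.table({"blob_col": pa.array([b"raw-bytes"], type=pa.large_binary())})
file_io.write_blob(Path("/tmp/warehouse/blob-inline"), inline, blob_as_descriptor=False)

# Descriptor mode: each cell holds a serialized BlobDescriptor; write_blob
# deserializes it and fetches the payload via uri_reader_factory / Blob.from_descriptor.
descriptor = BlobDescriptor("file:///tmp/external/blob.bin", 0, 1024)  # assumed signature
table = pa.table({"blob_col": pa.array([descriptor.serialize()], type=pa.large_binary())})
file_io.write_blob(Path("/tmp/warehouse/blob-desc"), table, blob_as_descriptor=True)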
171 changes: 171 additions & 0 deletions paimon-python/pypaimon/common/uri_reader.py
@@ -0,0 +1,171 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import io
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse, ParseResult

import requests
from cachetools import LRUCache
from readerwriterlock import rwlock

from pypaimon.common.config import CatalogOptions


class UriReader(ABC):
@classmethod
def from_http(cls) -> 'HttpUriReader':
return HttpUriReader()

@classmethod
def from_file(cls, file_io: Any) -> 'FileUriReader':
return FileUriReader(file_io)

@classmethod
def get_file_path(cls, uri: str):
parsed_uri = urlparse(uri)
if parsed_uri.scheme == 'file':
path = Path(parsed_uri.path)
elif parsed_uri.scheme and parsed_uri.scheme != '':
path = Path(parsed_uri.netloc + parsed_uri.path)
else:
path = Path(uri)
return path

@abstractmethod
def new_input_stream(self, uri: str):
pass


class FileUriReader(UriReader):

def __init__(self, file_io: Any):
self._file_io = file_io

def new_input_stream(self, uri: str):
try:
path = self.get_file_path(uri)
return self._file_io.new_input_stream(path)
except Exception as e:
raise IOError(f"Failed to read file {uri}: {e}")


class HttpUriReader(UriReader):

def new_input_stream(self, uri: str):
try:
response = requests.get(uri)
if response.status_code != 200:
raise RuntimeError(f"Failed to read HTTP URI {uri} status code {response.status_code}")
return io.BytesIO(response.content)
except Exception as e:
raise RuntimeError(f"Failed to read HTTP URI {uri}: {e}")


class UriKey:

def __init__(self, scheme: Optional[str], authority: Optional[str]) -> None:
self._scheme = scheme
self._authority = authority
self._hash = hash((self._scheme, self._authority))

@property
def scheme(self) -> Optional[str]:
return self._scheme

@property
def authority(self) -> Optional[str]:
return self._authority

def __eq__(self, other: object) -> bool:
if not isinstance(other, UriKey):
return False

return (self._scheme == other._scheme and
self._authority == other._authority)

def __hash__(self) -> int:
return self._hash

def __repr__(self) -> str:
return f"UriKey(scheme='{self._scheme}', authority='{self._authority}')"


class UriReaderFactory:

def __init__(self, catalog_options: dict) -> None:
self.catalog_options = catalog_options
self._readers = LRUCache(CatalogOptions.BLOB_FILE_IO_DEFAULT_CACHE_SIZE)
self._readers_lock = rwlock.RWLockFair()

def create(self, input_uri: str) -> UriReader:
try:
parsed_uri = urlparse(input_uri)
except Exception as e:
raise ValueError(f"Invalid URI: {input_uri}") from e

key = UriKey(parsed_uri.scheme, parsed_uri.netloc or None)
rlock = self._readers_lock.gen_rlock()
rlock.acquire()
try:
reader = self._readers.get(key)
if reader is not None:
return reader
finally:
rlock.release()
wlock = self._readers_lock.gen_wlock()
wlock.acquire()
try:
reader = self._readers.get(key)
if reader is not None:
return reader
reader = self._new_reader(key, parsed_uri)
self._readers[key] = reader
return reader
finally:
wlock.release()

def _new_reader(self, key: UriKey, parsed_uri: ParseResult) -> UriReader:
scheme = key.scheme
if scheme in ('http', 'https'):
return UriReader.from_http()
try:
# Import FileIO here to avoid circular imports
from pypaimon.common.file_io import FileIO
uri_string = parsed_uri.geturl()
file_io = FileIO(uri_string, self.catalog_options)
return UriReader.from_file(file_io)
except Exception as e:
raise RuntimeError(f"Failed to create reader for URI {parsed_uri.geturl()}") from e

def clear_cache(self) -> None:
self._readers.clear()

def get_cache_size(self) -> int:
return len(self._readers)

def __getstate__(self):
state = self.__dict__.copy()
del state['_readers_lock']
return state

def __setstate__(self, state):
self.__dict__.update(state)
self._readers_lock = rwlock.RWLockFair()
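UriReaderFactory is the piece write_blob and the blob reader rely on to resolve descriptor URIs. A minimal usage sketch with hypothetical URIs; readers are cached by (scheme, authority), so repeated lookups against the same host reuse one reader:

from pypaimon.common.uri_reader import UriReaderFactory

factory = UriReaderFactory(catalog_options={})

# Non-HTTP schemes build a FileIO for the URI and wrap it in a FileUriReader.
reader = factory.create("file:///tmp/blobs/blob.bin")
with reader.new_input_stream("file:///tmp/blobs/blob.bin") as stream:
    payload = stream.read()

# http/https URIs resolve to an HttpUriReader backed by requests.
http_reader = factory.create("https://example.com/blobs/a.bin")

# Same scheme and authority: the cached reader is reused.
assert factory.create("https://example.com/blobs/b.bin") is http_reader
assert factory.get_cache_size() == 2

Since BLOB_FILE_IO_DEFAULT_CACHE_SIZE is 2**31 - 1, the LRU cache is effectively unbounded, and __getstate__/__setstate__ drop and rebuild the read-write lock so the factory remains picklable.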
48 changes: 19 additions & 29 deletions paimon-python/pypaimon/read/reader/format_blob_reader.py
@@ -26,21 +26,23 @@
from pypaimon.common.delta_varint_compressor import DeltaVarintCompressor
from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.blob import Blob, BlobDescriptor, BlobRef
from pypaimon.schema.data_types import DataField, PyarrowFieldParser, AtomicType
from pypaimon.table.row.blob import Blob
from pypaimon.table.row.generic_row import GenericRow
from pypaimon.table.row.row_kind import RowKind


class FormatBlobReader(RecordBatchReader):

def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str],
full_fields: List[DataField], push_down_predicate: Any):
full_fields: List[DataField], push_down_predicate: Any, blob_as_descriptor: bool):
self._file_io = file_io
self._file_path = file_path
self._push_down_predicate = push_down_predicate
self._blob_as_descriptor = blob_as_descriptor

# Get file size
self._file_size = file_io.get_file_size(file_path)
self._file_size = file_io.get_file_size(Path(file_path))

# Initialize the low-level blob format reader
self.file_path = file_path
@@ -66,7 +68,10 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
if self.returned:
return None
self.returned = True
batch_iterator = BlobRecordIterator(self.file_path, self.blob_lengths, self.blob_offsets, self._fields[0])
batch_iterator = BlobRecordIterator(
self._file_io, self.file_path, self.blob_lengths,
self.blob_offsets, self._fields[0]
)
self._blob_iterator = iter(batch_iterator)

# Collect records for this batch
@@ -75,22 +80,16 @@

try:
while True:
# Get next blob record
blob_row = next(self._blob_iterator)
# Check if first read returns None, stop immediately
if blob_row is None:
break

# Extract blob data from the row
blob = blob_row.values[0] # Blob files have single blob field

# Convert blob to appropriate format for each requested field
blob = blob_row.values[0]
for field_name in self._fields:
# For blob files, all fields should contain blob data
if isinstance(blob, Blob):
blob_data = blob.to_data()
blob_descriptor = blob.to_descriptor()
if self._blob_as_descriptor:
blob_data = blob_descriptor.serialize()
else:
blob_data = bytes(blob) if blob is not None else None
blob_data = blob.to_data()
pydict_data[field_name].append(blob_data)

records_in_batch += 1
@@ -162,7 +161,9 @@ class BlobRecordIterator:
MAGIC_NUMBER_SIZE = 4
METADATA_OVERHEAD = 16

def __init__(self, file_path: str, blob_lengths: List[int], blob_offsets: List[int], field_name: str):
def __init__(self, file_io: FileIO, file_path: str, blob_lengths: List[int],
blob_offsets: List[int], field_name: str):
self.file_io = file_io
self.file_path = file_path
self.field_name = field_name
self.blob_lengths = blob_lengths
@@ -175,25 +176,14 @@ def __iter__(self) -> Iterator[GenericRow]:
def __next__(self) -> GenericRow:
if self.current_position >= len(self.blob_lengths):
raise StopIteration

# Create blob reference for the current blob
# Skip magic number (4 bytes) and exclude length (8 bytes) + CRC (4 bytes) = 12 bytes
blob_offset = self.blob_offsets[self.current_position] + self.MAGIC_NUMBER_SIZE # Skip magic number
blob_length = self.blob_lengths[self.current_position] - self.METADATA_OVERHEAD

# Create BlobDescriptor for this blob
descriptor = BlobDescriptor(self.file_path, blob_offset, blob_length)
blob = BlobRef(descriptor)

blob = Blob.from_file(self.file_io, self.file_path, blob_offset, blob_length)
self.current_position += 1

# Return as GenericRow with single blob field
from pypaimon.schema.data_types import DataField, AtomicType
from pypaimon.table.row.row_kind import RowKind

fields = [DataField(0, self.field_name, AtomicType("BLOB"))]
return GenericRow([blob], fields, RowKind.INSERT)

def returned_position(self) -> int:
"""Get current position in the iterator."""
return self.current_position
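With blob_as_descriptor enabled, FormatBlobReader returns serialized BlobDescriptors instead of blob bytes, so callers can defer fetching the payload. A rough consumption sketch, assuming the returned batch keeps the requested field as its only column and that a blob file exists at the hypothetical path below:

from pypaimon.common.file_io import FileIO
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.schema.data_types import AtomicType, DataField
from pypaimon.table.row.blob import BlobDescriptor

file_io = FileIO("file:///tmp/warehouse", {})
fields = [DataField(0, "blob_col", AtomicType("BLOB"))]

reader = FormatBlobReader(file_io, "/tmp/warehouse/bucket-0/data.blob",
                          ["blob_col"], fields, None, blob_as_descriptor=True)
batch = reader.read_arrow_batch()  # single batch; the next call returns None
for cell in batch.column(0):
    descriptor = BlobDescriptor.deserialize(cell.as_py())
    print(descriptor.uri)

# With blob_as_descriptor=False the same loop yields the raw blob bytes
# (blob.to_data()) for each record.

split_read.py (below) wires the flag from the "blob-as-descriptor" table option, defaulting to False.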
3 changes: 2 additions & 1 deletion paimon-python/pypaimon/read/split_read.py
@@ -82,8 +82,9 @@ def file_reader_supplier(self, file_path: str, for_merge_read: bool):
format_reader = FormatAvroReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
self.read_fields, self.push_down_predicate)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
blob_as_descriptor = self.table.options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR, False)
format_reader = FormatBlobReader(self.table.file_io, file_path, self._get_final_read_data_fields(),
self.read_fields, self.push_down_predicate)
self.read_fields, self.push_down_predicate, blob_as_descriptor)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
format_reader = FormatPyArrowReader(self.table.file_io, file_format, file_path,
self._get_final_read_data_fields(), self.push_down_predicate)