Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 32 additions & 16 deletions paimon-python/pypaimon/filesystem/pyarrow_file_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
################################################################################
import logging
import os
import re
import subprocess
import uuid
from datetime import datetime, timezone
Expand All @@ -40,11 +41,15 @@
from pypaimon.write.blob_format_writer import BlobFormatWriter


def _pyarrow_lt_7():
    """Return True when the installed pyarrow version predates the 7.0.0 release."""
    installed = parse(pyarrow.__version__)
    return installed < parse("7.0.0")


class PyArrowFileIO(FileIO):
def __init__(self, path: str, catalog_options: Options):
self.properties = catalog_options
self.logger = logging.getLogger(__name__)
self._pyarrow_gte_7 = parse(pyarrow.__version__) >= parse("7.0.0")
self._pyarrow_gte_7 = not _pyarrow_lt_7()
self._pyarrow_gte_8 = parse(pyarrow.__version__) >= parse("8.0.0")
scheme, netloc, _ = self.parse_location(path)
self.uri_reader_factory = UriReaderFactory(catalog_options)
Expand Down Expand Up @@ -194,10 +199,21 @@ def new_output_stream(self, path: str):

return self.filesystem.open_output_stream(path_str)

def _get_file_info(self, path_str: str):
    """Return the pyarrow ``FileInfo`` for a single filesystem path.

    pyarrow >= 7 reports a missing path as ``FileType.NotFound``, while
    older releases raise ``OSError`` instead.  This helper translates the
    known "not found" error shapes into a ``NotFound`` ``FileInfo`` so
    every caller can use a single code path; any other ``OSError`` is
    re-raised unchanged.
    """
    try:
        return self.filesystem.get_file_info([path_str])[0]
    except OSError as e:
        # Compatibility shim for pyarrow < 7, which raises instead of
        # returning a NotFound FileInfo.
        msg = str(e).lower()
        # "not exist" also covers "does not exist" messages.
        # NOTE(review): '133' appears as a bare error code in some
        # object-store "no such key" messages — confirm against the
        # backends this must support.
        if ("not exist" in msg or "nosuchkey" in msg or "notfound" in msg
                or re.search(r'\b133\b', msg)):
            return pafs.FileInfo(path_str, pafs.FileType.NotFound)
        raise

def get_file_status(self, path: str):
path_str = self.to_filesystem_path(path)
file_infos = self.filesystem.get_file_info([path_str])
file_info = file_infos[0]
file_info = self._get_file_info(path_str)

if file_info.type == pafs.FileType.NotFound:
raise FileNotFoundError(f"File {path} (resolved as {path_str}) does not exist")
Expand All @@ -215,12 +231,11 @@ def list_directories(self, path: str):

def exists(self, path: str) -> bool:
    """Return True if *path* resolves to an existing file or directory.

    The diff residue left an unreachable duplicate ``return`` using the
    pre-refactor direct ``get_file_info`` call; only the helper-based
    version is kept.
    """
    path_str = self.to_filesystem_path(path)
    # _get_file_info normalizes pyarrow<7's OSError for missing paths
    # into a FileInfo with FileType.NotFound.
    return self._get_file_info(path_str).type != pafs.FileType.NotFound

def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
file_info = self._get_file_info(path_str)

if file_info.type == pafs.FileType.NotFound:
return False
Expand All @@ -242,8 +257,11 @@ def delete(self, path: str, recursive: bool = False) -> bool:

def mkdirs(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
file_info = self._get_file_info(path_str)

if file_info.type == pafs.FileType.NotFound:
self.filesystem.create_dir(path_str, recursive=True)
return True
if file_info.type == pafs.FileType.Directory:
return True
elif file_info.type == pafs.FileType.File:
Expand All @@ -264,15 +282,15 @@ def rename(self, src: str, dst: str) -> bool:
if hasattr(self.filesystem, 'rename'):
return self.filesystem.rename(src_str, dst_str)

dst_file_info = self.filesystem.get_file_info([dst_str])[0]
dst_file_info = self._get_file_info(dst_str)
if dst_file_info.type != pafs.FileType.NotFound:
if dst_file_info.type == pafs.FileType.File:
return False
# Make it compatible with HadoopFileIO: if dst is an existing directory,
# dst=dst/srcFileName
src_name = Path(src_str).name
dst_str = str(Path(dst_str) / src_name)
final_dst_info = self.filesystem.get_file_info([dst_str])[0]
final_dst_info = self._get_file_info(dst_str)
if final_dst_info.type != pafs.FileType.NotFound:
return False

Expand Down Expand Up @@ -310,7 +328,7 @@ def delete_directory_quietly(self, directory: str):
def try_to_write_atomic(self, path: str, content: str) -> bool:
if self.exists(path):
path_str = self.to_filesystem_path(path)
file_info = self.filesystem.get_file_info([path_str])[0]
file_info = self._get_file_info(path_str)
if file_info.type == pafs.FileType.Directory:
return False

Expand Down Expand Up @@ -508,13 +526,11 @@ def to_filesystem_path(self, path: str) -> str:
if parsed.scheme:
if parsed.netloc:
path_part = normalized_path.lstrip('/')
# OSS+PyArrow<7: endpoint_override has bucket, pass key only.
if self._is_oss and not self._pyarrow_gte_7:
# For PyArrow 6.x + OSS, endpoint_override already contains bucket,
result = path_part if path_part else '.'
return result
else:
result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
return result
return path_part if path_part else '.'
result = f"{parsed.netloc}/{path_part}" if path_part else parsed.netloc
return result
else:
result = normalized_path.lstrip('/')
return result if result else '.'
Expand Down
66 changes: 56 additions & 10 deletions paimon-python/pypaimon/tests/file_io_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@
from pathlib import Path
from unittest.mock import MagicMock, patch

import pyarrow
import pyarrow.fs as pafs

from pypaimon.common.options import Options
from pypaimon.common.options.config import OssOptions
from pypaimon.filesystem.local_file_io import LocalFileIO
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO
from pypaimon.filesystem.pyarrow_file_io import PyArrowFileIO, _pyarrow_lt_7


class FileIOTest(unittest.TestCase):
"""Test cases for FileIO.to_filesystem_path method."""

def test_s3_filesystem_path_conversion(self):
def test_filesystem_path_conversion(self):
"""Test S3FileSystem path conversion with various formats."""
file_io = PyArrowFileIO("s3://bucket/warehouse", Options({}))
self.assertIsInstance(file_io.filesystem, pafs.S3FileSystem)
Expand Down Expand Up @@ -66,18 +65,31 @@ def test_s3_filesystem_path_conversion(self):
parent_str = str(Path(converted_path).parent)
self.assertEqual(file_io.to_filesystem_path(parent_str), parent_str)

from packaging.version import parse as parse_version
lt7 = _pyarrow_lt_7()
oss_io = PyArrowFileIO("oss://test-bucket/warehouse", Options({
OssOptions.OSS_ENDPOINT.key(): 'oss-cn-hangzhou.aliyuncs.com'
}))
lt7 = parse_version(pyarrow.__version__) < parse_version("7.0.0")
got = oss_io.to_filesystem_path("oss://test-bucket/path/to/file.txt")
expected_path = (
"path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt")
self.assertEqual(got, expected_path)
self.assertEqual(got, "path/to/file.txt" if lt7 else "test-bucket/path/to/file.txt")
if lt7:
self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx/data.parquet"),
"db-xxx.db/tbl-xxx/data.parquet")
self.assertEqual(oss_io.to_filesystem_path("db-xxx.db/tbl-xxx"), "db-xxx.db/tbl-xxx")
manifest_uri = "oss://test-bucket/warehouse/db.db/table/manifest/manifest-list-abc-0"
manifest_key = oss_io.to_filesystem_path(manifest_uri)
self.assertEqual(manifest_key, "warehouse/db.db/table/manifest/manifest-list-abc-0",
"OSS+PyArrow6 must pass key only to PyArrow so manifest is written to correct bucket")
self.assertFalse(manifest_key.startswith("test-bucket/"),
"path must not start with bucket name or PyArrow 6 writes to wrong bucket")
nf = MagicMock(type=pafs.FileType.NotFound)
get_file_info_calls = []

def record_get_file_info(paths):
get_file_info_calls.append(list(paths))
return [MagicMock(type=pafs.FileType.NotFound) for _ in paths]

mock_fs = MagicMock()
mock_fs.get_file_info.side_effect = [[nf], [nf]]
mock_fs.get_file_info.side_effect = record_get_file_info if lt7 else [[nf], [nf]]
mock_fs.create_dir = MagicMock()
mock_fs.open_output_stream.return_value = MagicMock()
oss_io.filesystem = mock_fs
Expand All @@ -87,8 +99,42 @@ def test_s3_filesystem_path_conversion(self):
if lt7:
expected_parent = '/'.join(path_str.split('/')[:-1]) if '/' in path_str else ''
else:
expected_parent = str(Path(path_str).parent)
expected_parent = "/".join(path_str.split("/")[:-1]) if "/" in path_str else str(Path(path_str).parent)
self.assertEqual(mock_fs.create_dir.call_args[0][0], expected_parent)
if lt7:
for call_paths in get_file_info_calls:
for p in call_paths:
self.assertFalse(
p.startswith("test-bucket/"),
"OSS+PyArrow<7 must pass key only to get_file_info, not bucket/key. Got: %r" % (p,)
)

def test_exists(self):
    """Verify exists()/get_file_status() behavior for a missing local path.

    Exercises the pyarrow version split: pyarrow < 7 may raise OSError for
    a missing path, while pyarrow >= 7 returns a FileInfo with
    FileType.NotFound.  Either way, FileIO must report the path as absent.
    """
    lt7 = _pyarrow_lt_7()
    with tempfile.TemporaryDirectory(prefix="file_io_nonexistent_") as tmpdir:
        file_io = LocalFileIO("file://" + tmpdir, Options({}))
        # A path inside the temp dir that is never created.
        missing_uri = "file://" + os.path.join(tmpdir, "nonexistent_xyz")
        path_str = file_io.to_filesystem_path(missing_uri)
        raised = None
        infos = None
        # Probe the raw filesystem to observe which of the two behaviors
        # (OSError vs. NotFound FileInfo) this pyarrow build exhibits.
        try:
            infos = file_io.filesystem.get_file_info([path_str])
        except OSError as e:
            raised = e
        if lt7:
            # Old pyarrow: either an OSError with a recognizable
            # "not found" message, or (for local FS) a NotFound info.
            if raised is not None:
                err = str(raised).lower()
                self.assertTrue("133" in err or "does not exist" in err or "not exist" in err, str(raised))
            else:
                self.assertEqual(len(infos), 1)
                self.assertEqual(infos[0].type, pafs.FileType.NotFound)
        else:
            # pyarrow >= 7 never raises for a missing path.
            self.assertIsNone(raised)
            self.assertEqual(len(infos), 1)
            self.assertEqual(infos[0].type, pafs.FileType.NotFound)
        # Regardless of pyarrow version, the FileIO facade must report
        # the path as absent and raise on a status lookup.
        self.assertFalse(file_io.exists(missing_uri))
        with self.assertRaises(FileNotFoundError):
            file_io.get_file_status(missing_uri)

def test_local_filesystem_path_conversion(self):
file_io = LocalFileIO("file:///tmp/warehouse", Options({}))
Expand Down