Skip to content

Commit 7e66ccb

Browse files
authored
feat: Add tell() to OutputStream writers (#2998)
# Rationale for this change Currently, PyIceberg writes one manifest per snapshot operation regardless of manifest size. In order to eventually support this we need to be able to track written bytes without closing the file, so that we can roll to a new file once we hit target size. We had some of this work done in #650, but we can keep this simple and add writers as a follow up. The nice thing is that the underlying streams we support already have a tell() method and we just need to expose it. With this change in the follow up we can do: ``` with write_manifest(...) as writer: writer.add_entry(entry) if writer.tell() >= target_file_size: # roll to new file ``` ## Are these changes tested? Yes, added a test :) ## Are there any user-facing changes? No
1 parent c0e7c6d commit 7e66ccb

File tree

4 files changed

+44
-0
lines changed

4 files changed

+44
-0
lines changed

pyiceberg/avro/file.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,3 +317,6 @@ def write_block(self, objects: list[D]) -> None:
317317
self.encoder.write(block_content)
318318

319319
self.encoder.write(self.sync_bytes)
320+
321+
def tell(self) -> int:
322+
return self.output_stream.tell()

pyiceberg/io/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@ class OutputStream(Protocol): # pragma: no cover
140140
@abstractmethod
141141
def write(self, b: bytes) -> int: ...
142142

143+
@abstractmethod
144+
def tell(self) -> int: ...
145+
143146
@abstractmethod
144147
def close(self) -> None: ...
145148

pyiceberg/manifest.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,6 +1059,9 @@ def __exit__(
10591059
self.closed = True
10601060
self._writer.__exit__(exc_type, exc_value, traceback)
10611061

1062+
def tell(self) -> int:
1063+
return self._writer.tell()
1064+
10621065
@abstractmethod
10631066
def content(self) -> ManifestContent: ...
10641067

tests/utils/test_manifest.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -897,3 +897,38 @@ def test_manifest_cache_efficiency_with_many_overlapping_lists() -> None:
897897
if len(references) > 1:
898898
for ref in references[1:]:
899899
assert ref is references[0], f"All references to manifest {i} should be the same object instance"
900+
901+
902+
@pytest.mark.parametrize("format_version", [1, 2])
903+
def test_manifest_writer_tell(format_version: TableVersion) -> None:
904+
io = load_file_io()
905+
test_schema = Schema(NestedField(1, "foo", IntegerType(), False))
906+
907+
with TemporaryDirectory() as tmpdir:
908+
output_file = io.new_output(f"{tmpdir}/test-manifest.avro")
909+
with write_manifest(
910+
format_version=format_version,
911+
spec=UNPARTITIONED_PARTITION_SPEC,
912+
schema=test_schema,
913+
output_file=output_file,
914+
snapshot_id=1,
915+
avro_compression="null",
916+
) as writer:
917+
initial_bytes = writer.tell()
918+
data_file = DataFile.from_args(
919+
content=DataFileContent.DATA,
920+
file_path=f"{tmpdir}/data.parquet",
921+
file_format=FileFormat.PARQUET,
922+
partition=Record(),
923+
record_count=100,
924+
file_size_in_bytes=1000,
925+
)
926+
entry = ManifestEntry.from_args(
927+
status=ManifestEntryStatus.ADDED,
928+
snapshot_id=1,
929+
data_file=data_file,
930+
)
931+
writer.add_entry(entry)
932+
after_entry_bytes = writer.tell()
933+
934+
assert after_entry_bytes > initial_bytes, "Bytes should increase after adding entry"

0 commit comments

Comments
 (0)