31
31
Type ,
32
32
)
33
33
34
+ from pydantic_core import to_json
35
+
34
36
from pyiceberg .avro .file import AvroFile , AvroOutputFile
35
37
from pyiceberg .conversions import to_bytes
36
38
from pyiceberg .exceptions import ValidationError
37
39
from pyiceberg .io import FileIO , InputFile , OutputFile
38
40
from pyiceberg .partitioning import PartitionSpec
39
41
from pyiceberg .schema import Schema
40
- from pyiceberg .typedef import EMPTY_DICT , Record , TableVersion
42
+ from pyiceberg .typedef import Record , TableVersion
41
43
from pyiceberg .types import (
42
44
BinaryType ,
43
45
BooleanType ,
@@ -645,7 +647,6 @@ class ManifestWriter(ABC):
645
647
_output_file : OutputFile
646
648
_writer : AvroOutputFile [ManifestEntry ]
647
649
_snapshot_id : int
648
- _meta : Dict [str , str ]
649
650
_added_files : int
650
651
_added_rows : int
651
652
_existing_files : int
@@ -655,15 +656,12 @@ class ManifestWriter(ABC):
655
656
_min_data_sequence_number : Optional [int ]
656
657
_partitions : List [Record ]
657
658
658
- def __init__ (
659
- self , spec : PartitionSpec , schema : Schema , output_file : OutputFile , snapshot_id : int , meta : Dict [str , str ] = EMPTY_DICT
660
- ) -> None :
659
+ def __init__ (self , spec : PartitionSpec , schema : Schema , output_file : OutputFile , snapshot_id : int ) -> None :
661
660
self .closed = False
662
661
self ._spec = spec
663
662
self ._schema = schema
664
663
self ._output_file = output_file
665
664
self ._snapshot_id = snapshot_id
666
- self ._meta = meta
667
665
668
666
self ._added_files = 0
669
667
self ._added_rows = 0
@@ -697,6 +695,15 @@ def content(self) -> ManifestContent: ...
697
695
@abstractmethod
698
696
def version (self ) -> TableVersion : ...
699
697
698
@property
def _meta(self) -> Dict[str, str]:
    """Return the string key/value metadata derived from this writer's state.

    The mapping is rebuilt on every access from the writer's schema,
    partition spec and format version, and holds these keys:

    * ``schema`` - the schema serialized with ``model_dump_json()``.
    * ``partition-spec`` - only the spec's ``fields``, JSON-encoded and
      decoded to text as UTF-8.
    * ``partition-spec-id`` - the spec id rendered as a string.
    * ``format-version`` - the writer's version rendered as a string.
    """
    partition_spec = self._spec

    schema_json = self._schema.model_dump_json()
    # `to_json` output is decoded here so that every value in the mapping is a str.
    fields_json = to_json(partition_spec.fields).decode("utf-8")
    spec_id_text = str(partition_spec.spec_id)
    # NOTE(review): this relies on `self.version` evaluating to the version value
    # (i.e. being exposed as a property); its decorators are not fully visible
    # from here - confirm.
    version_text = str(self.version)

    metadata: Dict[str, str] = {}
    metadata["schema"] = schema_json
    metadata["partition-spec"] = fields_json
    metadata["partition-spec-id"] = spec_id_text
    metadata["format-version"] = version_text
    return metadata
706
+
700
707
def _with_partition (self , format_version : TableVersion ) -> Schema :
701
708
data_file_type = data_file_with_partition (
702
709
format_version = format_version , partition_type = self ._spec .partition_type (self ._schema )
@@ -771,12 +778,6 @@ def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile,
771
778
schema ,
772
779
output_file ,
773
780
snapshot_id ,
774
- {
775
- "schema" : schema .model_dump_json (),
776
- "partition-spec" : spec .model_dump_json (),
777
- "partition-spec-id" : str (spec .spec_id ),
778
- "format-version" : "1" ,
779
- },
780
781
)
781
782
782
783
def content (self ) -> ManifestContent :
@@ -792,19 +793,7 @@ def prepare_entry(self, entry: ManifestEntry) -> ManifestEntry:
792
793
793
794
class ManifestWriterV2 (ManifestWriter ):
794
795
def __init__(self, spec: PartitionSpec, schema: Schema, output_file: OutputFile, snapshot_id: int):
    """Initialize the writer by delegating every argument to the parent initializer.

    Args:
        spec: Partition spec handed to the parent writer.
        schema: Schema handed to the parent writer.
        output_file: Output file handed to the parent writer.
        snapshot_id: Snapshot id handed to the parent writer.
    """
    # No metadata dict is passed here; metadata is exposed through the `_meta` property.
    super().__init__(
        spec=spec,
        schema=schema,
        output_file=output_file,
        snapshot_id=snapshot_id,
    )
808
797
809
798
def content(self) -> ManifestContent:
    """Return the content kind of this writer, which is always ``ManifestContent.DATA``."""
    content_kind = ManifestContent.DATA
    return content_kind
@@ -813,6 +802,13 @@ def content(self) -> ManifestContent:
813
802
def version (self ) -> TableVersion :
814
803
return 2
815
804
805
@property
def _meta(self) -> Dict[str, str]:
    """Return the parent's metadata mapping with a ``content`` entry set to ``"data"``.

    A fresh dict is built on each access, so the mapping obtained from the
    parent class is never modified in place.
    """
    metadata = dict(super()._meta)
    metadata["content"] = "data"
    return metadata
811
+
816
812
def prepare_entry (self , entry : ManifestEntry ) -> ManifestEntry :
817
813
if entry .data_sequence_number is None :
818
814
if entry .snapshot_id is not None and entry .snapshot_id != self ._snapshot_id :
0 commit comments