forked from apache/iceberg-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserializers.py
136 lines (102 loc) · 4.85 KB
/
serializers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import codecs
import gzip
from abc import ABC, abstractmethod
from typing import Callable
from pyiceberg.io import InputFile, InputStream, OutputFile
from pyiceberg.table.metadata import TableMetadata, TableMetadataUtil
from pyiceberg.typedef import UTF8
from pyiceberg.utils.config import Config
# Identifier string for the gzip codec. It is not referenced by the code visible in this
# module; presumably it is imported by callers elsewhere -- TODO confirm before removing.
GZIP = "gzip"
class Compressor(ABC):
    """Interface for compressing and decompressing serialized table metadata."""

    @staticmethod
    def get_compressor(location: str) -> Compressor:
        """Pick the compressor implied by the name of a metadata file.

        Args:
            location: Location of the metadata file; only its suffix is inspected.

        Returns:
            A new GzipCompressor when the location ends with a gzip metadata suffix,
            otherwise the shared NOOP_COMPRESSOR instance.
        """
        # `.gz.metadata.json` is the current naming scheme. `.metadata.json.gz` is the
        # legacy scheme, which the Java implementation still accepts for backward
        # compatibility, so both are treated as gzip here.
        gzip_suffixes = (".gz.metadata.json", ".metadata.json.gz")
        return GzipCompressor() if location.endswith(gzip_suffixes) else NOOP_COMPRESSOR

    @abstractmethod
    def stream_decompressor(self, inp: InputStream) -> InputStream:
        """Return a stream decompressor.

        Args:
            inp: The input stream that needs decompressing.

        Returns:
            The wrapped stream
        """

    @abstractmethod
    def bytes_compressor(self) -> Callable[[bytes], bytes]:
        """Return a function to compress bytes.

        Returns:
            A function that can be used to compress bytes.
        """


class NoopCompressor(Compressor):
    """Compressor that passes data through unchanged."""

    def stream_decompressor(self, inp: InputStream) -> InputStream:
        """Return `inp` itself, since there is nothing to decompress."""
        return inp

    def bytes_compressor(self) -> Callable[[bytes], bytes]:
        """Return an identity function over bytes."""
        return lambda b: b


# Shared instance used whenever no compression applies.
NOOP_COMPRESSOR = NoopCompressor()


class GzipCompressor(Compressor):
    """Compressor backed by the standard-library `gzip` module."""

    def stream_decompressor(self, inp: InputStream) -> InputStream:
        """Wrap `inp` in a gzip reader that yields the decompressed bytes."""
        return gzip.open(inp)

    def bytes_compressor(self) -> Callable[[bytes], bytes]:
        """Return `gzip.compress`, which gzips a whole bytes payload in one call."""
        return gzip.compress
class FromByteStream:
    """Deserializers that turn raw byte streams into Iceberg objects."""

    @staticmethod
    def table_metadata(
        byte_stream: InputStream, encoding: str = UTF8, compression: Compressor = NOOP_COMPRESSOR
    ) -> TableMetadata:
        """Build a TableMetadata object from the JSON carried by a byte stream.

        Args:
            byte_stream: A file-like byte stream object.
            encoding (default "utf-8"): Text encoding used to decode the stream.
            compression: Compressor whose stream decompressor wraps `byte_stream`
                before decoding; the default leaves the stream untouched.

        Returns:
            The table metadata parsed from the decoded text.
        """
        with compression.stream_decompressor(byte_stream) as decompressed:
            # Wrap the (possibly decompressed) stream in an incremental decoder and
            # read it to the end as one text document.
            text_reader = codecs.getreader(encoding)(decompressed)
            return TableMetadataUtil.parse_raw(text_reader.read())
class FromInputFile:
    """Deserializers that turn InputFiles into Iceberg objects."""

    @staticmethod
    def table_metadata(input_file: InputFile, encoding: str = UTF8) -> TableMetadata:
        """Read table metadata from an input file.

        The compressor is chosen from the file's location, so callers do not need
        to say whether the file is compressed.

        Args:
            input_file (InputFile): A custom implementation of the iceberg.io.file.InputFile abstract base class.
            encoding (str): Encoding to use when loading bytestream.

        Returns:
            TableMetadata: A table metadata instance.
        """
        with input_file.open() as input_stream:
            compressor = Compressor.get_compressor(location=input_file.location)
            return FromByteStream.table_metadata(byte_stream=input_stream, encoding=encoding, compression=compressor)
class ToOutputFile:
    """A collection of methods that serialize Iceberg objects into files given an OutputFile instance."""

    @staticmethod
    def table_metadata(metadata: TableMetadata, output_file: OutputFile, overwrite: bool = False) -> None:
        """Write a TableMetadata instance to an output file.

        The metadata is dumped to JSON, encoded as UTF-8, and passed through the
        compressor selected from `output_file.location` before being written.

        Args:
            metadata (TableMetadata): The table metadata to serialize.
            output_file (OutputFile): A custom implementation of the iceberg.io.file.OutputFile abstract base class.
            overwrite (bool): Whether to overwrite the file if it already exists. Defaults to `False`.
        """
        # We need to serialize None values, in order to dump `None` current-snapshot-id as `-1`
        exclude_none = not Config().get_bool("legacy-current-snapshot-id")
        json_bytes = metadata.model_dump_json(exclude_none=exclude_none).encode(UTF8)
        json_bytes = Compressor.get_compressor(output_file.location).bytes_compressor()(json_bytes)

        # Build the full payload before creating the file: a failure while serializing
        # or compressing must not leave an empty file behind or, with overwrite=True,
        # clobber an existing metadata file.
        with output_file.create(overwrite=overwrite) as output_stream:
            output_stream.write(json_bytes)