Skip to content

Commit 76e87a8

Browse files
authored
feat(puffin): Add PuffinWriter (#959)
Part of #744.

# Summary
- Add PuffinWriter

# Context
- This is the fifth in a series of PRs adding support for the Iceberg Puffin file format.
- It may help to refer to the overarching #714, from which these changes were split, to better understand how they fit into the larger picture.
- It may also help to refer to the Java reference implementation of PuffinWriter [here](https://github.com/apache/iceberg/blob/1d9fefeb9680d782dc128f242604903e71c32f97/core/src/main/java/org/apache/iceberg/puffin/PuffinWriter.java#L43).
1 parent 5cdd6eb commit 76e87a8

File tree

2 files changed

+339
-0
lines changed

2 files changed

+339
-0
lines changed

crates/iceberg/src/puffin/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod compression;
2626
mod metadata;
2727
#[cfg(feature = "tokio")]
2828
mod reader;
29+
mod writer;
2930

3031
#[cfg(test)]
3132
mod test_utils;

crates/iceberg/src/puffin/writer.rs

Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use std::collections::{HashMap, HashSet};
19+
20+
use bytes::Bytes;
21+
22+
use crate::io::{FileWrite, OutputFile};
23+
use crate::puffin::blob::Blob;
24+
use crate::puffin::compression::CompressionCodec;
25+
use crate::puffin::metadata::{BlobMetadata, FileMetadata, Flag};
26+
use crate::Result;
27+
28+
/// Puffin writer
29+
pub(crate) struct PuffinWriter {
30+
writer: Box<dyn FileWrite>,
31+
is_header_written: bool,
32+
num_bytes_written: u64,
33+
written_blobs_metadata: Vec<BlobMetadata>,
34+
properties: HashMap<String, String>,
35+
footer_compression_codec: CompressionCodec,
36+
flags: HashSet<Flag>,
37+
}
38+
39+
impl PuffinWriter {
40+
/// Returns a new Puffin writer
41+
pub(crate) async fn new(
42+
output_file: &OutputFile,
43+
properties: HashMap<String, String>,
44+
compress_footer: bool,
45+
) -> Result<Self> {
46+
let mut flags = HashSet::<Flag>::new();
47+
let footer_compression_codec = if compress_footer {
48+
flags.insert(Flag::FooterPayloadCompressed);
49+
CompressionCodec::Lz4
50+
} else {
51+
CompressionCodec::None
52+
};
53+
54+
Ok(Self {
55+
writer: output_file.writer().await?,
56+
is_header_written: false,
57+
num_bytes_written: 0,
58+
written_blobs_metadata: Vec::new(),
59+
properties,
60+
footer_compression_codec,
61+
flags,
62+
})
63+
}
64+
65+
/// Adds blob to Puffin file
66+
pub(crate) async fn add(
67+
&mut self,
68+
blob: Blob,
69+
compression_codec: CompressionCodec,
70+
) -> Result<()> {
71+
self.write_header_once().await?;
72+
73+
let offset = self.num_bytes_written;
74+
let compressed_bytes: Bytes = compression_codec.compress(blob.data)?.into();
75+
let length = compressed_bytes.len().try_into()?;
76+
self.write(compressed_bytes).await?;
77+
self.written_blobs_metadata.push(BlobMetadata {
78+
r#type: blob.r#type,
79+
fields: blob.fields,
80+
snapshot_id: blob.snapshot_id,
81+
sequence_number: blob.sequence_number,
82+
offset,
83+
length,
84+
compression_codec,
85+
properties: blob.properties,
86+
});
87+
88+
Ok(())
89+
}
90+
91+
/// Finalizes the Puffin file
92+
pub(crate) async fn close(mut self) -> Result<()> {
93+
self.write_header_once().await?;
94+
self.write_footer().await?;
95+
self.writer.close().await?;
96+
Ok(())
97+
}
98+
99+
async fn write(&mut self, bytes: Bytes) -> Result<()> {
100+
let length = bytes.len();
101+
self.writer.write(bytes).await?;
102+
self.num_bytes_written += length as u64;
103+
Ok(())
104+
}
105+
106+
async fn write_header_once(&mut self) -> Result<()> {
107+
if !self.is_header_written {
108+
let bytes = Bytes::copy_from_slice(&FileMetadata::MAGIC);
109+
self.write(bytes).await?;
110+
self.is_header_written = true;
111+
}
112+
Ok(())
113+
}
114+
115+
fn footer_payload_bytes(&self) -> Result<Vec<u8>> {
116+
let file_metadata = FileMetadata {
117+
blobs: self.written_blobs_metadata.clone(),
118+
properties: self.properties.clone(),
119+
};
120+
let json = serde_json::to_string::<FileMetadata>(&file_metadata)?;
121+
let bytes = json.as_bytes();
122+
self.footer_compression_codec.compress(bytes.to_vec())
123+
}
124+
125+
fn flags_bytes(&self) -> [u8; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize] {
126+
let mut result = [0; FileMetadata::FOOTER_STRUCT_FLAGS_LENGTH as usize];
127+
for flag in &self.flags {
128+
let byte_idx: usize = flag.byte_idx().into();
129+
result[byte_idx] |= 0x1 << flag.bit_idx();
130+
}
131+
result
132+
}
133+
134+
async fn write_footer(&mut self) -> Result<()> {
135+
let mut footer_payload_bytes = self.footer_payload_bytes()?;
136+
let footer_payload_bytes_length = u32::to_le_bytes(footer_payload_bytes.len().try_into()?);
137+
138+
let mut footer_bytes = Vec::new();
139+
footer_bytes.extend(&FileMetadata::MAGIC);
140+
footer_bytes.append(&mut footer_payload_bytes);
141+
footer_bytes.extend(footer_payload_bytes_length);
142+
footer_bytes.extend(self.flags_bytes());
143+
footer_bytes.extend(&FileMetadata::MAGIC);
144+
145+
self.write(footer_bytes.into()).await
146+
}
147+
}
148+
149+
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::TempDir;

    use crate::io::{FileIOBuilder, InputFile, OutputFile};
    use crate::puffin::blob::Blob;
    use crate::puffin::compression::CompressionCodec;
    use crate::puffin::metadata::FileMetadata;
    use crate::puffin::reader::PuffinReader;
    use crate::puffin::test_utils::{
        blob_0, blob_1, empty_footer_payload, empty_footer_payload_bytes, file_properties,
        java_empty_uncompressed_input_file, java_uncompressed_metric_input_file,
        java_zstd_compressed_metric_input_file, uncompressed_metric_file_metadata,
        zstd_compressed_metric_file_metadata,
    };
    use crate::puffin::writer::PuffinWriter;
    use crate::Result;

    /// Writes a Puffin file with an uncompressed footer into `temp_dir`, containing the
    /// given blobs (each with its own codec) and file properties, and returns the
    /// resulting output file.
    async fn write_puffin_file(
        temp_dir: &TempDir,
        blobs: Vec<(Blob, CompressionCodec)>,
        properties: HashMap<String, String>,
    ) -> Result<OutputFile> {
        let file_io = FileIOBuilder::new_fs_io().build()?;

        let puffin_path = temp_dir.path().join("temp_puffin.bin");
        let output_file = file_io.new_output(puffin_path.to_str().unwrap())?;

        let mut puffin_writer = PuffinWriter::new(&output_file, properties, false).await?;
        for (blob, codec) in blobs {
            puffin_writer.add(blob, codec).await?;
        }
        puffin_writer.close().await?;

        Ok(output_file)
    }

    /// Reads back every blob listed in the footer of `input_file`, in footer order.
    async fn read_all_blobs_from_puffin_file(input_file: InputFile) -> Vec<Blob> {
        let puffin_reader = PuffinReader::new(input_file);
        let all_metadata = puffin_reader.file_metadata().await.unwrap().clone().blobs;

        let mut read_blobs = Vec::new();
        for metadata in all_metadata {
            let blob = puffin_reader.blob(&metadata).await.unwrap();
            read_blobs.push(blob);
        }
        read_blobs
    }

    #[tokio::test]
    async fn test_write_uncompressed_empty_file() {
        let temp_dir = TempDir::new().unwrap();

        let output_file = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let actual_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(actual_metadata, empty_footer_payload());

        // Header magic, then (no blobs, since the file is empty) footer magic, the footer
        // payload, and the fixed-size footer struct.
        let header_length = FileMetadata::MAGIC_LENGTH as usize;
        let footer_length = FileMetadata::MAGIC_LENGTH as usize
            + empty_footer_payload_bytes().len()
            + FileMetadata::FOOTER_STRUCT_LENGTH as usize;
        let actual_length = input_file.read().await.unwrap().len();
        assert_eq!(actual_length, header_length + footer_length)
    }

    /// Pairs every blob with the same compression codec.
    fn blobs_with_compression(
        blobs: Vec<Blob>,
        compression_codec: CompressionCodec,
    ) -> Vec<(Blob, CompressionCodec)> {
        let mut paired = Vec::new();
        for blob in blobs {
            paired.push((blob, compression_codec));
        }
        paired
    }

    #[tokio::test]
    async fn test_write_uncompressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_and_codecs = blobs_with_compression(blobs.clone(), CompressionCodec::None);

        let output_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let actual_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(actual_metadata, uncompressed_metric_file_metadata());

        let read_blobs = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(read_blobs, blobs)
    }

    #[tokio::test]
    async fn test_write_zstd_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_and_codecs = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd);

        let output_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();
        let input_file = output_file.to_input_file();

        let actual_metadata = FileMetadata::read(&input_file).await.unwrap();
        assert_eq!(actual_metadata, zstd_compressed_metric_file_metadata());

        let read_blobs = read_all_blobs_from_puffin_file(input_file).await;
        assert_eq!(read_blobs, blobs)
    }

    #[tokio::test]
    async fn test_write_lz4_compressed_metric_data() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_and_codecs = blobs_with_compression(blobs.clone(), CompressionCodec::Lz4);

        // Writing LZ4-compressed blobs is expected to be rejected.
        let error_message = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap_err()
            .to_string();
        assert_eq!(
            error_message,
            "FeatureUnsupported => LZ4 compression is not supported currently"
        );
    }

    /// Reads the whole of `input_file` into a byte vector.
    async fn get_file_as_byte_vec(input_file: InputFile) -> Vec<u8> {
        let contents = input_file.read().await.unwrap();
        contents.to_vec()
    }

    /// Asserts that the written file and the reference file contain exactly the same bytes.
    async fn assert_files_are_bit_identical(actual: OutputFile, expected: InputFile) {
        let actual_bytes = get_file_as_byte_vec(actual.to_input_file()).await;
        let expected_bytes = get_file_as_byte_vec(expected).await;
        assert_eq!(actual_bytes, expected_bytes);
    }

    #[tokio::test]
    async fn test_uncompressed_empty_puffin_file_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();

        let written_file = write_puffin_file(&temp_dir, Vec::new(), HashMap::new())
            .await
            .unwrap();

        assert_files_are_bit_identical(written_file, java_empty_uncompressed_input_file()).await
    }

    #[tokio::test]
    async fn test_uncompressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_and_codecs = blobs_with_compression(blobs, CompressionCodec::None);

        let written_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written_file, java_uncompressed_metric_input_file()).await
    }

    #[tokio::test]
    async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() {
        let temp_dir = TempDir::new().unwrap();
        let blobs = vec![blob_0(), blob_1()];
        let blobs_and_codecs = blobs_with_compression(blobs, CompressionCodec::Zstd);

        let written_file = write_puffin_file(&temp_dir, blobs_and_codecs, file_properties())
            .await
            .unwrap();

        assert_files_are_bit_identical(written_file, java_zstd_compressed_metric_input_file())
            .await
    }
}

0 commit comments

Comments
 (0)