Skip to content

Commit

Permalink
Zip compression support
Browse files Browse the repository at this point in the history
Using ad-hoc `libzip` bindings, because they're 5-10 times faster than
the `zip` crate. Kinda annoying and hopefully we'll be able to move away
from that at some point, though.
  • Loading branch information
KenjiTakahashi committed Feb 27, 2019
1 parent 0266b91 commit 4123ce5
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@

*.json
*.smoosh
output/
5 changes: 5 additions & 0 deletions src/conf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ pub struct Conf {
)]
pub compression: Compression,

#[structopt(short, long,
raw(possible_values = r#"&["1", "2", "3", "4", "5", "6", "7", "8", "9"]"#),
)]
pub zip: Option<u8>,

#[structopt(short, long, raw(default_value = "&numcpus"))]
pub threads: usize,

Expand Down
54 changes: 40 additions & 14 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate byteorder;
extern crate clap;
#[macro_use] extern crate lazy_static;
extern crate indexmap;
#[macro_use] extern crate log;
extern crate lz4;
extern crate num_cpus;
#[macro_use]
Expand All @@ -17,10 +18,13 @@ use std::collections::HashMap;
use std::fs;
use std::io::{BufWriter, Write};
use std::path::PathBuf;
use std::time::Instant;

pub mod conf;
mod interner;
mod zip;
use interner::IS;
use zip::Zip;

lazy_static! {
static ref META_TYPES: HashMap<&'static str, String> = {
Expand Down Expand Up @@ -230,27 +234,49 @@ impl Data {
}

pub fn write(&self, path: &PathBuf) {
self.write_version(path.join("version.bin"));
self.write_factory(path.join("factory.json"));
self.write_data(path.join("00000.smoosh"), path.join("meta.smoosh"));
if let Some(compression) = conf::vals.zip {
let mut version = vec![];
let mut factory = vec![];
let mut data = vec![];
let mut meta = vec![];
self.write_version(&mut version);
self.write_factory(&mut factory);
self.write_data(&mut data, &mut meta);

let instant = Instant::now();

let zip = Zip::new(path.join("index.zip"), compression).unwrap();
zip.file_add("version.bin", &version).unwrap();
zip.file_add("factory.json", &factory).unwrap();
zip.file_add("00000.smoosh", &data).unwrap();
zip.file_add("meta.smoosh", &meta).unwrap();
zip.close().unwrap();

debug!("zipf `{:?}`", instant.elapsed());

return;
}
let mut file = fs::File::create(path.join("version.bin")).unwrap();
self.write_version(&mut file);
file = fs::File::create(path.join("factory.json")).unwrap();
self.write_factory(&mut file);
file = fs::File::create(path.join("00000.smoosh")).unwrap();
let mut meta_file = fs::File::create(path.join("meta.smoosh")).unwrap();
self.write_data(&mut file, &mut meta_file);
}

fn write_version(&self, path: PathBuf) {
let mut fo = fs::File::create(path).unwrap();
fo.write_u32::<BE>(9).unwrap();
fn write_version(&self, writer: &mut Write) {
writer.write_u32::<BE>(9).unwrap();
}

fn write_factory(&self, path: PathBuf) {
let fo = fs::File::create(path).unwrap();
fn write_factory(&self, writer: &mut Write) {
let factory = json!({"type": "mMapSegmentFactory"});
serde_json::to_writer(fo, &factory).unwrap();
serde_json::to_writer(writer, &factory).unwrap();
}

fn write_data(&self, path: PathBuf, meta_path: PathBuf) {
let fo = fs::File::create(path).unwrap();
let mut writer = BufWriter::new(fo);
let m_fo = fs::File::create(meta_path).unwrap();
let mut m_writer = BufWriter::new(m_fo);
fn write_data(&self, data_writer: &mut Write, meta_writer: &mut Write) {
let mut writer = BufWriter::new(data_writer);
let mut m_writer = BufWriter::new(meta_writer);

let metrics = conf::vals.metrics.iter().collect::<IndexSet<_>>();
let mut dimensions = conf::vals.dimensions.iter().collect::<IndexSet<_>>();
Expand Down
79 changes: 79 additions & 0 deletions src/zip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::error::Error;
use std::ffi::CString;
use std::os::raw;
use std::path::PathBuf;

#[link(name = "zip")]
extern {
fn zip_open(path: *const raw::c_char, flags: raw::c_int, errorp: *mut raw::c_int) -> *mut raw::c_void;
fn zip_file_add(archive: *mut raw::c_void, name: *const raw::c_char, source: *mut raw::c_void, flags: raw::c_uint) -> raw::c_longlong;
fn zip_close(archive: *mut raw::c_void) -> raw::c_int;
fn zip_source_buffer_create(data: *const raw::c_void, len: raw::c_ulonglong, freep: raw::c_int, error: *mut raw::c_void) -> *mut raw::c_void;
fn zip_set_file_compression(archive: *mut raw::c_void, index: raw::c_ulonglong, comp: raw::c_int, comp_flags: raw::c_uint) -> raw::c_int;
}

static ZIP_CREATE: i32 = 1;
static ZIP_TRUNCATE: i32 = 8;
static ZIP_CM_DEFLATE: i32 = 8;

#[derive(Debug)]
pub struct ZipError(&'static str);

impl std::fmt::Display for ZipError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "Zip Error: `{}`", self.0)
}
}

impl Error for ZipError {}

pub struct Zip {
ptr: *mut raw::c_void,
compression: u8,
}

impl Zip {
pub fn new(path: PathBuf, compression: u8) -> Result<Self, ZipError> {
let mut errorp = 0;
unsafe {
let cpath = CString::new(path.to_str().unwrap()).unwrap();
let zip = zip_open(cpath.as_ptr(), ZIP_CREATE | ZIP_TRUNCATE, &mut errorp);
if zip.is_null() {
return Result::Err(ZipError("Could not open os file"));
}
Result::Ok(Self{ptr: zip, compression})
}
}

pub fn file_add(&self, name: &str, data: &[u8]) -> Result<(), ZipError> {
let mut error = vec![]; // Not used, but have to bring sth valid to the call
unsafe {
let source = zip_source_buffer_create(
data.as_ptr() as *const raw::c_void,
data.len() as u64,
0,
error.as_mut_ptr() as *mut raw::c_void,
);
if source.is_null() {
return Result::Err(ZipError("Could not allocate memort for source"));
}

let cname = CString::new(name).unwrap();
let idx = zip_file_add(self.ptr, cname.as_ptr(), source, 0);
if idx < 0 {
return Result::Err(ZipError("Could not allocate memory for file"));
}
zip_set_file_compression(self.ptr, idx as u64, ZIP_CM_DEFLATE, self.compression.into());
Result::Ok(())
}
}

pub fn close(&self) -> Result<(), ZipError> {
unsafe {
if zip_close(self.ptr) < 0 {
return Result::Err(ZipError("Could not write archive"));
}
}
Result::Ok(())
}
}

0 comments on commit 4123ce5

Please sign in to comment.