Skip to content

Allow multitree compression by compressing at commit time #240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions admin/src/bench/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ pub struct Args {
pub append: bool,
pub no_check: bool,
pub compress: bool,
pub ordered: bool,
pub uniform: bool,
pub writer_commits_per_sleep: u64,
pub writer_sleep_time: u64,
Expand All @@ -125,7 +124,6 @@ impl Stress {
archive: self.archive,
no_check: self.no_check,
compress: self.compress,
ordered: self.ordered,
uniform: self.uniform,
writer_commits_per_sleep: self.writer_commits_per_sleep.unwrap_or(100),
writer_sleep_time: self.writer_sleep_time.unwrap_or(0),
Expand Down
2 changes: 1 addition & 1 deletion src/btree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl BTreeTable {
return Ok(None)
}
let record_id = 0; // lifetime of Btree is the query, so no invalidate.
// keeping log locked when parsing tree.
// keeping log locked when parsing tree.
let tree = BTree::new(Some(btree_header.root), btree_header.depth, record_id);
tree.get(key, values, log)
}
Expand Down
130 changes: 106 additions & 24 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,10 +1120,76 @@ impl HashColumn {

// Can't support compression as we need to know the size earlier to get the tier.
let val: RcValue = data.into();
let cval = val.clone();
let compressed = false;

let address = Address::new(offset, target_tier as u8);

node_values.push(NodeChange::NewValue(address.as_u64(), val));
node_values.push(NodeChange::NewValue(address.as_u64(), val, cval, compressed));

Ok(address.as_u64())
}

fn claim_children_to_data_compress(
&self,
children: &Vec<NodeRef>,
tables: TablesRef,
node_values: &mut Vec<NodeChange>,
data: &mut Vec<u8>,
) -> Result<()> {
for child in children {
let address = match child {
NodeRef::New(node) => self.claim_node_compress(node, tables, node_values)?,
NodeRef::Existing(address) => {
if !self.append_only {
node_values.push(NodeChange::IncrementReference(*address));
}
*address
},
};
data.extend_from_slice(&address.to_le_bytes());
}
Ok(())
}

fn claim_node_compress(
&self,
node: &NewNode,
tables: TablesRef,
node_values: &mut Vec<NodeChange>,
) -> Result<NodeAddress> {
let num_children = node.children.len();
let data_size = packed_node_size(&node.data, num_children as u8);
let mut data: Vec<u8> = Vec::with_capacity(data_size);
data.extend_from_slice(&node.data);
self.claim_children_to_data_compress(&node.children, tables, node_values, &mut data)?;
data.push(num_children as u8);

let table_key = TableKey::NoHash;

let (cval, target_tier) =
Column::compress(tables.compression, &table_key, data.as_ref(), tables.tables);
let (cval, compressed) = cval
.as_ref()
.map(|cval| (cval.as_slice(), true))
.unwrap_or((data.as_ref(), false));

let cval: RcValue = cval.to_vec().into();
let val = if compressed { data.into() } else { cval.clone() };

assert!(
(target_tier >= (SIZE_TIERS - 1)) ||
cval.value().len() <=
tables.tables[target_tier].value_size(&table_key).unwrap() as usize
);

// Check it isn't multipart
//assert!(target_tier < (SIZE_TIERS - 1));

let offset = tables.tables[target_tier].claim_next_free()?;
let address = Address::new(offset, target_tier as u8);

node_values.push(NodeChange::NewValue(address.as_u64(), val, cval, compressed));

Ok(address.as_u64())
}
Expand All @@ -1139,32 +1205,48 @@ impl HashColumn {

let values = self.as_ref(&tables.value);

let mut tier_count: HashMap<usize, usize> = Default::default();
self.prepare_children(&node.children, values, &mut tier_count)?;

let mut tier_addresses: HashMap<usize, Vec<u64>> = Default::default();
let mut tier_index: HashMap<usize, usize> = Default::default();
for (tier, count) in tier_count {
let offsets = values.tables[tier].claim_entries(count)?;
tier_addresses.insert(tier, offsets);
tier_index.insert(tier, 0);
}

let mut node_values: Vec<NodeChange> = Default::default();

let num_children = node.children.len();
let data_size = packed_node_size(&node.data, num_children as u8);
let mut data: Vec<u8> = Vec::with_capacity(data_size);
data.extend_from_slice(&node.data);
self.claim_children_to_data(
&node.children,
values,
&tier_addresses,
&mut tier_index,
&mut node_values,
&mut data,
)?;
data.push(num_children as u8);

let data = if values.compression.does_compression() {
let data_size = packed_node_size(&node.data, num_children as u8);
let mut data: Vec<u8> = Vec::with_capacity(data_size);
data.extend_from_slice(&node.data);
self.claim_children_to_data_compress(
&node.children,
values,
&mut node_values,
&mut data,
)?;
data.push(num_children as u8);
data
} else {
let mut tier_count: HashMap<usize, usize> = Default::default();
self.prepare_children(&node.children, values, &mut tier_count)?;

let mut tier_addresses: HashMap<usize, Vec<u64>> = Default::default();
let mut tier_index: HashMap<usize, usize> = Default::default();
for (tier, count) in tier_count {
let offsets = values.tables[tier].claim_entries(count)?;
tier_addresses.insert(tier, offsets);
tier_index.insert(tier, 0);
}

let data_size = packed_node_size(&node.data, num_children as u8);
let mut data: Vec<u8> = Vec::with_capacity(data_size);
data.extend_from_slice(&node.data);
self.claim_children_to_data(
&node.children,
values,
&tier_addresses,
&mut tier_index,
&mut node_values,
&mut data,
)?;
data.push(num_children as u8);
data
};

return Ok((data, node_values))
},
Expand Down
7 changes: 7 additions & 0 deletions src/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ impl Compress {
pub fn new(kind: CompressionType, threshold: u32) -> Self {
Compress { inner: kind.into(), threshold }
}

pub fn does_compression(&self) -> bool {
match self.inner {
Compressor::NoCompression(..) => false,
_ => true,
}
}
}

pub const NO_COMPRESSION: Compress =
Expand Down
14 changes: 7 additions & 7 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2056,8 +2056,8 @@ impl<K: AsRef<[u8]>, Value> Operation<K, Value> {

#[derive(Debug, PartialEq, Eq)]
pub enum NodeChange {
/// (address, value)
NewValue(u64, RcValue),
/// (address, value, compressed value, compressed)
NewValue(u64, RcValue, RcValue, bool),
/// (address)
IncrementReference(u64),
/// Dereference and remove any of the children in the tree
Expand Down Expand Up @@ -2162,7 +2162,7 @@ impl IndexedChangeSet {
}
}
for change in self.node_changes.iter() {
if let NodeChange::NewValue(address, val) = change {
if let NodeChange::NewValue(address, val, _cval, _compressed) = change {
*bytes += val.value().len();
overlay.address.insert(*address, (record_id, val.clone()));
}
Expand Down Expand Up @@ -2195,11 +2195,11 @@ impl IndexedChangeSet {
}
for change in self.node_changes.iter() {
match change {
NodeChange::NewValue(address, val) => {
NodeChange::NewValue(address, val, cval, compressed) => {
column.write_address_value_plan(
*address,
val.clone(),
false,
cval.clone(),
*compressed,
val.value().len() as u32,
writer,
)?;
Expand Down Expand Up @@ -2289,7 +2289,7 @@ impl IndexedChangeSet {
}
}
for change in self.node_changes.iter() {
if let NodeChange::NewValue(address, _val) = change {
if let NodeChange::NewValue(address, _val, _cval, _compressed) = change {
if let Entry::Occupied(e) = overlay.address.entry(*address) {
if e.get().0 == record_id {
e.remove_entry();
Expand Down
2 changes: 1 addition & 1 deletion src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub fn madvise_random(_map: &mut memmap2::MmapMut) {}
fn mmap(file: &std::fs::File, len: usize) -> Result<memmap2::MmapMut> {
#[cfg(not(test))]
const RESERVE_ADDRESS_SPACE: usize = 1024 * 1024 * 1024; // 1 Gb
// Use a different value for tests to work around docker limits on the test machine.
// Use a different value for tests to work around docker limits on the test machine.
#[cfg(test)]
const RESERVE_ADDRESS_SPACE: usize = 64 * 1024 * 1024; // 64 Mb

Expand Down
4 changes: 0 additions & 4 deletions src/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ impl ColumnOptions {
log::error!(target: "parity-db", "`ref_counted` option is redundant when `append_only` is enabled");
return false
}
if self.multitree && self.compression != CompressionType::NoCompression {
log::error!(target: "parity-db", "Compression is not currently supported with multitree columns");
return false
}
true
}

Expand Down
29 changes: 29 additions & 0 deletions src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,6 +777,35 @@ impl ValueTable {
Ok(index)
}

pub fn claim_next_free(&self) -> Result<u64> {
match &self.free_entries {
Some(free_entries) => {
let mut free_entries = free_entries.write();

let filled = self.filled.load(Ordering::Relaxed);
let last_removed = self.last_removed.load(Ordering::Relaxed);
let index = if last_removed != 0 {
let last = free_entries.stack.pop().unwrap();
debug_assert_eq!(last, last_removed);

let next_removed = *free_entries.stack.last().unwrap_or(&0u64);

self.last_removed.store(next_removed, Ordering::Relaxed);
last_removed
} else {
self.filled.store(filled + 1, Ordering::Relaxed);
filled
};
self.dirty_header.store(true, Ordering::Relaxed);
Ok(index)
},
None =>
return Err(crate::error::Error::InvalidConfiguration(format!(
"claim_next_free called without free_entries"
))),
}
}

pub fn claim_entries(&self, num: usize) -> Result<Vec<u64>> {
match &self.free_entries {
Some(free_entries) => {
Expand Down
Loading