Skip to content

Commit 00b1d6e

Browse files
committed
chore: use AMQFilter trait and decode StandardBloomFilter from AMQFilterBuilder
1 parent 38b2322 commit 00b1d6e

File tree

5 files changed

+116
-68
lines changed

5 files changed

+116
-68
lines changed

benches/bloom.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ fn standard_filter_construction(c: &mut Criterion) {
1414
}
1515

1616
fn standard_filter_contains(c: &mut Criterion) {
17-
use lsm_tree::segment::filter::standard_bloom::Builder;
17+
use lsm_tree::segment::filter::{standard_bloom::Builder, AMQFilter};
1818

1919
let keys = (0..100_000u128)
2020
.map(|x| x.to_be_bytes().to_vec())

src/segment/filter/mod.rs

+59-3
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ pub mod bit_array;
66
pub mod blocked_bloom;
77
pub mod standard_bloom;
88

9-
const CACHE_LINE_BYTES: usize = 64;
9+
use crate::{coding::DecodeError, file::MAGIC_BYTES};
10+
use byteorder::ReadBytesExt;
11+
use std::io::Read;
12+
13+
use standard_bloom::{Builder as StandardBloomFilterBuilder, StandardBloomFilter};
1014

11-
use blocked_bloom::Builder as BlockedBloomFilterBuilder;
12-
use standard_bloom::Builder as StandardBloomFilterBuilder;
15+
const CACHE_LINE_BYTES: usize = 64;
1316

1417
#[derive(Copy, Clone, Debug)]
1518
pub enum BloomConstructionPolicy {
@@ -42,3 +45,56 @@ impl BloomConstructionPolicy {
4245
}
4346
}
4447
}
48+
49+
enum FilterType {
50+
StandardBloom = 0,
51+
BlockedBloom = 1,
52+
}
53+
54+
impl TryFrom<u8> for FilterType {
55+
type Error = ();
56+
fn try_from(value: u8) -> Result<Self, Self::Error> {
57+
match value {
58+
0 => Ok(Self::StandardBloom),
59+
1 => Ok(Self::BlockedBloom),
60+
_ => Err(()),
61+
}
62+
}
63+
}
64+
65+
pub trait AMQFilter: Sync + Send {
66+
fn bytes(&self) -> &[u8];
67+
fn len(&self) -> usize;
68+
fn contains(&self, item: &[u8]) -> bool;
69+
fn contains_hash(&self, hash: (u64, u64)) -> bool;
70+
}
71+
72+
pub struct AMQFilterBuilder {}
73+
74+
impl AMQFilterBuilder {
75+
pub fn decode_from<R: Read>(reader: &mut R) -> Result<Box<dyn AMQFilter + Sync>, DecodeError> {
76+
// Check header
77+
let mut magic = [0u8; MAGIC_BYTES.len()];
78+
reader.read_exact(&mut magic)?;
79+
80+
if magic != MAGIC_BYTES {
81+
return Err(DecodeError::InvalidHeader("BloomFilter"));
82+
}
83+
84+
let filter_type = reader.read_u8()?;
85+
let filter_type = FilterType::try_from(filter_type)
86+
.map_err(|_| DecodeError::InvalidHeader("FilterType"))?;
87+
88+
match filter_type {
89+
FilterType::StandardBloom => StandardBloomFilter::decode_from(reader)
90+
.map(Self::wrap_filter)
91+
.map_err(|e| DecodeError::from(e)),
92+
// TODO: Implement
93+
FilterType::BlockedBloom => Err(DecodeError::InvalidHeader("BlockedBloom")),
94+
}
95+
}
96+
97+
fn wrap_filter<T: 'static + AMQFilter + Sync>(filter: T) -> Box<dyn AMQFilter + Sync> {
98+
Box::new(filter)
99+
}
100+
}

src/segment/filter/standard_bloom/mod.rs

+52-57
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::bit_array::BitArrayReader;
1+
use super::{bit_array::BitArrayReader, AMQFilter};
22
use crate::{
33
coding::{Decode, DecodeError, Encode, EncodeError},
44
file::MAGIC_BYTES,
@@ -30,6 +30,49 @@ pub struct StandardBloomFilter {
3030
}
3131

3232
// TODO: change encode/decode to be Filter enum
33+
impl AMQFilter for StandardBloomFilter {
34+
/// Size of bloom filter in bytes.
35+
#[must_use]
36+
fn len(&self) -> usize {
37+
self.inner.bytes().len()
38+
}
39+
40+
/// Returns the raw bytes of the filter.
41+
fn bytes(&self) -> &[u8] {
42+
self.inner.bytes()
43+
}
44+
45+
/// Returns `true` if the item may be contained.
46+
///
47+
/// Will never have a false negative.
48+
#[must_use]
49+
fn contains(&self, key: &[u8]) -> bool {
50+
self.contains_hash(Self::get_hash(key))
51+
}
52+
53+
/// Returns `true` if the hash may be contained.
54+
///
55+
/// Will never have a false negative.
56+
#[must_use]
57+
fn contains_hash(&self, hash: CompositeHash) -> bool {
58+
let (mut h1, mut h2) = hash;
59+
60+
for i in 1..=(self.k as u64) {
61+
let idx = h1 % (self.m as u64);
62+
63+
// NOTE: should be in bounds because of modulo
64+
#[allow(clippy::expect_used)]
65+
if !self.has_bit(idx as usize) {
66+
return false;
67+
}
68+
69+
h1 = h1.wrapping_add(h2);
70+
h2 = h2.wrapping_mul(i);
71+
}
72+
73+
true
74+
}
75+
}
3376

3477
impl Encode for StandardBloomFilter {
3578
fn encode_into<W: Write>(&self, writer: &mut W) -> Result<(), EncodeError> {
@@ -50,20 +93,10 @@ impl Encode for StandardBloomFilter {
5093
}
5194
}
5295

53-
impl Decode for StandardBloomFilter {
54-
fn decode_from<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
55-
// Check header
56-
let mut magic = [0u8; MAGIC_BYTES.len()];
57-
reader.read_exact(&mut magic)?;
58-
59-
if magic != MAGIC_BYTES {
60-
return Err(DecodeError::InvalidHeader("BloomFilter"));
61-
}
62-
63-
// NOTE: Filter type (unused)
64-
let filter_type = reader.read_u8()?;
65-
assert_eq!(0, filter_type, "Invalid filter type");
66-
96+
#[allow(clippy::len_without_is_empty)]
97+
impl StandardBloomFilter {
98+
// To be used by AMQFilter after magic bytes and filter type have been read and parsed
99+
pub(super) fn decode_from<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
67100
// NOTE: Hash type (unused)
68101
let hash_type = reader.read_u8()?;
69102
assert_eq!(0, hash_type, "Invalid bloom hash type");
@@ -76,15 +109,6 @@ impl Decode for StandardBloomFilter {
76109

77110
Ok(Self::from_raw(m, k, bytes.into()))
78111
}
79-
}
80-
81-
#[allow(clippy::len_without_is_empty)]
82-
impl StandardBloomFilter {
83-
/// Size of bloom filter in bytes.
84-
#[must_use]
85-
pub fn len(&self) -> usize {
86-
self.inner.bytes().len()
87-
}
88112

89113
fn from_raw(m: usize, k: usize, slice: crate::Slice) -> Self {
90114
Self {
@@ -94,37 +118,6 @@ impl StandardBloomFilter {
94118
}
95119
}
96120

97-
/// Returns `true` if the hash may be contained.
98-
///
99-
/// Will never have a false negative.
100-
#[must_use]
101-
pub fn contains_hash(&self, hash: CompositeHash) -> bool {
102-
let (mut h1, mut h2) = hash;
103-
104-
for i in 1..=(self.k as u64) {
105-
let idx = h1 % (self.m as u64);
106-
107-
// NOTE: should be in bounds because of modulo
108-
#[allow(clippy::expect_used)]
109-
if !self.has_bit(idx as usize) {
110-
return false;
111-
}
112-
113-
h1 = h1.wrapping_add(h2);
114-
h2 = h2.wrapping_mul(i);
115-
}
116-
117-
true
118-
}
119-
120-
/// Returns `true` if the item may be contained.
121-
///
122-
/// Will never have a false negative.
123-
#[must_use]
124-
pub fn contains(&self, key: &[u8]) -> bool {
125-
self.contains_hash(Self::get_hash(key))
126-
}
127-
128121
/// Returns `true` if the bit at `idx` is `1`.
129122
fn has_bit(&self, idx: usize) -> bool {
130123
self.inner.get(idx)
@@ -138,6 +131,8 @@ impl StandardBloomFilter {
138131

139132
#[cfg(test)]
140133
mod tests {
134+
use crate::segment::filter::AMQFilterBuilder;
135+
141136
use super::*;
142137
use std::fs::File;
143138
use test_log::test;
@@ -174,9 +169,9 @@ mod tests {
174169
drop(file);
175170

176171
let mut file = File::open(&path)?;
177-
let filter_copy = StandardBloomFilter::decode_from(&mut file)?;
172+
let filter_copy = AMQFilterBuilder::decode_from(&mut file)?;
178173

179-
assert_eq!(filter.inner, filter_copy.inner);
174+
assert_eq!(filter.inner.bytes(), filter_copy.bytes());
180175

181176
for key in keys {
182177
assert!(filter.contains(&**key));

src/segment/inner.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
// (found in the LICENSE-* files in the repository)
44

55
use super::{
6-
block_index::NewBlockIndexImpl, filter::standard_bloom::StandardBloomFilter, meta::ParsedMeta,
7-
trailer::Trailer,
6+
block_index::NewBlockIndexImpl, filter::AMQFilter, meta::ParsedMeta, trailer::Trailer,
87
};
98
use crate::{
109
cache::Cache, descriptor_table::DescriptorTable, tree::inner::TreeId, GlobalSegmentId,
@@ -39,7 +38,7 @@ pub struct Inner {
3938
pub cache: Arc<Cache>,
4039

4140
/// Pinned AMQ filter
42-
pub pinned_filter: Option<StandardBloomFilter>,
41+
pub pinned_filter: Option<Box<dyn AMQFilter + Sync>>,
4342

4443
// /// Pinned filter
4544
// #[doc(hidden)]

src/segment/mod.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::{
2727
cache::Cache, descriptor_table::DescriptorTable, InternalValue, SeqNo, TreeId, UserKey,
2828
};
2929
use block_index::{NewBlockIndex, NewBlockIndexImpl, NewFullBlockIndex};
30-
use filter::standard_bloom::{CompositeHash, StandardBloomFilter};
30+
use filter::{standard_bloom::CompositeHash, AMQFilterBuilder};
3131
use inner::Inner;
3232
use meta::ParsedMeta;
3333
use std::{
@@ -341,8 +341,6 @@ impl Segment {
341341
let pinned_filter = trailer
342342
.filter
343343
.map(|filter_ptr| {
344-
use crate::coding::Decode;
345-
346344
log::debug!("Reading filter block for pinning, with filter_ptr={filter_ptr:?}");
347345

348346
let block = Block::from_file(
@@ -353,7 +351,7 @@ impl Segment {
353351
)?;
354352

355353
let mut reader = &block.data[..];
356-
StandardBloomFilter::decode_from(&mut reader).map_err(Into::<crate::Error>::into)
354+
AMQFilterBuilder::decode_from(&mut reader).map_err(Into::<crate::Error>::into)
357355
})
358356
.transpose()?;
359357

0 commit comments

Comments
 (0)