
Commit 9532243

Merge branch '2.8.0' into feat/new-cache-api
2 parents 31efeb1 + 0f46ab0

File tree

10 files changed: +456 -11 lines

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 name = "lsm-tree"
 description = "A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs)"
 license = "MIT OR Apache-2.0"
-version = "2.7.5"
+version = "2.8.0"
 edition = "2021"
 rust-version = "1.75.0"
 readme = "README.md"

src/abstract.rs

Lines changed: 16 additions & 0 deletions
@@ -18,6 +18,22 @@ pub type RangeItem = crate::Result<KvPair>;
 #[allow(clippy::module_name_repetitions)]
 #[enum_dispatch]
 pub trait AbstractTree {
+    /// Ingests a sorted stream of key-value pairs into the tree.
+    ///
+    /// Can only be called on a new, fresh, empty tree.
+    ///
+    /// # Errors
+    ///
+    /// Will return `Err` if an IO error occurs.
+    ///
+    /// # Panics
+    ///
+    /// Panics if the tree is **not** initially empty.
+    ///
+    /// Will panic if the input iterator is not sorted in ascending order.
+    #[doc(hidden)]
+    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()>;
+
     /// Performs major compaction, blocking the caller until it's done.
     ///
     /// # Errors
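
For orientation, here is a minimal usage sketch of the new bulk-ingest entry point (not part of the commit). It assumes the crate's usual `Config::new(path).open()?` constructor and that `UserKey`/`UserValue` can be built from `Vec<u8>`; note that `ingest` is `#[doc(hidden)]`, so it is internal and only illustrated here:

    // Hedged sketch: bulk-loading a fresh, empty tree via `AbstractTree::ingest`.
    // The path and the key/value construction are illustrative assumptions.
    use lsm_tree::{AbstractTree, Config};

    fn main() -> lsm_tree::Result<()> {
        let tree = Config::new("/tmp/ingest-demo").open()?;

        // Keys must be supplied in strictly ascending order and the tree must
        // be empty, otherwise `ingest` panics (see the doc comment above).
        let items = (0u64..100_000).map(|i| {
            (
                lsm_tree::UserKey::from(i.to_be_bytes().to_vec()),
                lsm_tree::UserValue::from(b"value".to_vec()),
            )
        });

        tree.ingest(items)?;
        Ok(())
    }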

src/blob_tree/mod.rs

Lines changed: 65 additions & 0 deletions
@@ -232,6 +232,69 @@ impl BlobTree {
 }

 impl AbstractTree for BlobTree {
+    fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
+        use crate::tree::ingest::Ingestion;
+        use std::time::Instant;
+
+        // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
+        let lock = self.lock_active_memtable();
+        assert!(
+            lock.is_empty(),
+            "can only perform bulk_ingest on empty trees",
+        );
+
+        let mut segment_writer = Ingestion::new(&self.index)?;
+        let mut blob_writer = self.blobs.get_writer()?;
+
+        let start = Instant::now();
+        let mut count = 0;
+        let mut last_key = None;
+
+        for (key, value) in iter {
+            if let Some(last_key) = &last_key {
+                assert!(
+                    key > last_key,
+                    "next key in bulk ingest was not greater than last key",
+                );
+            }
+            last_key = Some(key.clone());
+
+            // NOTE: Values are 32-bit max
+            #[allow(clippy::cast_possible_truncation)]
+            let value_size = value.len() as u32;
+
+            if value_size >= self.index.config.blob_file_separation_threshold {
+                let vhandle = blob_writer.get_next_value_handle();
+
+                let indirection = MaybeInlineValue::Indirect {
+                    vhandle,
+                    size: value_size,
+                };
+                // TODO: use Slice::with_size
+                let mut serialized_indirection = vec![];
+                indirection.encode_into(&mut serialized_indirection)?;
+
+                segment_writer.write(key.clone(), serialized_indirection.into())?;
+
+                blob_writer.write(&key, value)?;
+            } else {
+                // TODO: use Slice::with_size
+                let direct = MaybeInlineValue::Inline(value);
+                let serialized_direct = direct.encode_into_vec();
+                segment_writer.write(key, serialized_direct.into())?;
+            }
+
+            count += 1;
+        }
+
+        self.blobs.register_writer(blob_writer)?;
+        segment_writer.finish()?;
+
+        log::info!("Ingested {count} items in {:?}", start.elapsed());
+
+        Ok(())
+    }
+
     fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
         self.index.major_compact(target_size, seqno_threshold)
     }

@@ -369,6 +432,7 @@ impl AbstractTree for BlobTree {
                     vhandle,
                     size: value_size,
                 };
+                // TODO: use Slice::with_size
                 let mut serialized_indirection = vec![];
                 indirection.encode_into(&mut serialized_indirection)?;

@@ -377,6 +441,7 @@ impl AbstractTree for BlobTree {

                 blob_writer.write(&item.key.user_key, value)?;
             } else {
+                // TODO: use Slice::with_size
                 let direct = MaybeInlineValue::Inline(value);
                 let serialized_direct = direct.encode_into_vec();
                 segment_writer.write(InternalValue::new(item.key, serialized_direct))?;
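
The branch on `blob_file_separation_threshold` above is the heart of key-value separation during ingest: values at or above the threshold go to the blob file and only an indirection (`MaybeInlineValue::Indirect { vhandle, size }`) is written to the index segment, while smaller values are encoded inline. A small, hypothetical helper restating that decision (the function itself is not part of the commit):

    /// Hypothetical helper mirroring the separation decision in `BlobTree::ingest`.
    /// Returns true if the value should be written to the blob file and referenced
    /// indirectly from the index segment; false if it is small enough to inline.
    fn should_separate(value_len: usize, blob_file_separation_threshold: u32) -> bool {
        // NOTE: value sizes are capped at 32 bits, as in the diff above
        #[allow(clippy::cast_possible_truncation)]
        let value_size = value_len as u32;

        value_size >= blob_file_separation_threshold
    }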

src/compaction/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -8,6 +8,7 @@ pub(crate) mod fifo;
 pub(crate) mod leveled;
 pub(crate) mod maintenance;
 pub(crate) mod major;
+pub(crate) mod movedown;
 pub(crate) mod pulldown;
 pub(crate) mod stream;
 pub(crate) mod tiered;

@@ -22,6 +23,9 @@ use crate::{config::Config, level_manifest::LevelManifest, segment::meta::Segmen
 /// Alias for `Leveled`
 pub type Levelled = Leveled;

+#[doc(hidden)]
+pub use movedown::Strategy as MoveDown;
+
 #[doc(hidden)]
 pub use pulldown::Strategy as PullDown;

src/compaction/movedown.rs

Lines changed: 40 additions & 0 deletions
@@ -0,0 +1,40 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+use super::{Choice, CompactionStrategy, Input};
+use crate::{level_manifest::LevelManifest, Config, HashSet, Segment};
+
+/// Moves down a level into the destination level.
+pub struct Strategy(pub u8, pub u8);
+
+impl CompactionStrategy for Strategy {
+    fn get_name(&self) -> &'static str {
+        "MoveDownCompaction"
+    }
+
+    #[allow(clippy::expect_used)]
+    fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
+        let resolved_view = levels.resolved_view();
+
+        let level = resolved_view
+            .get(usize::from(self.0))
+            .expect("level should exist");
+
+        let next_level = resolved_view
+            .get(usize::from(self.1))
+            .expect("next level should exist");
+
+        if next_level.is_empty() {
+            let segment_ids: HashSet<_> = level.segments.iter().map(Segment::id).collect();
+
+            Choice::Move(Input {
+                segment_ids,
+                dest_level: self.1,
+                target_size: 64_000_000,
+            })
+        } else {
+            Choice::DoNothing
+        }
+    }
+}
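
The new strategy is driven through the regular `CompactionStrategy` machinery; the ingestion code later in this commit invokes it as `MoveDown(0, 6)` to push freshly written segments straight to the bottom level when that level is empty. A hedged sketch of such a call (the wrapper function and the `lsm_tree::compaction::MoveDown` import path are assumptions based on the `#[doc(hidden)]` re-export above):

    use std::sync::Arc;
    use lsm_tree::{compaction::MoveDown, AbstractTree, Tree};

    /// Hypothetical wrapper: move every segment of level `src` into level `dest`,
    /// provided `dest` is currently empty (otherwise the strategy is a no-op).
    fn move_level_down(tree: &Tree, src: u8, dest: u8) -> lsm_tree::Result<()> {
        // Sequence number threshold 0, as used by the ingest path.
        tree.compact(Arc::new(MoveDown(src, dest)), 0)
    }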

src/compaction/worker.rs

Lines changed: 0 additions & 4 deletions
@@ -43,9 +43,6 @@ pub struct Options {
     /// Levels manifest.
     pub levels: Arc<RwLock<LevelManifest>>,

-    /// Sealed memtables (required for temporarily locking).
-    pub sealed_memtables: Arc<RwLock<SealedMemtables>>,
-
     /// Compaction strategy to use.
     pub strategy: Arc<dyn CompactionStrategy>,

@@ -63,7 +60,6 @@ impl Options {
             tree_id: tree.id,
             segment_id_generator: tree.segment_id_counter.clone(),
             config: tree.config.clone(),
-            sealed_memtables: tree.sealed_memtables.clone(),
             levels: tree.levels.clone(),
             stop_signal: tree.stop_signal.clone(),
             strategy,

src/key.rs

Lines changed: 0 additions & 1 deletion
@@ -11,7 +11,6 @@ use std::{
     cmp::Reverse,
     io::{Read, Write},
 };
-use value_log::Slice;
 use varint_rs::{VarintReader, VarintWriter};

 #[derive(Clone, PartialEq, Eq)]

src/tree/ingest.rs

Lines changed: 136 additions & 0 deletions
@@ -0,0 +1,136 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+use super::Tree;
+use crate::{
+    file::SEGMENTS_FOLDER,
+    segment::{block_index::BlockIndexImpl, multi_writer::MultiWriter, SegmentInner},
+    AbstractTree, Segment, UserKey, UserValue, ValueType,
+};
+use std::{
+    path::PathBuf,
+    sync::{atomic::AtomicBool, Arc},
+};
+
+pub struct Ingestion<'a> {
+    folder: PathBuf,
+    tree: &'a Tree,
+    writer: MultiWriter,
+}
+
+impl<'a> Ingestion<'a> {
+    pub fn new(tree: &'a Tree) -> crate::Result<Self> {
+        assert_eq!(
+            0,
+            tree.segment_count(),
+            "can only perform bulk_ingest on empty trees",
+        );
+
+        let folder = tree.config.path.join(SEGMENTS_FOLDER);
+        log::debug!("Ingesting into disk segments in {folder:?}");
+
+        let mut writer = MultiWriter::new(
+            tree.segment_id_counter.clone(),
+            128 * 1_024 * 1_024,
+            crate::segment::writer::Options {
+                folder: folder.clone(),
+                data_block_size: tree.config.data_block_size,
+                index_block_size: tree.config.index_block_size,
+                segment_id: 0, /* TODO: unused */
+            },
+        )?
+        .use_compression(tree.config.compression);
+
+        {
+            use crate::segment::writer::BloomConstructionPolicy;
+
+            if tree.config.bloom_bits_per_key >= 0 {
+                writer = writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(
+                    tree.config.bloom_bits_per_key.unsigned_abs(),
+                ));
+            } else {
+                writer = writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
+            }
+        }
+
+        Ok(Self {
+            folder,
+            tree,
+            writer,
+        })
+    }
+
+    pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
+        self.writer.write(crate::InternalValue::from_components(
+            key,
+            value,
+            0,
+            ValueType::Value,
+        ))
+    }
+
+    pub fn finish(self) -> crate::Result<()> {
+        use crate::{
+            compaction::MoveDown, segment::block_index::two_level_index::TwoLevelBlockIndex,
+        };
+
+        let results = self.writer.finish()?;
+
+        let created_segments = results
+            .into_iter()
+            .map(|trailer| -> crate::Result<Segment> {
+                let segment_id = trailer.metadata.id;
+                let segment_file_path = self.folder.join(segment_id.to_string());
+
+                let block_index = TwoLevelBlockIndex::from_file(
+                    &segment_file_path,
+                    &trailer.metadata,
+                    trailer.offsets.tli_ptr,
+                    (self.tree.id, segment_id).into(),
+                    self.tree.config.descriptor_table.clone(),
+                    self.tree.config.block_cache.clone(),
+                )?;
+                let block_index = BlockIndexImpl::TwoLevel(block_index);
+                let block_index = Arc::new(block_index);
+
+                Ok(SegmentInner {
+                    tree_id: self.tree.id,
+
+                    descriptor_table: self.tree.config.descriptor_table.clone(),
+                    block_cache: self.tree.config.block_cache.clone(),
+
+                    metadata: trailer.metadata,
+                    offsets: trailer.offsets,
+
+                    #[allow(clippy::needless_borrows_for_generic_args)]
+                    block_index,
+
+                    bloom_filter: Segment::load_bloom(
+                        &segment_file_path,
+                        trailer.offsets.bloom_ptr,
+                    )?,
+
+                    path: segment_file_path,
+                    is_deleted: AtomicBool::default(),
+                }
+                .into())
+            })
+            .collect::<crate::Result<Vec<_>>>()?;
+
+        self.tree.register_segments(&created_segments)?;
+
+        self.tree.compact(Arc::new(MoveDown(0, 6)), 0)?;
+
+        for segment in &created_segments {
+            let segment_file_path = self.folder.join(segment.id().to_string());
+
+            self.tree
+                .config
+                .descriptor_table
+                .insert(&segment_file_path, segment.global_id());
+        }
+
+        Ok(())
+    }
+}
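
Taken together, the flow behind `Tree::ingest` (and the index half of `BlobTree::ingest`) is: build an `Ingestion` for an empty tree, stream pre-sorted pairs through `write` (each stored with seqno 0 and `ValueType::Value`), then call `finish`, which registers the resulting segments and immediately runs the `MoveDown(0, 6)` compaction so the bulk-loaded data lands in the bottom level. A crate-internal sketch of that flow (illustrative only; `Ingestion` is not exported):

    // Sketch of a hypothetical crate-internal caller; mirrors the module above.
    use crate::tree::ingest::Ingestion;
    use crate::{Tree, UserKey, UserValue};

    fn bulk_load(
        tree: &Tree,
        pairs: impl Iterator<Item = (UserKey, UserValue)>,
    ) -> crate::Result<()> {
        // Panics unless the tree has zero segments.
        let mut ingestion = Ingestion::new(tree)?;

        // The caller is expected to supply keys in ascending order
        // (the public ingest methods assert this).
        for (key, value) in pairs {
            ingestion.write(key, value)?;
        }

        // Flushes the multi-writer, registers the segments,
        // then runs the MoveDown(0, 6) compaction.
        ingestion.finish()
    }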
