Skip to content

Commit 0f46ab0

Browse files
authored
Merge pull request #119 from fjall-rs/feat/bulk-ingest
Bulk ingestion API
2 parents 450fae3 + 47d5763 commit 0f46ab0

File tree

9 files changed

+455
-10
lines changed

9 files changed

+455
-10
lines changed

src/abstract.rs

+16
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,22 @@ pub type RangeItem = crate::Result<KvPair>;
1818
#[allow(clippy::module_name_repetitions)]
1919
#[enum_dispatch]
2020
pub trait AbstractTree {
21+
/// Ingests a sorted stream of key-value pairs into the tree.
22+
///
23+
/// Can only be called on a fresh, empty tree.
24+
///
25+
/// # Errors
26+
///
27+
/// Will return `Err` if an IO error occurs.
28+
///
29+
/// # Panics
30+
///
31+
/// Panics if the tree is **not** initially empty.
32+
///
33+
/// Will panic if the input iterator is not sorted in ascending order.
34+
#[doc(hidden)]
35+
fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()>;
36+
2137
/// Performs major compaction, blocking the caller until it's done.
2238
///
2339
/// # Errors

src/blob_tree/mod.rs

+65
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,69 @@ impl BlobTree {
230230
}
231231

232232
impl AbstractTree for BlobTree {
233+
/// Bulk-ingests a sorted key-value stream into the blob tree.
///
/// Large values are separated out into the blob (value) log and replaced by
/// an indirection handle in the index tree; small values are stored inline.
///
/// # Errors
///
/// Returns `Err` if an I/O error occurs.
///
/// # Panics
///
/// Panics if the active memtable is not empty, or if the input keys are not
/// strictly ascending.
fn ingest(&self, iter: impl Iterator<Item = (UserKey, UserValue)>) -> crate::Result<()> {
    use crate::tree::ingest::Ingestion;
    use std::time::Instant;

    // NOTE: Lock active memtable so nothing else can be going on while we are bulk loading
    let lock = self.lock_active_memtable();
    assert!(
        lock.is_empty(),
        "can only perform bulk_ingest on empty trees",
    );

    // Segment writer targets the index tree; blob writer targets the value log
    let mut segment_writer = Ingestion::new(&self.index)?;
    let mut blob_writer = self.blobs.get_writer()?;

    let start = Instant::now();
    let mut count = 0;
    let mut last_key = None;

    for (key, value) in iter {
        // Enforce strictly ascending input order — required so the resulting
        // segments are valid without a sort/merge pass
        if let Some(last_key) = &last_key {
            assert!(
                key > last_key,
                "next key in bulk ingest was not greater than last key",
            );
        }
        last_key = Some(key.clone());

        // NOTE: Values are 32-bit max
        #[allow(clippy::cast_possible_truncation)]
        let value_size = value.len() as u32;

        if value_size >= self.index.config.blob_file_separation_threshold {
            // Value is large enough for blob separation: the handle is taken
            // *before* the blob write, so it refers to where this value will land
            let vhandle = blob_writer.get_next_value_handle();

            let indirection = MaybeInlineValue::Indirect {
                vhandle,
                size: value_size,
            };
            // TODO: use Slice::with_size
            let mut serialized_indirection = vec![];
            indirection.encode_into(&mut serialized_indirection)?;

            // Index tree stores only the (small) indirection, not the value itself
            segment_writer.write(key.clone(), serialized_indirection.into())?;

            blob_writer.write(&key, value)?;
        } else {
            // TODO: use Slice::with_size
            // Small value: store it inline in the index tree
            let direct = MaybeInlineValue::Inline(value);
            let serialized_direct = direct.encode_into_vec();
            segment_writer.write(key, serialized_direct.into())?;
        }

        count += 1;
    }

    // NOTE(review): blob writer is registered before the segment writer is
    // finished — presumably so the value log exists before index segments that
    // point into it become visible; confirm ordering is load-bearing
    self.blobs.register_writer(blob_writer)?;
    segment_writer.finish()?;

    log::info!("Ingested {count} items in {:?}", start.elapsed());

    Ok(())
}
295+
233296
fn major_compact(&self, target_size: u64, seqno_threshold: SeqNo) -> crate::Result<()> {
234297
self.index.major_compact(target_size, seqno_threshold)
235298
}
@@ -367,6 +430,7 @@ impl AbstractTree for BlobTree {
367430
vhandle,
368431
size: value_size,
369432
};
433+
// TODO: use Slice::with_size
370434
let mut serialized_indirection = vec![];
371435
indirection.encode_into(&mut serialized_indirection)?;
372436

@@ -375,6 +439,7 @@ impl AbstractTree for BlobTree {
375439

376440
blob_writer.write(&item.key.user_key, value)?;
377441
} else {
442+
// TODO: use Slice::with_size
378443
let direct = MaybeInlineValue::Inline(value);
379444
let serialized_direct = direct.encode_into_vec();
380445
segment_writer.write(InternalValue::new(item.key, serialized_direct))?;

src/compaction/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub(crate) mod fifo;
88
pub(crate) mod leveled;
99
pub(crate) mod maintenance;
1010
pub(crate) mod major;
11+
pub(crate) mod movedown;
1112
pub(crate) mod pulldown;
1213
pub(crate) mod stream;
1314
pub(crate) mod tiered;
@@ -22,6 +23,9 @@ use crate::{config::Config, level_manifest::LevelManifest, segment::meta::Segmen
2223
/// Alias for `Leveled`
2324
pub type Levelled = Leveled;
2425

26+
#[doc(hidden)]
27+
pub use movedown::Strategy as MoveDown;
28+
2529
#[doc(hidden)]
2630
pub use pulldown::Strategy as PullDown;
2731

src/compaction/movedown.rs

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) 2024-present, fjall-rs
2+
// This source code is licensed under both the Apache 2.0 and MIT License
3+
// (found in the LICENSE-* files in the repository)
4+
5+
use super::{Choice, CompactionStrategy, Input};
6+
use crate::{level_manifest::LevelManifest, Config, HashSet, Segment};
7+
8+
/// Moves down a level into the destination level.
9+
pub struct Strategy(pub u8, pub u8);
10+
11+
impl CompactionStrategy for Strategy {
12+
fn get_name(&self) -> &'static str {
13+
"MoveDownCompaction"
14+
}
15+
16+
#[allow(clippy::expect_used)]
17+
fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
18+
let resolved_view = levels.resolved_view();
19+
20+
let level = resolved_view
21+
.get(usize::from(self.0))
22+
.expect("level should exist");
23+
24+
let next_level = resolved_view
25+
.get(usize::from(self.1))
26+
.expect("next level should exist");
27+
28+
if next_level.is_empty() {
29+
let segment_ids: HashSet<_> = level.segments.iter().map(Segment::id).collect();
30+
31+
Choice::Move(Input {
32+
segment_ids,
33+
dest_level: self.1,
34+
target_size: 64_000_000,
35+
})
36+
} else {
37+
Choice::DoNothing
38+
}
39+
}
40+
}

src/compaction/worker.rs

-4
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,6 @@ pub struct Options {
4343
/// Levels manifest.
4444
pub levels: Arc<RwLock<LevelManifest>>,
4545

46-
/// Sealed memtables (required for temporarily locking).
47-
pub sealed_memtables: Arc<RwLock<SealedMemtables>>,
48-
4946
/// Compaction strategy to use.
5047
pub strategy: Arc<dyn CompactionStrategy>,
5148

@@ -63,7 +60,6 @@ impl Options {
6360
tree_id: tree.id,
6461
segment_id_generator: tree.segment_id_counter.clone(),
6562
config: tree.config.clone(),
66-
sealed_memtables: tree.sealed_memtables.clone(),
6763
levels: tree.levels.clone(),
6864
stop_signal: tree.stop_signal.clone(),
6965
strategy,

src/key.rs

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use std::{
1111
cmp::Reverse,
1212
io::{Read, Write},
1313
};
14-
use value_log::Slice;
1514
use varint_rs::{VarintReader, VarintWriter};
1615

1716
#[derive(Clone, PartialEq, Eq)]

src/tree/ingest.rs

+136
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright (c) 2024-present, fjall-rs
2+
// This source code is licensed under both the Apache 2.0 and MIT License
3+
// (found in the LICENSE-* files in the repository)
4+
5+
use super::Tree;
6+
use crate::{
7+
file::SEGMENTS_FOLDER,
8+
segment::{block_index::BlockIndexImpl, multi_writer::MultiWriter, SegmentInner},
9+
AbstractTree, Segment, UserKey, UserValue, ValueType,
10+
};
11+
use std::{
12+
path::PathBuf,
13+
sync::{atomic::AtomicBool, Arc},
14+
};
15+
16+
/// One-shot bulk-ingestion helper that writes key-value pairs directly into
/// on-disk segments of a fresh tree, bypassing the memtable.
pub struct Ingestion<'a> {
    /// Segment folder the new segment files are written into
    folder: PathBuf,
    /// Tree the finished segments get registered with
    tree: &'a Tree,
    /// Writer that splits the output across multiple segments
    writer: MultiWriter,
}
21+
22+
impl<'a> Ingestion<'a> {
    /// Creates a new ingestion for the given tree.
    ///
    /// Sets up a multi-segment writer in the tree's segment folder, using the
    /// tree's configured block sizes, compression and bloom filter policy.
    ///
    /// # Errors
    ///
    /// Returns `Err` if an I/O error occurs while creating the writer.
    ///
    /// # Panics
    ///
    /// Panics if the tree already contains any segments.
    pub fn new(tree: &'a Tree) -> crate::Result<Self> {
        assert_eq!(
            0,
            tree.segment_count(),
            "can only perform bulk_ingest on empty trees",
        );

        let folder = tree.config.path.join(SEGMENTS_FOLDER);
        log::debug!("Ingesting into disk segments in {folder:?}");

        let mut writer = MultiWriter::new(
            tree.segment_id_counter.clone(),
            // Target size per segment: 128 MiB
            128 * 1_024 * 1_024,
            crate::segment::writer::Options {
                folder: folder.clone(),
                data_block_size: tree.config.data_block_size,
                index_block_size: tree.config.index_block_size,
                segment_id: 0, /* TODO: unused */
            },
        )?
        .use_compression(tree.config.compression);

        {
            use crate::segment::writer::BloomConstructionPolicy;

            if tree.config.bloom_bits_per_key >= 0 {
                writer = writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(
                    tree.config.bloom_bits_per_key.unsigned_abs(),
                ));
            } else {
                // NOTE(review): a negative setting still builds a policy, just
                // with 0 bits per key — presumably equivalent to "no bloom
                // filter"; confirm against BloomConstructionPolicy semantics
                writer = writer.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0));
            }
        }

        Ok(Self {
            folder,
            tree,
            writer,
        })
    }

    /// Writes a single key-value pair into the current segment.
    ///
    /// All items are written with seqno 0 and `ValueType::Value` (no tombstones).
    ///
    /// # Errors
    ///
    /// Returns `Err` if an I/O error occurs.
    pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
        self.writer.write(crate::InternalValue::from_components(
            key,
            value,
            0,
            ValueType::Value,
        ))
    }

    /// Finalizes the ingestion: flushes the writer, loads the created segments
    /// (block index + bloom filter), registers them with the tree, and moves
    /// them down into a deep level.
    ///
    /// # Errors
    ///
    /// Returns `Err` if an I/O error occurs.
    pub fn finish(self) -> crate::Result<()> {
        use crate::{
            compaction::MoveDown, segment::block_index::two_level_index::TwoLevelBlockIndex,
        };

        let results = self.writer.finish()?;

        // Turn each writer trailer into a fully-loaded Segment handle
        let created_segments = results
            .into_iter()
            .map(|trailer| -> crate::Result<Segment> {
                let segment_id = trailer.metadata.id;
                let segment_file_path = self.folder.join(segment_id.to_string());

                let block_index = TwoLevelBlockIndex::from_file(
                    &segment_file_path,
                    &trailer.metadata,
                    trailer.offsets.tli_ptr,
                    (self.tree.id, segment_id).into(),
                    self.tree.config.descriptor_table.clone(),
                    self.tree.config.block_cache.clone(),
                )?;
                let block_index = BlockIndexImpl::TwoLevel(block_index);
                let block_index = Arc::new(block_index);

                Ok(SegmentInner {
                    tree_id: self.tree.id,

                    descriptor_table: self.tree.config.descriptor_table.clone(),
                    block_cache: self.tree.config.block_cache.clone(),

                    metadata: trailer.metadata,
                    offsets: trailer.offsets,

                    #[allow(clippy::needless_borrows_for_generic_args)]
                    block_index,

                    bloom_filter: Segment::load_bloom(
                        &segment_file_path,
                        trailer.offsets.bloom_ptr,
                    )?,

                    path: segment_file_path,
                    is_deleted: AtomicBool::default(),
                }
                .into())
            })
            .collect::<crate::Result<Vec<_>>>()?;

        self.tree.register_segments(&created_segments)?;

        // NOTE(review): segments land in level 0 and are immediately moved to
        // level 6 — presumably the deepest level; the hard-coded 6 should be
        // confirmed against the tree's configured level count
        self.tree.compact(Arc::new(MoveDown(0, 6)), 0)?;

        // Pre-register the segment files in the descriptor table so readers
        // do not have to open them lazily
        for segment in &created_segments {
            let segment_file_path = self.folder.join(segment.id().to_string());

            self.tree
                .config
                .descriptor_table
                .insert(&segment_file_path, segment.global_id());
        }

        Ok(())
    }
}

0 commit comments

Comments
 (0)