Merged
Changes from all commits
25 commits
36b2651
Expose Ingestion API with Inversion of Control
zaidoon1 Nov 15, 2025
d9d041f
clippy
marvin-j97 Nov 15, 2025
3489b8f
don't pin ingestion output tables
zaidoon1 Nov 15, 2025
25c9cdf
split ingestion initialization from seqno assignment in bulk ingest t…
zaidoon1 Nov 15, 2025
087e5de
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 15, 2025
359a585
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 17, 2025
3314099
Merge remote-tracking branch 'zaidoon1/zaidoon/ingestion-api-inversio…
marvin-j97 Nov 19, 2025
96a7f58
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
90b02a6
test: dirty snapshot after ingestion
marvin-j97 Nov 19, 2025
b73aa4e
refactor
marvin-j97 Nov 19, 2025
1877b57
refactor
marvin-j97 Nov 19, 2025
f3adbb7
Merge remote-tracking branch 'zaidoon1/zaidoon/ingestion-api-inversio…
marvin-j97 Nov 19, 2025
7849efa
more ergonomic ingestion API, add more tests
marvin-j97 Nov 19, 2025
0725204
refactor
marvin-j97 Nov 19, 2025
f06a70c
apply pinning on recovery
marvin-j97 Nov 19, 2025
8971f4c
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
0aa3ff4
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
fe64606
Merge branch 'main' into zaidoon/ingestion-api-inversion-control
marvin-j97 Nov 19, 2025
976cf87
change ingestion flush watermark to 0
marvin-j97 Nov 20, 2025
7fe024f
lint
marvin-j97 Nov 20, 2025
19e0876
feat: table global seqno
marvin-j97 Nov 22, 2025
bc0f85f
refactor
marvin-j97 Nov 22, 2025
0be5b17
implement atomic flush and global_seqno coordination
zaidoon1 Nov 22, 2025
f69061a
refactor ingestion to use upgrade_version_with_seqno for explicit seq…
zaidoon1 Nov 22, 2025
51e0fdc
refactor
marvin-j97 Nov 23, 2025
24 changes: 1 addition & 23 deletions src/abstract.rs
@@ -4,8 +4,7 @@

use crate::{
iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
Config, Guard, InternalValue, KvPair, Memtable, SeqNo, SequenceNumberCounter, TableId, Tree,
UserKey, UserValue,
Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree, UserKey, UserValue,
};
use std::{
ops::RangeBounds,
@@ -143,27 +142,6 @@ pub trait AbstractTree
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;

/// Ingests a sorted stream of key-value pairs into the tree.
///
/// Can only be called on a new fresh, empty tree.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
///
/// # Panics
///
/// Panics if the tree is **not** initially empty.
///
/// Will panic if the input iterator is not sorted in ascending order.
#[doc(hidden)]
fn ingest(
&self,
iter: impl Iterator<Item = (UserKey, UserValue)>,
seqno_generator: &SequenceNumberCounter,
visible_seqno: &SequenceNumberCounter,
) -> crate::Result<()>;

/// Returns the approximate number of tombstones in the tree.
fn tombstone_count(&self) -> u64;

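For context, a rough caller-side sketch of the API being removed in this hunk. This is not taken from the diff: the crate name `lsm_tree`, the crate-root re-exports of `AbstractTree`, `SequenceNumberCounter` and `Result`, and the helper function are all assumptions. The point is the shape of the old contract: the tree pulled a pre-sorted iterator and the caller supplied both sequence-number counters.

```rust
// Hypothetical caller-side sketch of the removed `AbstractTree::ingest`;
// crate and re-export paths are assumptions, not taken from this diff.
use lsm_tree::{AbstractTree, SequenceNumberCounter, UserKey, UserValue};

fn bulk_load_old(
    tree: &impl AbstractTree,
    sorted_items: Vec<(UserKey, UserValue)>,
    seqno: &SequenceNumberCounter,
    visible_seqno: &SequenceNumberCounter,
) -> lsm_tree::Result<()> {
    // Pull-based: the tree consumes the iterator and assigns seqnos internally,
    // which is exactly what this PR inverts into a caller-driven handle.
    tree.ingest(sorted_items.into_iter(), seqno, visible_seqno)
}
```

The replacement is a push-style ingestion handle; a usage sketch based on the new `BlobIngestion` type follows its diff below.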
5 changes: 4 additions & 1 deletion src/any_tree.rs
@@ -2,7 +2,10 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{BlobTree, Tree};
use crate::{
blob_tree::ingest::BlobIngestion, tree::ingest::Ingestion, BlobTree, SeqNo, Tree, UserKey,
UserValue,
};
use enum_dispatch::enum_dispatch;

/// May be a standard [`Tree`] or a [`BlobTree`]
254 changes: 254 additions & 0 deletions src/blob_tree/ingest.rs
@@ -0,0 +1,254 @@
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{
blob_tree::handle::BlobIndirection,
file::BLOBS_FOLDER,
table::Table,
tree::ingest::Ingestion as TableIngestion,
vlog::{BlobFileWriter, ValueHandle},
SeqNo, UserKey, UserValue,
};

/// Bulk ingestion for [`BlobTree`]
///
/// Items NEED to be added in ascending key order.
///
/// Uses table ingestion for the index and a blob file writer for large
/// values so both streams advance together.
pub struct BlobIngestion<'a> {
tree: &'a crate::BlobTree,
pub(crate) table: TableIngestion<'a>,
pub(crate) blob: BlobFileWriter,
seqno: SeqNo,
separation_threshold: u32,
last_key: Option<UserKey>,
}

impl<'a> BlobIngestion<'a> {
/// Creates a new ingestion.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
let kv = tree
.index
.config
.kv_separation_opts
.as_ref()
.expect("kv separation options should exist");

let blob_file_size = kv.file_target_size;

let table = TableIngestion::new(&tree.index)?;
let blob = BlobFileWriter::new(
tree.index.0.blob_file_id_counter.clone(),
tree.index.config.path.join(BLOBS_FOLDER),
)?
.use_target_size(blob_file_size)
.use_compression(kv.compression);

let separation_threshold = kv.separation_threshold;

Ok(Self {
tree,
table,
blob,
seqno: 0,
separation_threshold,
last_key: None,
})
}

/// Writes a key-value pair.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn write(&mut self, key: UserKey, value: UserValue) -> crate::Result<()> {
// Check order before any blob I/O to avoid partial writes on failure
if let Some(prev) = &self.last_key {
assert!(
key > *prev,
"next key in ingestion must be greater than last key"
);
}

#[expect(clippy::cast_possible_truncation)]
let value_size = value.len() as u32;

if value_size >= self.separation_threshold {
let offset = self.blob.offset();
let blob_file_id = self.blob.blob_file_id();
let on_disk_size = self.blob.write(&key, self.seqno, &value)?;

let indirection = BlobIndirection {
vhandle: ValueHandle {
blob_file_id,
offset,
on_disk_size,
},
size: value_size,
};

let cloned_key = key.clone();
let res = self.table.write_indirection(key, indirection);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
} else {
let cloned_key = key.clone();
let res = self.table.write(key, value);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
}
}

/// Writes a tombstone for a key.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn write_tombstone(&mut self, key: UserKey) -> crate::Result<()> {
if let Some(prev) = &self.last_key {
assert!(
key > *prev,
"next key in ingestion must be greater than last key"
);
}

let cloned_key = key.clone();
let res = self.table.write_tombstone(key);
if res.is_ok() {
self.last_key = Some(cloned_key);
}
res
}

/// Finishes the ingestion.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
#[allow(clippy::significant_drop_tightening)]
pub fn finish(self) -> crate::Result<SeqNo> {
use crate::AbstractTree;

let index = self.index().clone();

// CRITICAL SECTION: Atomic flush + seqno allocation + registration
//
// For BlobTree, we must coordinate THREE components atomically:
// 1. Index tree memtable flush
// 2. Value log blob files
// 3. Index tree tables (with blob indirections)
//
// The sequence ensures all components see the same global_seqno:
// 1. Acquire flush lock on index tree
// 2. Flush index tree active memtable
// 3. Finalize blob writer (creates blob files)
// 4. Finalize table writer (creates index tables)
// 5. Allocate next global seqno
// 6. Recover tables with that seqno
// 7. Register version with same seqno + blob files
//
// This prevents race conditions where blob files and their index
// entries could have mismatched sequence numbers.
let flush_lock = index.get_flush_lock();

// Flush any pending index memtable writes to ensure ingestion sees
// a consistent snapshot of the index.
// We call rotate + flush directly because we already hold the lock.
index.rotate_memtable();
index.flush(&flush_lock, 0)?;

// Finalize the blob writer first, ensuring all large values are
// written to blob files before we finalize the index tables that
// reference them.
let blob_files = self.blob.finish()?;

// Finalize the table writer, creating index tables with blob
// indirections pointing to the blob files we just created.
let results = self.table.writer.finish()?;

// Acquire locks for version registration on the index tree. We must
// hold both the compaction state lock and version history lock to
// safely modify the tree's version.
let mut _compaction_state = index.compaction_state.lock().expect("lock is poisoned");
let mut version_lock = index.version_history.write().expect("lock is poisoned");

// Allocate the next global sequence number. This seqno will be shared
// by all ingested tables, blob files, and the version that registers
// them, ensuring consistent MVCC snapshots across the value log.
let global_seqno = index.config.seqno.next();

// Recover all created index tables, assigning them the global_seqno
// we just allocated. These tables contain indirections to the blob
// files created above, so they must share the same sequence number
// for MVCC correctness.
//
// We intentionally do NOT pin filter/index blocks here. Large ingests
// are typically placed in level 1, and pinning would increase memory
// pressure unnecessarily.
let created_tables = results
.into_iter()
.map(|(table_id, checksum)| -> crate::Result<Table> {
Table::recover(
index
.config
.path
.join(crate::file::TABLES_FOLDER)
.join(table_id.to_string()),
checksum,
global_seqno,
index.id,
index.config.cache.clone(),
index.config.descriptor_table.clone(),
false,
false,
#[cfg(feature = "metrics")]
index.metrics.clone(),
)
})
.collect::<crate::Result<Vec<_>>>()?;

// Upgrade the version with our ingested tables and blob files, using
// the global_seqno we allocated earlier. This ensures the version,
// tables, and blob files all share the same sequence number, which is
// critical for GC correctness - we must not delete blob files that are
// still referenced by visible snapshots.
//
// We use upgrade_version_with_seqno (instead of upgrade_version) because
// we need precise control over the seqno: it must match the seqno we
// already assigned to the recovered tables.
version_lock.upgrade_version_with_seqno(
&index.config.path,
|current| {
let mut copy = current.clone();
copy.version =
copy.version
.with_new_l0_run(&created_tables, Some(&blob_files), None);
Ok(copy)
},
global_seqno,
)?;

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&index.config.path, 0) {
log::warn!("Version GC failed: {e:?}");
}

Ok(global_seqno)
}

#[inline]
fn index(&self) -> &crate::Tree {
&self.tree.index
}
}
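Based solely on the methods shown in this file, a minimal caller-side sketch of the new push-style API. The crate name `lsm_tree`, the import path `blob_tree::ingest`, and the module's public visibility are assumptions not visible in this diff; `new`, `write`, `write_tombstone`, and `finish` are the methods defined above.

```rust
// Minimal usage sketch of `BlobIngestion`, assuming it is reachable as below.
// Keys must be written in strictly ascending order, and `finish` returns the
// global seqno shared by the ingested index tables and blob files.
use lsm_tree::blob_tree::ingest::BlobIngestion;
use lsm_tree::{BlobTree, UserKey, UserValue};

fn ingest_sorted(tree: &BlobTree, items: Vec<(UserKey, UserValue)>) -> lsm_tree::Result<()> {
    let mut ingestion = BlobIngestion::new(tree)?;

    for (key, value) in items {
        // Values at or above the separation threshold are routed to a blob
        // file; smaller values are inlined into the index table.
        ingestion.write(key, value)?;
    }

    // Atomically flushes, allocates the global seqno, recovers the created
    // tables with it, and registers the new version (see `finish` above).
    let _global_seqno = ingestion.finish()?;

    Ok(())
}
```

Tombstones would go through `write_tombstone` in the same ascending-key stream.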