Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "VFS" #83

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ pub use storage::kv::error::{Error, Result};
pub use storage::kv::option::{IsolationLevel, Options};
pub use storage::kv::store::Store;
pub use storage::kv::transaction::{Durability, Mode, Transaction};

pub mod vfs;
109 changes: 63 additions & 46 deletions src/storage/kv/compaction.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::fs::{self, File};
use std::io::{Read, Write};
use std::path::Path;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::vfs::FileSystem;
use bytes::BytesMut;

use crate::storage::{
Expand Down Expand Up @@ -34,7 +34,7 @@ impl<'a> Drop for CompactionGuard<'a> {
}
}

impl StoreInner {
impl<V: FileSystem> StoreInner<V> {
pub async fn compact(&self) -> Result<()> {
// Early return if the store is closed or compaction is already in progress
if self.is_closed.load(Ordering::SeqCst) || !self.core.opts.should_persist_data() {
Expand All @@ -49,16 +49,16 @@ impl StoreInner {
// Clear files before starting compaction if a .merge or .tmp.merge directory exists
let tmp_merge_dir = self.core.opts.dir.join(".tmp.merge");
if tmp_merge_dir.exists() {
fs::remove_dir_all(&tmp_merge_dir)?;
self.vfs.remove_dir_all(&tmp_merge_dir)?;
}

let merge_dir = self.core.opts.dir.join(".merge");
if merge_dir.exists() {
fs::remove_dir_all(&merge_dir)?;
self.vfs.remove_dir_all(&merge_dir)?;
}

// Clean recovery state before starting compaction
RecoveryState::clear(&self.core.opts.dir)?;
RecoveryState::clear(&self.core.opts.dir, &self.vfs)?;

// Clear compaction stats before starting compaction
self.stats.compaction_stats.reset();
Expand All @@ -77,10 +77,10 @@ impl StoreInner {
drop(clog); // Explicitly drop the lock

// Create a temporary directory for compaction
fs::create_dir_all(&tmp_merge_dir)?;
self.vfs.create_dir_all(&tmp_merge_dir)?;

// Initialize a new manifest in the temporary directory
let mut manifest = Core::initialize_manifest(&tmp_merge_dir)?;
let mut manifest = Core::initialize_manifest(&tmp_merge_dir, &self.vfs)?;
// Add the last updated segment ID to the manifest
let changeset = Manifest::with_compacted_up_to_segment(last_updated_segment_id);
manifest.append(&changeset.serialize()?)?;
Expand All @@ -91,7 +91,7 @@ impl StoreInner {
let tm_opts = LogOptions::default()
.with_max_file_size(self.core.opts.max_compaction_segment_size)
.with_file_extension("clog".to_string());
let mut temp_writer = Aol::open(&temp_clog_dir, &tm_opts)?;
let mut temp_writer = Aol::open(&temp_clog_dir, &tm_opts, &self.vfs)?;

// TODO: Check later to add a new way for compaction by reading from the files first and then
// check in files for the keys that are not found in memory to handle deletion
Expand Down Expand Up @@ -205,7 +205,7 @@ impl StoreInner {
temp_writer.close()?;

// Finalize compaction by renaming the temporary directory
fs::rename(tmp_merge_dir, merge_dir)?;
self.vfs.rename(tmp_merge_dir, merge_dir)?;

Ok(())
}
Expand All @@ -218,10 +218,10 @@ pub(crate) enum RecoveryState {
}

impl RecoveryState {
pub(crate) fn load(dir: &Path) -> Result<Self> {
pub(crate) fn load<V: FileSystem>(dir: &Path, vfs: &V) -> Result<Self> {
let path = dir.join(".recovery_state");
if path.exists() {
let mut file = File::open(path)?;
let mut file = vfs.open(path)?;
let mut contents = String::new();
file.read_to_string(&mut contents)?;
match contents.as_str() {
Expand All @@ -233,9 +233,9 @@ impl RecoveryState {
}
}

pub(crate) fn save(&self, dir: &Path) -> Result<()> {
pub(crate) fn save<V: FileSystem>(&self, dir: &Path, vfs: &V) -> Result<()> {
let path = dir.join(".recovery_state");
let mut file = File::create(path)?;
let mut file = vfs.create(path)?;
match self {
RecoveryState::ClogDeleted => {
write!(file, "ClogDeleted")
Expand All @@ -246,26 +246,26 @@ impl RecoveryState {
}
}

pub(crate) fn clear(dir: &Path) -> Result<()> {
pub(crate) fn clear<V: FileSystem>(dir: &Path, vfs: &V) -> Result<()> {
let path = dir.join(".recovery_state");
if path.exists() {
fs::remove_file(path)?;
vfs.remove_file(path)?;
}
Ok(())
}
}

fn perform_recovery(opts: &Options) -> Result<()> {
fn perform_recovery<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
// Encapsulate operations in a closure for easier rollback
let result = || -> Result<()> {
let merge_dir = opts.dir.join(".merge");
let clog_dir = opts.dir.join("clog");
let merge_clog_subdir = merge_dir.join("clog");

// If there is a .merge directory, try reading manifest from it
let manifest = Core::initialize_manifest(&merge_dir)?;
let manifest = Core::initialize_manifest(&merge_dir, vfs)?;
let existing_manifest = if manifest.size()? > 0 {
Core::read_manifest(&merge_dir)?
Core::read_manifest(&merge_dir, vfs)?
} else {
return Err(Error::MergeManifestMissing);
};
Expand All @@ -276,18 +276,18 @@ fn perform_recovery(opts: &Options) -> Result<()> {
}

let compacted_upto_segment_id = compacted_upto_segments[0];
let segs = SegmentRef::read_segments_from_directory(&clog_dir)?;
let segs = SegmentRef::read_segments_from_directory(&clog_dir, vfs)?;
// Step 4: Copy files from clog dir to merge clog dir
for seg in segs.iter() {
if seg.id > compacted_upto_segment_id {
// Check if the path points to a regular file
match fs::metadata(&seg.file_path) {
match vfs.metadata(&seg.file_path) {
Ok(metadata) => {
if metadata.is_file() {
// Proceed to copy the file
let dest_path =
merge_clog_subdir.join(seg.file_path.file_name().unwrap());
match fs::copy(&seg.file_path, &dest_path) {
match vfs.copy(&seg.file_path, &dest_path) {
Ok(_) => println!("File copied successfully"),
Err(e) => {
println!("Error copying file: {:?}", e);
Expand All @@ -307,24 +307,24 @@ fn perform_recovery(opts: &Options) -> Result<()> {
}

// Clear any previous recovery state before setting a new one
RecoveryState::clear(&opts.dir)?;
RecoveryState::clear(&opts.dir, vfs)?;
// After successful operation, update recovery state to indicate clog can be deleted
RecoveryState::ClogDeleted.save(&opts.dir)?;
RecoveryState::ClogDeleted.save(&opts.dir, vfs)?;

// Delete the `clog` directory
if let Err(e) = fs::remove_dir_all(&clog_dir) {
if let Err(e) = vfs.remove_dir_all(&clog_dir) {
println!("Error deleting clog directory: {:?}", e);
return Err(Error::from(e));
}

// Rename `merge_clog_subdir` to `clog`
if let Err(e) = fs::rename(&merge_clog_subdir, &clog_dir) {
if let Err(e) = vfs.rename(&merge_clog_subdir, &clog_dir) {
println!("Error renaming merge_clog_subdir to clog: {:?}", e);
return Err(Error::from(e));
}

// Clear recovery state after successful completion
RecoveryState::clear(&opts.dir)?;
RecoveryState::clear(&opts.dir, vfs)?;

Ok(())
};
Expand All @@ -334,30 +334,40 @@ fn perform_recovery(opts: &Options) -> Result<()> {
Err(e) => {
let merge_dir = opts.dir.join(".merge");
let clog_dir = opts.dir.join("clog");
rollback(&merge_dir, &clog_dir, RecoveryState::load(&opts.dir)?)?;
rollback(
&merge_dir,
&clog_dir,
RecoveryState::load(&opts.dir, vfs)?,
vfs,
)?;
Err(e)
}
}
}

fn cleanup_after_recovery(opts: &Options) -> Result<()> {
fn cleanup_after_recovery<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
let merge_dir = opts.dir.join(".merge");

if merge_dir.exists() {
fs::remove_dir_all(&merge_dir)?;
vfs.remove_dir_all(&merge_dir)?;
}
Ok(())
}

fn rollback(merge_dir: &Path, clog_dir: &Path, checkpoint: RecoveryState) -> Result<()> {
fn rollback<V: FileSystem>(
merge_dir: &Path,
clog_dir: &Path,
checkpoint: RecoveryState,
vfs: &V,
) -> Result<()> {
if checkpoint == RecoveryState::ClogDeleted {
// Restore the clog directory from merge directory if it exists
// At this point the merge directory should exist and the clog directory should not
// So, we can safely rename the merge clog directory to clog directory
if !clog_dir.exists() && merge_dir.exists() {
let merge_clog_subdir = merge_dir.join("clog");
if merge_clog_subdir.exists() {
fs::rename(&merge_clog_subdir, clog_dir)?;
vfs.rename(&merge_clog_subdir, clog_dir)?;
}
}
}
Expand All @@ -369,36 +379,36 @@ fn needs_recovery(opts: &Options) -> Result<bool> {
Ok(opts.dir.join(".merge").exists())
}

fn handle_clog_deleted_state(opts: &Options) -> Result<()> {
fn handle_clog_deleted_state<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
let merge_dir = opts.dir.join(".merge");
let clog_dir = opts.dir.join("clog");
rollback(&merge_dir, &clog_dir, RecoveryState::ClogDeleted)
rollback(&merge_dir, &clog_dir, RecoveryState::ClogDeleted, vfs)
}

/// Restores the store from a compaction process by handling .tmp.merge and .merge directories.
/// TODO: This should happen post repair
pub fn restore_from_compaction(opts: &Options) -> Result<()> {
pub fn restore_from_compaction<V: FileSystem>(opts: &Options, vfs: &V) -> Result<()> {
let tmp_merge_dir = opts.dir.join(".tmp.merge");
// 1) Check if there is a .tmp.merge directory, delete it
if tmp_merge_dir.exists() {
// This means there was a previous compaction process that failed
// so we don't need to do anything here and just return
fs::remove_dir_all(&tmp_merge_dir)?;
vfs.remove_dir_all(&tmp_merge_dir)?;
return Ok(());
}

if !needs_recovery(opts)? {
return Ok(());
}

match RecoveryState::load(&opts.dir)? {
RecoveryState::ClogDeleted => handle_clog_deleted_state(opts)?,
match RecoveryState::load(&opts.dir, vfs)? {
RecoveryState::ClogDeleted => handle_clog_deleted_state(opts, vfs)?,
RecoveryState::None => (),
}

perform_recovery(opts)?;
perform_recovery(opts, vfs)?;
// Clean up merge directory after successful operation
cleanup_after_recovery(opts)
cleanup_after_recovery(opts, vfs)
}

#[cfg(test)]
Expand Down Expand Up @@ -428,30 +438,37 @@ mod tests {
let temp_dir = &temp_dir.to_path_buf();

// Clear state and re-setup for next test
RecoveryState::clear(temp_dir).unwrap();
RecoveryState::clear(temp_dir, &crate::vfs::Dummy).unwrap();
assert!(!path.exists());

// Test saving and loading ClogDeleted
RecoveryState::ClogDeleted.save(temp_dir).unwrap();
RecoveryState::ClogDeleted
.save(temp_dir, &crate::vfs::Dummy)
.unwrap();
assert_eq!(
RecoveryState::load(temp_dir).unwrap(),
RecoveryState::load(temp_dir, &crate::vfs::Dummy).unwrap(),
RecoveryState::ClogDeleted
);

// Clear state and re-setup for next test
RecoveryState::clear(temp_dir).unwrap();
RecoveryState::clear(temp_dir, &crate::vfs::Dummy).unwrap();
assert!(!path.exists());

// Test loading None when no state is saved
assert_eq!(RecoveryState::load(temp_dir).unwrap(), RecoveryState::None);
assert_eq!(
RecoveryState::load(temp_dir, &crate::vfs::Dummy).unwrap(),
RecoveryState::None
);

// Test save contents for ClogDeleted
RecoveryState::ClogDeleted.save(temp_dir).unwrap();
RecoveryState::ClogDeleted
.save(temp_dir, &crate::vfs::Dummy)
.unwrap();
let contents = read_to_string(&path).unwrap();
assert_eq!(contents, "ClogDeleted");

// Final clear to clean up
RecoveryState::clear(temp_dir).unwrap();
RecoveryState::clear(temp_dir, &crate::vfs::Dummy).unwrap();
assert!(!path.exists());
}

Expand Down
15 changes: 9 additions & 6 deletions src/storage/kv/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::Path;

use super::reader::Reader;
use crate::storage::log::{write_field, Error as LogError, MultiSegmentReader, SegmentRef};
use crate::vfs::FileSystem;
use crate::{Error, Options, Result};

#[revisioned(revision = 1)]
Expand Down Expand Up @@ -81,13 +82,13 @@ impl Manifest {

// Load Vec<Manifest> from a dir
#[allow(unused)]
pub fn load_from_dir(path: &Path) -> Result<Self> {
pub fn load_from_dir<V: FileSystem>(path: &Path, vfs: &V) -> Result<Self> {
let mut manifests = Manifest::new();
if !path.exists() {
return Ok(manifests);
}

let sr = SegmentRef::read_segments_from_directory(path)?;
let sr = SegmentRef::read_segments_from_directory(path, vfs)?;
let reader = MultiSegmentReader::new(sr)?;
let mut reader = Reader::new_from(reader);

Expand Down Expand Up @@ -132,7 +133,8 @@ mod tests {
// Create a temporary directory
let temp_dir = create_temp_directory();
let opts = LogOptions::default();
let mut a = Aol::open(temp_dir.path(), &opts).expect("should create aol");
let mut a =
Aol::open(temp_dir.path(), &opts, &crate::vfs::Dummy).expect("should create aol");

let manifest = Manifest {
changes: vec![ManifestChangeType::Options(Options::default())],
Expand All @@ -144,7 +146,7 @@ mod tests {
a.close().expect("should close aol");

// Load the manifests from the file
let loaded_manifest = Manifest::load_from_dir(temp_dir.path()).unwrap();
let loaded_manifest = Manifest::load_from_dir(temp_dir.path(), &crate::vfs::Dummy).unwrap();

// Assert that the loaded manifests contain exactly one manifest
assert_eq!(loaded_manifest.changes.len(), 1);
Expand All @@ -155,7 +157,8 @@ mod tests {
// Step 1: Create a temporary directory
let temp_dir = create_temp_directory();
let log_opts = LogOptions::default();
let mut a = Aol::open(temp_dir.path(), &log_opts).expect("should create aol");
let mut a =
Aol::open(temp_dir.path(), &log_opts, &crate::vfs::Dummy).expect("should create aol");

// Step 2: Create the first Manifest instance and append it to the file
let first_manifest = Manifest {
Expand All @@ -181,7 +184,7 @@ mod tests {
a.close().expect("should close aol");

// Step 5: Load the manifests from the file
let loaded_manifest = Manifest::load_from_dir(temp_dir.path()).unwrap();
let loaded_manifest = Manifest::load_from_dir(temp_dir.path(), &crate::vfs::Dummy).unwrap();

// Step 6: Assert that the loaded manifests contain exactly two manifests
assert_eq!(loaded_manifest.changes.len(), 2);
Expand Down
Loading