Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions dash-spv/src/storage/filter_headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use crate::error::StorageResult;
use crate::storage::segments::SegmentCache;
use crate::storage::PersistentStorage;
use async_trait::async_trait;
use dashcore::hash_types::FilterHeader;
use std::ops::Range;
use std::path::PathBuf;
use tokio::sync::RwLock;

#[async_trait]
pub trait FilterHeaderStorage {
async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()>;

async fn load_filter_headers(&self, range: Range<u32>) -> StorageResult<Vec<FilterHeader>>;

async fn get_filter_header(&self, height: u32) -> StorageResult<Option<FilterHeader>> {
if let Some(tip_height) = self.get_filter_tip_height().await? {
if height > tip_height {
return Ok(None);
}
} else {
return Ok(None);
}

if let Some(start_height) = self.get_filter_start_height().await {
if height < start_height {
return Ok(None);
}
} else {
return Ok(None);
}

Ok(self.load_filter_headers(height..height + 1).await?.first().copied())
}

async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>>;

async fn get_filter_start_height(&self) -> Option<u32>;
}

pub struct PersistentFilterHeaderStorage {
filter_headers: RwLock<SegmentCache<FilterHeader>>,
}

impl PersistentFilterHeaderStorage {
const FOLDER_NAME: &str = "filter_headers";
}

#[async_trait]
impl PersistentStorage for PersistentFilterHeaderStorage {
async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
let storage_path = storage_path.into();
let segments_folder = storage_path.join(Self::FOLDER_NAME);

let filter_headers = SegmentCache::load_or_new(segments_folder).await?;

Ok(Self {
filter_headers: RwLock::new(filter_headers),
})
}

async fn persist(&mut self, base_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
let filter_headers_folder = base_path.into().join(Self::FOLDER_NAME);

tokio::fs::create_dir_all(&filter_headers_folder).await?;

self.filter_headers.write().await.persist(&filter_headers_folder).await;
Ok(())
}
}

#[async_trait]
impl FilterHeaderStorage for PersistentFilterHeaderStorage {
async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
self.filter_headers.write().await.store_items(headers).await
}

async fn load_filter_headers(&self, range: Range<u32>) -> StorageResult<Vec<FilterHeader>> {
self.filter_headers.write().await.get_items(range).await
}

async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>> {
Ok(self.filter_headers.read().await.tip_height())
}

async fn get_filter_start_height(&self) -> Option<u32> {
self.filter_headers.read().await.start_height()
}
}
82 changes: 0 additions & 82 deletions dash-spv/src/storage/filters.rs
Original file line number Diff line number Diff line change
@@ -1,102 +1,20 @@
use std::{ops::Range, path::PathBuf};

use async_trait::async_trait;
use dashcore::hash_types::FilterHeader;
use tokio::sync::RwLock;

use crate::{
error::StorageResult,
storage::{segments::SegmentCache, PersistentStorage},
};

#[async_trait]
pub trait FilterHeaderStorage {
async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()>;

async fn load_filter_headers(&self, range: Range<u32>) -> StorageResult<Vec<FilterHeader>>;

async fn get_filter_header(&self, height: u32) -> StorageResult<Option<FilterHeader>> {
if let Some(tip_height) = self.get_filter_tip_height().await? {
if height > tip_height {
return Ok(None);
}
} else {
return Ok(None);
}

if let Some(start_height) = self.get_filter_start_height().await {
if height < start_height {
return Ok(None);
}
} else {
return Ok(None);
}

Ok(self.load_filter_headers(height..height + 1).await?.first().copied())
}

async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>>;

async fn get_filter_start_height(&self) -> Option<u32>;
}

#[async_trait]
pub trait FilterStorage {
async fn store_filter(&mut self, height: u32, filter: &[u8]) -> StorageResult<()>;

async fn load_filters(&self, range: Range<u32>) -> StorageResult<Vec<Vec<u8>>>;
}

pub struct PersistentFilterHeaderStorage {
filter_headers: RwLock<SegmentCache<FilterHeader>>,
}

impl PersistentFilterHeaderStorage {
const FOLDER_NAME: &str = "filter_headers";
}

#[async_trait]
impl PersistentStorage for PersistentFilterHeaderStorage {
async fn open(storage_path: impl Into<PathBuf> + Send) -> StorageResult<Self> {
let storage_path = storage_path.into();
let segments_folder = storage_path.join(Self::FOLDER_NAME);

let filter_headers = SegmentCache::load_or_new(segments_folder).await?;

Ok(Self {
filter_headers: RwLock::new(filter_headers),
})
}

async fn persist(&mut self, base_path: impl Into<PathBuf> + Send) -> StorageResult<()> {
let filter_headers_folder = base_path.into().join(Self::FOLDER_NAME);

tokio::fs::create_dir_all(&filter_headers_folder).await?;

self.filter_headers.write().await.persist(&filter_headers_folder).await;
Ok(())
}
}

#[async_trait]
impl FilterHeaderStorage for PersistentFilterHeaderStorage {
async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
self.filter_headers.write().await.store_items(headers).await
}

async fn load_filter_headers(&self, range: Range<u32>) -> StorageResult<Vec<FilterHeader>> {
self.filter_headers.write().await.get_items(range).await
}

async fn get_filter_tip_height(&self) -> StorageResult<Option<u32>> {
Ok(self.filter_headers.read().await.tip_height())
}

async fn get_filter_start_height(&self) -> Option<u32> {
self.filter_headers.read().await.start_height()
}
}

pub struct PersistentFilterStorage {
filters: RwLock<SegmentCache<Vec<u8>>>,
}
Expand Down
8 changes: 5 additions & 3 deletions dash-spv/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod types;

mod blocks;
mod chainstate;
mod filter_headers;
mod filters;
mod io;
mod lockfile;
Expand All @@ -26,7 +27,8 @@ use tokio::sync::RwLock;
use crate::error::StorageResult;
use crate::storage::blocks::{BlockHeaderTip, PersistentBlockHeaderStorage};
use crate::storage::chainstate::PersistentChainStateStorage;
use crate::storage::filters::{PersistentFilterHeaderStorage, PersistentFilterStorage};
use crate::storage::filter_headers::PersistentFilterHeaderStorage;
use crate::storage::filters::PersistentFilterStorage;
use crate::storage::lockfile::LockFile;
use crate::storage::masternode::PersistentMasternodeStateStorage;
use crate::storage::metadata::PersistentMetadataStorage;
Expand All @@ -36,7 +38,7 @@ use crate::{ChainState, ClientConfig};

pub use crate::storage::blocks::BlockHeaderStorage;
pub use crate::storage::chainstate::ChainStateStorage;
pub use crate::storage::filters::FilterHeaderStorage;
pub use crate::storage::filter_headers::FilterHeaderStorage;
pub use crate::storage::filters::FilterStorage;
pub use crate::storage::masternode::MasternodeStateStorage;
pub use crate::storage::metadata::MetadataStorage;
Expand Down Expand Up @@ -295,7 +297,7 @@ impl blocks::BlockHeaderStorage for DiskStorageManager {
}

#[async_trait]
impl filters::FilterHeaderStorage for DiskStorageManager {
impl FilterHeaderStorage for DiskStorageManager {
async fn store_filter_headers(&mut self, headers: &[FilterHeader]) -> StorageResult<()> {
self.filter_headers.write().await.store_filter_headers(headers).await
}
Expand Down
Loading