Skip to content

Commit 97d6b3f

Browse files
committed
Async FilesystemStore
1 parent 1809349 commit 97d6b3f

File tree

4 files changed

+211
-54
lines changed

4 files changed

+211
-54
lines changed

lightning-persister/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
bitcoin = "0.32.2"
1818
lightning = { version = "0.2.0", path = "../lightning" }
1919

20+
# TODO: Make conditional?
21+
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time", "fs", "io-util" ] }
22+
2023
[target.'cfg(windows)'.dependencies]
2124
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2225

lightning-persister/src/fs_store.rs

Lines changed: 52 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -30,43 +30,29 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3030
path.as_ref().encode_wide().chain(Some(0)).collect()
3131
}
3232

33-
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34-
const GC_LOCK_INTERVAL: usize = 25;
35-
3633
/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
3734
pub struct FilesystemStore {
3835
data_dir: PathBuf,
3936
tmp_file_counter: AtomicUsize,
40-
gc_counter: AtomicUsize,
41-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
37+
38+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
39+
// latest written version per key.
40+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
4241
}
4342

4443
impl FilesystemStore {
4544
/// Constructs a new [`FilesystemStore`].
4645
pub fn new(data_dir: PathBuf) -> Self {
4746
let locks = Mutex::new(HashMap::new());
4847
let tmp_file_counter = AtomicUsize::new(0);
49-
let gc_counter = AtomicUsize::new(1);
50-
Self { data_dir, tmp_file_counter, gc_counter, locks }
48+
Self { data_dir, tmp_file_counter, locks }
5149
}
5250

5351
/// Returns the data directory.
5452
pub fn get_data_dir(&self) -> PathBuf {
5553
self.data_dir.clone()
5654
}
5755

58-
fn garbage_collect_locks(&self) {
59-
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
60-
61-
if gc_counter % GC_LOCK_INTERVAL == 0 {
62-
// Take outer lock for the cleanup.
63-
let mut outer_lock = self.locks.lock().unwrap();
64-
65-
// Garbage collect all lock entries that are not referenced anymore.
66-
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
67-
}
68-
}
69-
7056
fn get_dest_dir_path(
7157
&self, primary_namespace: &str, secondary_namespace: &str,
7258
) -> std::io::Result<PathBuf> {
@@ -90,36 +76,12 @@ impl FilesystemStore {
9076

9177
Ok(dest_dir_path)
9278
}
93-
}
94-
95-
impl KVStoreSync for FilesystemStore {
96-
fn read(
97-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
98-
) -> lightning::io::Result<Vec<u8>> {
99-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
100-
101-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
102-
dest_file_path.push(key);
103-
104-
let mut buf = Vec::new();
105-
{
106-
let inner_lock_ref = {
107-
let mut outer_lock = self.locks.lock().unwrap();
108-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
109-
};
110-
let _guard = inner_lock_ref.read().unwrap();
111-
112-
let mut f = fs::File::open(dest_file_path)?;
113-
f.read_to_end(&mut buf)?;
114-
}
115-
116-
self.garbage_collect_locks();
117-
118-
Ok(buf)
119-
}
12079

121-
fn write(
80+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
81+
/// returns early without writing.
82+
pub(crate) fn write_version(
12283
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
84+
version: Option<u64>,
12385
) -> lightning::io::Result<()> {
12486
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
12587

@@ -153,7 +115,18 @@ impl KVStoreSync for FilesystemStore {
153115
let mut outer_lock = self.locks.lock().unwrap();
154116
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
155117
};
156-
let _guard = inner_lock_ref.write().unwrap();
118+
let mut last_written_version = inner_lock_ref.write().unwrap();
119+
120+
// If a version is provided, we check if we already have a newer version written. This is used in async
121+
// contexts to realize eventual consistency.
122+
if let Some(version) = version {
123+
if version <= *last_written_version {
124+
// If the version is not greater, we don't write the file.
125+
return Ok(());
126+
}
127+
128+
*last_written_version = version;
129+
}
157130

158131
#[cfg(not(target_os = "windows"))]
159132
{
@@ -200,10 +173,39 @@ impl KVStoreSync for FilesystemStore {
200173
}
201174
};
202175

203-
self.garbage_collect_locks();
204-
205176
res
206177
}
178+
}
179+
180+
impl KVStoreSync for FilesystemStore {
181+
fn read(
182+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
183+
) -> lightning::io::Result<Vec<u8>> {
184+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
185+
186+
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
187+
dest_file_path.push(key);
188+
189+
let mut buf = Vec::new();
190+
{
191+
let inner_lock_ref = {
192+
let mut outer_lock = self.locks.lock().unwrap();
193+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
194+
};
195+
let _guard = inner_lock_ref.read().unwrap();
196+
197+
let mut f = fs::File::open(dest_file_path)?;
198+
f.read_to_end(&mut buf)?;
199+
}
200+
201+
Ok(buf)
202+
}
203+
204+
fn write(
205+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
206+
) -> lightning::io::Result<()> {
207+
self.write_version(primary_namespace, secondary_namespace, key, buf, None)
208+
}
207209

208210
fn remove(
209211
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
@@ -295,8 +297,6 @@ impl KVStoreSync for FilesystemStore {
295297
}
296298
}
297299

298-
self.garbage_collect_locks();
299-
300300
Ok(())
301301
}
302302

@@ -325,8 +325,6 @@ impl KVStoreSync for FilesystemStore {
325325
keys.push(key);
326326
}
327327

328-
self.garbage_collect_locks();
329-
330328
Ok(keys)
331329
}
332330
}
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
//! Objects related to [`FilesystemStoreAsync`] live here.
2+
3+
use std::sync::{
4+
atomic::{AtomicU64, Ordering},
5+
Arc,
6+
};
7+
8+
use crate::fs_store::FilesystemStore;
9+
use core::future::Future;
10+
use core::pin::Pin;
11+
use lightning::util::persist::{KVStore, KVStoreSync};
12+
13+
/// An asynchronous extension of FilesystemStore, implementing the `KVStore` trait for async operations. It is shaped as
14+
/// a wrapper around an existing [`FilesystemStore`] so that the same locks are used. This allows both the sync and
15+
/// async interface to be used simultaneously.
16+
pub struct FilesystemStoreAsync {
17+
inner: Arc<FilesystemStore>,
18+
19+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
20+
// operations aren't sensitive to the order of execution.
21+
version_counter: AtomicU64,
22+
}
23+
24+
impl FilesystemStoreAsync {
25+
/// Creates a new instance of [`FilesystemStoreAsync`].
26+
pub fn new(inner: Arc<FilesystemStore>) -> Self {
27+
Self { inner, version_counter: AtomicU64::new(0) }
28+
}
29+
}
30+
31+
impl KVStore for FilesystemStoreAsync {
32+
fn read(
33+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
34+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
35+
let primary_namespace = primary_namespace.to_string();
36+
let secondary_namespace = secondary_namespace.to_string();
37+
let key = key.to_string();
38+
let this = Arc::clone(&self.inner);
39+
40+
Box::pin(async move {
41+
tokio::task::spawn_blocking(move || {
42+
this.read(&primary_namespace, &secondary_namespace, &key)
43+
})
44+
.await
45+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
46+
})
47+
}
48+
49+
fn write(
50+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
51+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
52+
let primary_namespace = primary_namespace.to_string();
53+
let secondary_namespace = secondary_namespace.to_string();
54+
let key = key.to_string();
55+
let buf = buf.to_vec();
56+
let this = Arc::clone(&self.inner);
57+
58+
// Obtain a version number to retain the call sequence.
59+
let version = self.version_counter.fetch_add(1, Ordering::SeqCst);
60+
61+
Box::pin(async move {
62+
tokio::task::spawn_blocking(move || {
63+
this.write_version(
64+
&primary_namespace,
65+
&secondary_namespace,
66+
&key,
67+
&buf,
68+
Some(version),
69+
)
70+
})
71+
.await
72+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
73+
})
74+
}
75+
76+
fn remove(
77+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
78+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
79+
let primary_namespace = primary_namespace.to_string();
80+
let secondary_namespace = secondary_namespace.to_string();
81+
let key = key.to_string();
82+
let this = Arc::clone(&self.inner);
83+
84+
Box::pin(async move {
85+
tokio::task::spawn_blocking(move || {
86+
this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
87+
})
88+
.await
89+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
90+
})
91+
}
92+
93+
fn list(
94+
&self, primary_namespace: &str, secondary_namespace: &str,
95+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
96+
let primary_namespace = primary_namespace.to_string();
97+
let secondary_namespace = secondary_namespace.to_string();
98+
let this = Arc::clone(&self.inner);
99+
100+
Box::pin(async move {
101+
tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
102+
.await
103+
.unwrap_or_else(|e| {
104+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
105+
})
106+
})
107+
}
108+
}
109+
110+
mod test {
111+
use crate::{fs_store::FilesystemStore, fs_store_async::FilesystemStoreAsync};
112+
use lightning::util::persist::KVStore;
113+
use std::sync::Arc;
114+
115+
#[tokio::test]
116+
async fn read_write_remove_list_persist() {
117+
let mut temp_path = std::env::temp_dir();
118+
temp_path.push("test_read_write_remove_list_persist");
119+
let fs_store = Arc::new(FilesystemStore::new(temp_path));
120+
let fs_store_async = FilesystemStoreAsync::new(Arc::clone(&fs_store));
121+
122+
let data1 = [42u8; 32];
123+
let data2 = [43u8; 32];
124+
125+
let primary_namespace = "testspace";
126+
let secondary_namespace = "testsubspace";
127+
let key = "testkey";
128+
129+
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
130+
// that eventual consistency works.
131+
let fut1 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data1);
132+
let fut2 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data2);
133+
134+
fut2.await.unwrap();
135+
fut1.await.unwrap();
136+
137+
// Test list.
138+
let listed_keys =
139+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
140+
assert_eq!(listed_keys.len(), 1);
141+
assert_eq!(listed_keys[0], key);
142+
143+
// Test read. We expect to read data2, as the write call was initiated later.
144+
let read_data =
145+
fs_store_async.read(primary_namespace, secondary_namespace, key).await.unwrap();
146+
assert_eq!(data2, &*read_data);
147+
148+
// Test remove.
149+
fs_store_async.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
150+
151+
let listed_keys =
152+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
153+
assert_eq!(listed_keys.len(), 0);
154+
}
155+
}

lightning-persister/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
extern crate criterion;
1010

1111
pub mod fs_store;
12+
pub mod fs_store_async;
1213

1314
mod utils;
1415

0 commit comments

Comments
 (0)