Skip to content

Commit 81ad668

Browse files
committed
Async FilesystemStore
1 parent c3a4c24 commit 81ad668

File tree

4 files changed

+218
-30
lines changed

4 files changed

+218
-30
lines changed

lightning-persister/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ rustdoc-args = ["--cfg", "docsrs"]
1717
bitcoin = "0.32.2"
1818
lightning = { version = "0.2.0", path = "../lightning" }
1919

20+
# TODO: Make conditional?
21+
tokio = { version = "1.35", features = [ "macros", "rt", "rt-multi-thread", "sync", "time", "fs", "io-util" ] }
22+
2023
[target.'cfg(windows)'.dependencies]
2124
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }
2225

lightning-persister/src/fs_store.rs

Lines changed: 59 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ pub struct FilesystemStore {
3838
data_dir: PathBuf,
3939
tmp_file_counter: AtomicUsize,
4040
gc_counter: AtomicUsize,
41-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
41+
42+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
43+
// latest written version per key.
44+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<HashMap<String, u64>>>>>,
4245
}
4346

4447
impl FilesystemStore {
@@ -90,36 +93,12 @@ impl FilesystemStore {
9093

9194
Ok(dest_dir_path)
9295
}
93-
}
9496

95-
impl KVStoreSync for FilesystemStore {
96-
fn read(
97-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
98-
) -> lightning::io::Result<Vec<u8>> {
99-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
100-
101-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
102-
dest_file_path.push(key);
103-
104-
let mut buf = Vec::new();
105-
{
106-
let inner_lock_ref = {
107-
let mut outer_lock = self.locks.lock().unwrap();
108-
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
109-
};
110-
let _guard = inner_lock_ref.read().unwrap();
111-
112-
let mut f = fs::File::open(dest_file_path)?;
113-
f.read_to_end(&mut buf)?;
114-
}
115-
116-
self.garbage_collect_locks();
117-
118-
Ok(buf)
119-
}
120-
121-
fn write(
97+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
98+
/// returns early without writing.
99+
pub(crate) fn write_version(
122100
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
101+
version: Option<u64>,
123102
) -> lightning::io::Result<()> {
124103
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
125104

@@ -153,7 +132,24 @@ impl KVStoreSync for FilesystemStore {
153132
let mut outer_lock = self.locks.lock().unwrap();
154133
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
155134
};
156-
let _guard = inner_lock_ref.write().unwrap();
135+
let mut guard = inner_lock_ref.write().unwrap();
136+
137+
// If a version is provided, we check if we already have a newer version written. This is used in async
138+
// contexts to realize eventual consistency.
139+
if let Some(version) = version {
140+
match guard.entry(key.to_string()) {
141+
std::collections::hash_map::Entry::Vacant(e) => {
142+
e.insert(version);
143+
},
144+
std::collections::hash_map::Entry::Occupied(mut e) => {
145+
if version <= *e.get() {
146+
// If the version is not greater, we don't write the file.
147+
return Ok(());
148+
}
149+
e.insert(version);
150+
},
151+
}
152+
}
157153

158154
#[cfg(not(target_os = "windows"))]
159155
{
@@ -204,6 +200,39 @@ impl KVStoreSync for FilesystemStore {
204200

205201
res
206202
}
203+
}
204+
205+
impl KVStoreSync for FilesystemStore {
206+
fn read(
207+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
208+
) -> lightning::io::Result<Vec<u8>> {
209+
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
210+
211+
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
212+
dest_file_path.push(key);
213+
214+
let mut buf = Vec::new();
215+
{
216+
let inner_lock_ref = {
217+
let mut outer_lock = self.locks.lock().unwrap();
218+
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
219+
};
220+
let _guard = inner_lock_ref.read().unwrap();
221+
222+
let mut f = fs::File::open(dest_file_path)?;
223+
f.read_to_end(&mut buf)?;
224+
}
225+
226+
self.garbage_collect_locks();
227+
228+
Ok(buf)
229+
}
230+
231+
fn write(
232+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
233+
) -> lightning::io::Result<()> {
234+
self.write_version(primary_namespace, secondary_namespace, key, buf, None)
235+
}
207236

208237
fn remove(
209238
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
//! Objects related to [`FilesystemStoreAsync`] live here.
2+
3+
use std::sync::{
4+
atomic::{AtomicU64, Ordering},
5+
Arc,
6+
};
7+
8+
use crate::fs_store::FilesystemStore;
9+
use core::future::Future;
10+
use core::pin::Pin;
11+
use lightning::util::persist::{KVStore, KVStoreSync};
12+
13+
/// An asynchronous extension of FilesystemStore, implementing the `KVStore` trait for async operations. It is shaped as
14+
/// a wrapper around an existing [`FilesystemStore`] so that the same locks are used. This allows both the sync and
15+
/// async interface to be used simultaneously.
16+
pub struct FilesystemStoreAsync {
17+
inner: Arc<FilesystemStore>,
18+
19+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
20+
// operations aren't sensitive to the order of execution.
21+
version_counter: AtomicU64,
22+
}
23+
24+
impl FilesystemStoreAsync {
25+
/// Creates a new instance of [`FilesystemStoreAsync`].
26+
pub fn new(inner: Arc<FilesystemStore>) -> Self {
27+
Self { inner, version_counter: AtomicU64::new(0) }
28+
}
29+
}
30+
31+
impl KVStore for FilesystemStoreAsync {
32+
fn read(
33+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
34+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
35+
let primary_namespace = primary_namespace.to_string();
36+
let secondary_namespace = secondary_namespace.to_string();
37+
let key = key.to_string();
38+
let this = Arc::clone(&self.inner);
39+
40+
Box::pin(async move {
41+
tokio::task::spawn_blocking(move || {
42+
this.read(&primary_namespace, &secondary_namespace, &key)
43+
})
44+
.await
45+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
46+
})
47+
}
48+
49+
fn write(
50+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
51+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
52+
let primary_namespace = primary_namespace.to_string();
53+
let secondary_namespace = secondary_namespace.to_string();
54+
let key = key.to_string();
55+
let buf = buf.to_vec();
56+
let this = Arc::clone(&self.inner);
57+
58+
// Obtain a version number to retain the call sequence.
59+
let version = self.version_counter.fetch_add(1, Ordering::SeqCst);
60+
61+
Box::pin(async move {
62+
tokio::task::spawn_blocking(move || {
63+
this.write_version(
64+
&primary_namespace,
65+
&secondary_namespace,
66+
&key,
67+
&buf,
68+
Some(version),
69+
)
70+
})
71+
.await
72+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
73+
})
74+
}
75+
76+
fn remove(
77+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
78+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
79+
let primary_namespace = primary_namespace.to_string();
80+
let secondary_namespace = secondary_namespace.to_string();
81+
let key = key.to_string();
82+
let this = Arc::clone(&self.inner);
83+
84+
Box::pin(async move {
85+
tokio::task::spawn_blocking(move || {
86+
this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
87+
})
88+
.await
89+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
90+
})
91+
}
92+
93+
fn list(
94+
&self, primary_namespace: &str, secondary_namespace: &str,
95+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
96+
let primary_namespace = primary_namespace.to_string();
97+
let secondary_namespace = secondary_namespace.to_string();
98+
let this = Arc::clone(&self.inner);
99+
100+
Box::pin(async move {
101+
tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
102+
.await
103+
.unwrap_or_else(|e| {
104+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
105+
})
106+
})
107+
}
108+
}
109+
110+
mod test {
111+
use crate::{fs_store::FilesystemStore, fs_store_async::FilesystemStoreAsync};
112+
use lightning::util::persist::KVStore;
113+
use std::sync::Arc;
114+
115+
#[tokio::test]
116+
async fn read_write_remove_list_persist() {
117+
let mut temp_path = std::env::temp_dir();
118+
temp_path.push("test_read_write_remove_list_persist");
119+
let fs_store = Arc::new(FilesystemStore::new(temp_path));
120+
let fs_store_async = FilesystemStoreAsync::new(Arc::clone(&fs_store));
121+
122+
let data1 = [42u8; 32];
123+
let data2 = [43u8; 32];
124+
125+
let primary_namespace = "testspace";
126+
let secondary_namespace = "testsubspace";
127+
let key = "testkey";
128+
129+
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
130+
// that eventual consistency works.
131+
let fut1 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data1);
132+
let fut2 = fs_store_async.write(primary_namespace, secondary_namespace, key, &data2);
133+
134+
fut2.await.unwrap();
135+
fut1.await.unwrap();
136+
137+
// Test list.
138+
let listed_keys =
139+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
140+
assert_eq!(listed_keys.len(), 1);
141+
assert_eq!(listed_keys[0], key);
142+
143+
// Test read. We expect to read data2, as the write call was initiated later.
144+
let read_data =
145+
fs_store_async.read(primary_namespace, secondary_namespace, key).await.unwrap();
146+
assert_eq!(data2, &*read_data);
147+
148+
// Test remove.
149+
fs_store_async.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
150+
151+
let listed_keys =
152+
fs_store_async.list(primary_namespace, secondary_namespace).await.unwrap();
153+
assert_eq!(listed_keys.len(), 0);
154+
}
155+
}

lightning-persister/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
extern crate criterion;
1010

1111
pub mod fs_store;
12+
pub mod fs_store_async;
1213

1314
mod utils;
1415

0 commit comments

Comments
 (0)