Skip to content

Commit a53f74a

Browse files
committed
Add SqliteStore backend
We here add an `KVStore` implementation on SQLite, which becomes the new default for `Node::build`. The `FilesystemStore` is still configurable via `Node::build_with_fs_store`
1 parent ca28e17 commit a53f74a

File tree

6 files changed

+331
-61
lines changed

6 files changed

+331
-61
lines changed

src/io/fs_store.rs

Lines changed: 6 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
#[cfg(target_os = "windows")]
22
extern crate winapi;
33

4-
use super::KVStore;
4+
use super::*;
55

66
use std::collections::HashMap;
77
use std::fs;
88
use std::io::{BufReader, Read, Write};
99
use std::path::{Path, PathBuf};
10-
use std::str::FromStr;
1110
use std::sync::{Arc, Mutex, RwLock};
1211

1312
#[cfg(not(target_os = "windows"))]
@@ -45,7 +44,8 @@ pub struct FilesystemStore {
4544
}
4645

4746
impl FilesystemStore {
48-
pub(crate) fn new(dest_dir: PathBuf) -> Self {
47+
pub(crate) fn new(mut dest_dir: PathBuf) -> Self {
48+
dest_dir.push("fs_store");
4949
let locks = Mutex::new(HashMap::new());
5050
Self { dest_dir, locks }
5151
}
@@ -248,34 +248,15 @@ impl Read for FilesystemReader {
248248

249249
impl KVStorePersister for FilesystemStore {
250250
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
251-
let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| {
252-
let msg = format!("Could not persist file for key {}.", prefixed_key);
253-
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
254-
})?;
255-
256-
let parent_directory = dest_file_path.parent().ok_or_else(|| {
257-
let msg = format!("Could not persist file for key {}.", prefixed_key);
258-
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
259-
})?;
260-
let namespace = parent_directory.display().to_string();
261-
262-
let dest_without_namespace = dest_file_path.strip_prefix(&namespace).map_err(|_| {
263-
let msg = format!("Could not persist file for key {}.", prefixed_key);
264-
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
265-
})?;
266-
let key = dest_without_namespace.display().to_string();
267-
268-
self.write(&namespace, &key, &object.encode())?;
269-
Ok(())
251+
let (namespace, key) = get_namespace_and_key_from_prefixed(prefixed_key)?;
252+
self.write(&namespace, &key, &object.encode())
270253
}
271254
}
272255

273256
#[cfg(test)]
274257
mod tests {
275258
use super::*;
276259
use crate::test::utils::random_storage_path;
277-
use lightning::util::persist::KVStorePersister;
278-
use lightning::util::ser::Readable;
279260

280261
use proptest::prelude::*;
281262
proptest! {
@@ -284,31 +265,8 @@ mod tests {
284265
let rand_dir = random_storage_path();
285266

286267
let fs_store = FilesystemStore::new(rand_dir.into());
287-
let namespace = "testspace";
288-
let key = "testkey";
289-
290-
// Test the basic KVStore operations.
291-
fs_store.write(namespace, key, &data).unwrap();
292-
293-
let listed_keys = fs_store.list(namespace).unwrap();
294-
assert_eq!(listed_keys.len(), 1);
295-
assert_eq!(listed_keys[0], "testkey");
296-
297-
let mut reader = fs_store.read(namespace, key).unwrap();
298-
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
299-
assert_eq!(data, read_data);
300-
301-
fs_store.remove(namespace, key).unwrap();
302-
303-
let listed_keys = fs_store.list(namespace).unwrap();
304-
assert_eq!(listed_keys.len(), 0);
305268

306-
// Test KVStorePersister
307-
let prefixed_key = format!("{}/{}", namespace, key);
308-
fs_store.persist(&prefixed_key, &data).unwrap();
309-
let mut reader = fs_store.read(namespace, key).unwrap();
310-
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
311-
assert_eq!(data, read_data);
269+
do_read_write_remove_list_persist(&data, &fs_store);
312270
}
313271
}
314272
}

src/io/mod.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
//! Objects and traits for data persistence.
22
33
pub(crate) mod fs_store;
4+
pub(crate) mod sqlite_store;
45
pub(crate) mod utils;
56

67
pub use fs_store::FilesystemStore;
8+
pub use sqlite_store::SqliteStore;
79

810
use lightning::util::persist::KVStorePersister;
911

1012
use std::io::Read;
13+
use std::path::PathBuf;
14+
use std::str::FromStr;
1115

1216
// The namespacs and keys LDK uses for persisting
1317
pub(crate) const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = "";
@@ -72,3 +76,61 @@ pub trait KVStore: KVStorePersister {
7276
/// Will return an empty list if the `namespace` is unknown.
7377
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>>;
7478
}
79+
80+
fn get_namespace_and_key_from_prefixed(
81+
prefixed_key: &str,
82+
) -> lightning::io::Result<(String, String)> {
83+
let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| {
84+
let msg = format!("Could not persist file for key {}.", prefixed_key);
85+
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
86+
})?;
87+
88+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
89+
let msg = format!("Could not persist file for key {}.", prefixed_key);
90+
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
91+
})?;
92+
let namespace = parent_directory.display().to_string();
93+
94+
let dest_without_namespace = dest_file_path.strip_prefix(&namespace).map_err(|_| {
95+
let msg = format!("Could not persist file for key {}.", prefixed_key);
96+
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
97+
})?;
98+
let key = dest_without_namespace.display().to_string();
99+
100+
Ok((namespace, key))
101+
}
102+
103+
#[cfg(test)]
104+
fn do_read_write_remove_list_persist<K: KVStore>(data: &[u8; 32], kv_store: &K) {
105+
use lightning::util::ser::Readable;
106+
107+
let namespace = "testspace";
108+
let key = "testkey";
109+
110+
// Test the basic KVStore operations.
111+
kv_store.write(namespace, key, data).unwrap();
112+
113+
// Test empty namespace is allowed, but not empty key.
114+
kv_store.write("", key, data).unwrap();
115+
assert!(kv_store.write(namespace, "", data).is_err());
116+
117+
let listed_keys = kv_store.list(namespace).unwrap();
118+
assert_eq!(listed_keys.len(), 1);
119+
assert_eq!(listed_keys[0], key);
120+
121+
let mut reader = kv_store.read(namespace, key).unwrap();
122+
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
123+
assert_eq!(*data, read_data);
124+
125+
kv_store.remove(namespace, key).unwrap();
126+
127+
let listed_keys = kv_store.list(namespace).unwrap();
128+
assert_eq!(listed_keys.len(), 0);
129+
130+
// Test KVStorePersister
131+
let prefixed_key = format!("{}/{}", namespace, key);
132+
kv_store.persist(&prefixed_key, &data).unwrap();
133+
let mut reader = kv_store.read(namespace, key).unwrap();
134+
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
135+
assert_eq!(*data, read_data);
136+
}

src/io/sqlite_store.rs

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
use super::*;
2+
3+
use lightning::util::persist::KVStorePersister;
4+
use lightning::util::ser::Writeable;
5+
6+
use rusqlite::{named_params, Connection};
7+
8+
use std::fs;
9+
use std::io::Cursor;
10+
use std::path::PathBuf;
11+
use std::sync::{Arc, Mutex};
12+
13+
// The database file name.
14+
const SQLITE_DB_FILE: &str = "ldk_node.sqlite";
15+
16+
// The table in which we store all data.
17+
const KV_TABLE_NAME: &str = "ldk_node_data";
18+
19+
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
20+
const SCHEMA_USER_VERSION: u16 = 1;
21+
22+
/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
23+
///
24+
/// [SQLite]: https://sqlite.org
25+
pub struct SqliteStore {
26+
connection: Arc<Mutex<Connection>>,
27+
}
28+
29+
impl SqliteStore {
30+
pub(crate) fn new(dest_dir: PathBuf) -> Self {
31+
fs::create_dir_all(dest_dir.clone()).unwrap_or_else(|_| {
32+
panic!("Failed to create database destination directory: {}", dest_dir.display())
33+
});
34+
let mut db_file_path = dest_dir.clone();
35+
db_file_path.push(SQLITE_DB_FILE);
36+
37+
let connection = Connection::open(db_file_path.clone()).unwrap_or_else(|_| {
38+
panic!("Failed to open/create database file: {}", db_file_path.display())
39+
});
40+
41+
connection
42+
.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| {
43+
Ok(())
44+
})
45+
.unwrap_or_else(|_| panic!("Failed to set PRAGMA user_version"));
46+
47+
let sql = format!(
48+
"CREATE TABLE IF NOT EXISTS {} (
49+
namespace TEXT NOT NULL,
50+
key TEXT NOT NULL CHECK (key <> ''),
51+
value BLOB, PRIMARY KEY ( namespace, key )
52+
);",
53+
KV_TABLE_NAME
54+
);
55+
connection
56+
.execute(&sql, [])
57+
.unwrap_or_else(|_| panic!("Failed to create table: {}", KV_TABLE_NAME));
58+
59+
let connection = Arc::new(Mutex::new(connection));
60+
Self { connection }
61+
}
62+
}
63+
64+
impl KVStore for SqliteStore {
65+
type Reader = Cursor<Vec<u8>>;
66+
67+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
68+
let locked_conn = self.connection.lock().unwrap();
69+
let sql =
70+
format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);
71+
72+
let res = locked_conn
73+
.query_row(
74+
&sql,
75+
named_params! {
76+
":namespace": namespace,
77+
":key": key,
78+
},
79+
|row| row.get(0),
80+
)
81+
.map_err(|e| match e {
82+
rusqlite::Error::QueryReturnedNoRows => {
83+
let msg =
84+
format!("Failed to read as key could not be found: {}/{}", namespace, key);
85+
std::io::Error::new(std::io::ErrorKind::NotFound, msg)
86+
}
87+
e => {
88+
let msg = format!("Failed to read from key {}/{}: {}", namespace, key, e);
89+
std::io::Error::new(std::io::ErrorKind::Other, msg)
90+
}
91+
})?;
92+
Ok(Cursor::new(res))
93+
}
94+
95+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
96+
let locked_conn = self.connection.lock().unwrap();
97+
98+
let sql = format!(
99+
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
100+
KV_TABLE_NAME
101+
);
102+
103+
locked_conn
104+
.execute(
105+
&sql,
106+
named_params! {
107+
":namespace": namespace,
108+
":key": key,
109+
":value": buf,
110+
},
111+
)
112+
.map(|_| ())
113+
.map_err(|e| {
114+
let msg = format!("Failed to write to key {}/{}: {}", namespace, key, e);
115+
std::io::Error::new(std::io::ErrorKind::Other, msg)
116+
})
117+
}
118+
119+
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
120+
let locked_conn = self.connection.lock().unwrap();
121+
122+
let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);
123+
let changes = locked_conn
124+
.execute(
125+
&sql,
126+
named_params! {
127+
":namespace": namespace,
128+
":key": key,
129+
},
130+
)
131+
.map_err(|e| {
132+
let msg = format!("Failed to delete key {}/{}: {}", namespace, key, e);
133+
std::io::Error::new(std::io::ErrorKind::Other, msg)
134+
})?;
135+
136+
let was_present = changes != 0;
137+
138+
Ok(was_present)
139+
}
140+
141+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
142+
let locked_conn = self.connection.lock().unwrap();
143+
144+
let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", KV_TABLE_NAME);
145+
let mut stmt = locked_conn.prepare(&sql).map_err(|e| {
146+
let msg = format!("Failed to prepare statement: {}", e);
147+
std::io::Error::new(std::io::ErrorKind::Other, msg)
148+
})?;
149+
150+
let mut keys = Vec::new();
151+
152+
let rows_iter = stmt
153+
.query_map(named_params! {":namespace": namespace, }, |row| row.get(0))
154+
.map_err(|e| {
155+
let msg = format!("Failed to retrieve queried rows: {}", e);
156+
std::io::Error::new(std::io::ErrorKind::Other, msg)
157+
})?;
158+
159+
for k in rows_iter {
160+
keys.push(k.map_err(|e| {
161+
let msg = format!("Failed to retrieve queried rows: {}", e);
162+
std::io::Error::new(std::io::ErrorKind::Other, msg)
163+
})?);
164+
}
165+
166+
Ok(keys)
167+
}
168+
}
169+
170+
impl KVStorePersister for SqliteStore {
171+
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
172+
let (namespace, key) = get_namespace_and_key_from_prefixed(prefixed_key)?;
173+
self.write(&namespace, &key, &object.encode())
174+
}
175+
}
176+
177+
#[cfg(test)]
178+
mod tests {
179+
use super::*;
180+
use crate::test::utils::random_storage_path;
181+
182+
use proptest::prelude::*;
183+
proptest! {
184+
#[test]
185+
fn read_write_remove_list_persist(data in any::<[u8; 32]>()) {
186+
let rand_dir = random_storage_path();
187+
let sqlite_store = SqliteStore::new(rand_dir.into());
188+
189+
do_read_write_remove_list_persist(&data, &sqlite_store);
190+
}
191+
}
192+
}

0 commit comments

Comments
 (0)