Skip to content

Commit 5cf438d

Browse files
committed
Add SqliteStore backend
We here add an `KVStore` implementation on SQLite, which becomes the new default for `Node::build`. The `FilesystemStore` is still configurable via `Node::build_with_fs_store`
1 parent 5d0abd7 commit 5cf438d

File tree

5 files changed

+304
-14
lines changed

5 files changed

+304
-14
lines changed

src/io/fs_store.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ pub struct FilesystemStore {
4545
}
4646

4747
impl FilesystemStore {
48-
pub(crate) fn new(dest_dir: PathBuf) -> Self {
48+
pub(crate) fn new(mut dest_dir: PathBuf) -> Self {
49+
dest_dir.push("fs_store");
4950
let locks = Mutex::new(HashMap::new());
5051
Self { dest_dir, locks }
5152
}

src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
//! Objects and traits for data persistence.
22
33
pub(crate) mod fs_store;
4+
pub(crate) mod sqlite_store;
45
pub(crate) mod utils;
56

67
pub use fs_store::FilesystemStore;
8+
pub use sqlite_store::SqliteStore;
79

810
use lightning::util::persist::KVStorePersister;
911

src/io/sqlite_store.rs

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use super::*;
2+
3+
use lightning::util::persist::KVStorePersister;
4+
use lightning::util::ser::Writeable;
5+
6+
use rusqlite::{named_params, Connection};
7+
8+
use std::fs;
9+
use std::io::Cursor;
10+
use std::path::PathBuf;
11+
use std::str::FromStr;
12+
use std::sync::{Arc, Mutex};
13+
14+
// The database file name.
15+
const SQLITE_DB_FILE: &str = "ldk_node.sqlite";
16+
17+
// The table in which we store all data.
18+
const KV_TABLE_NAME: &str = "ldk_node_data";
19+
20+
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
21+
const SCHEMA_USER_VERSION: u16 = 1;
22+
23+
/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
24+
///
25+
/// [SQLIte]: https://sqlite.org
26+
pub struct SqliteStore {
27+
connection: Arc<Mutex<Connection>>,
28+
}
29+
30+
impl SqliteStore {
31+
pub(crate) fn new(dest_dir: PathBuf) -> Self {
32+
let msg =
33+
format!("Failed to create database destination directory: {}", dest_dir.display());
34+
fs::create_dir_all(dest_dir.clone()).expect(&msg);
35+
let mut db_file_path = dest_dir.clone();
36+
db_file_path.push(SQLITE_DB_FILE);
37+
38+
let msg = format!("Failed to open/create database file: {}", db_file_path.display());
39+
let connection = Connection::open(db_file_path).expect(&msg);
40+
41+
let msg = format!("Failed to set PRAGMA user_version");
42+
connection
43+
.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| {
44+
Ok(())
45+
})
46+
.expect(&msg);
47+
48+
let sql = format!(
49+
"CREATE TABLE IF NOT EXISTS {} (
50+
namespace TEXT NOT NULL,
51+
key TEXT NOT NULL,
52+
value BLOB, PRIMARY KEY ( namespace, key )
53+
);",
54+
KV_TABLE_NAME
55+
);
56+
let msg = format!("Failed to create table: {}", KV_TABLE_NAME);
57+
connection.execute(&sql, []).expect(&msg);
58+
59+
let connection = Arc::new(Mutex::new(connection));
60+
Self { connection }
61+
}
62+
}
63+
64+
impl KVStore for SqliteStore {
65+
type Reader = Cursor<Vec<u8>>;
66+
67+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
68+
let sql =
69+
format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);
70+
71+
let msg = format!("Failed to read from key: {}/{}", namespace, key);
72+
let res = self
73+
.connection
74+
.lock()
75+
.unwrap()
76+
.query_row(
77+
&sql,
78+
named_params! {
79+
":namespace": namespace,
80+
":key": key,
81+
},
82+
|row| row.get(0),
83+
)
84+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::NotFound, msg))?;
85+
Ok(Cursor::new(res))
86+
}
87+
88+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
89+
let mut locked_conn = self.connection.lock().unwrap();
90+
91+
let msg = format!("Failed to start transaction");
92+
let sql_tx = locked_conn
93+
.transaction()
94+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
95+
96+
let sql = format!(
97+
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :data);",
98+
KV_TABLE_NAME
99+
);
100+
let msg = format!("Failed to write to key: {}/{}", namespace, key);
101+
sql_tx
102+
.execute(
103+
&sql,
104+
named_params! {
105+
":namespace": namespace,
106+
":key": key,
107+
":data": buf,
108+
},
109+
)
110+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
111+
112+
let msg = format!("Failed to commit transaction");
113+
sql_tx.commit().map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
114+
115+
Ok(())
116+
}
117+
118+
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
119+
let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);
120+
let msg = format!("Failed to delete key: {}/{}", namespace, key);
121+
let changes = self
122+
.connection
123+
.lock()
124+
.unwrap()
125+
.execute(
126+
&sql,
127+
named_params! {
128+
":namespace": namespace,
129+
":key": key,
130+
},
131+
)
132+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
133+
134+
let was_present = changes != 0;
135+
136+
Ok(was_present)
137+
}
138+
139+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
140+
let locked_conn = self.connection.lock().unwrap();
141+
142+
let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", KV_TABLE_NAME);
143+
let msg = format!("Failed to prepare statement");
144+
let mut stmt = locked_conn
145+
.prepare(&sql)
146+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
147+
148+
let mut keys = Vec::new();
149+
150+
let msg = format!("Failed to retrieve queried rows");
151+
let rows_iter = stmt
152+
.query_map(named_params! {":namespace": namespace, }, |row| row.get(0))
153+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
154+
155+
for k in rows_iter {
156+
let msg = format!("Failed to retrieve queried rows");
157+
keys.push(k.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?);
158+
}
159+
160+
Ok(keys)
161+
}
162+
}
163+
164+
impl KVStorePersister for SqliteStore {
165+
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
166+
let msg = format!("Could not persist file for key {}.", prefixed_key);
167+
let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| {
168+
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg.clone())
169+
})?;
170+
171+
let parent_directory = dest_file_path.parent().ok_or(lightning::io::Error::new(
172+
lightning::io::ErrorKind::InvalidInput,
173+
msg.clone(),
174+
))?;
175+
let namespace = parent_directory.display().to_string();
176+
177+
let dest_without_namespace = dest_file_path
178+
.strip_prefix(&namespace)
179+
.map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?;
180+
let key = dest_without_namespace.display().to_string();
181+
182+
self.write(&namespace, &key, &object.encode())?;
183+
Ok(())
184+
}
185+
}
186+
187+
#[cfg(test)]
188+
mod tests {
189+
use super::*;
190+
use crate::test::utils::random_storage_path;
191+
use lightning::util::persist::KVStorePersister;
192+
use lightning::util::ser::Readable;
193+
194+
use proptest::prelude::*;
195+
proptest! {
196+
#[test]
197+
fn read_write_remove_list_persist(data in any::<[u8; 32]>()) {
198+
let rand_dir = random_storage_path();
199+
200+
let sqlite_store = SqliteStore::new(rand_dir.into());
201+
let namespace = "testspace";
202+
let key = "testkey";
203+
204+
// Test the basic KVStore operations.
205+
sqlite_store.write(namespace, key, &data).unwrap();
206+
207+
let listed_keys = sqlite_store.list(namespace).unwrap();
208+
assert_eq!(listed_keys.len(), 1);
209+
assert_eq!(listed_keys[0], "testkey");
210+
211+
let mut reader = sqlite_store.read(namespace, key).unwrap();
212+
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
213+
assert_eq!(data, read_data);
214+
215+
sqlite_store.remove(namespace, key).unwrap();
216+
217+
let listed_keys = sqlite_store.list(namespace).unwrap();
218+
assert_eq!(listed_keys.len(), 0);
219+
220+
// Test KVStorePersister
221+
let prefixed_key = format!("{}/{}", namespace, key);
222+
sqlite_store.persist(&prefixed_key, &data).unwrap();
223+
let mut reader = sqlite_store.read(namespace, key).unwrap();
224+
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
225+
assert_eq!(data, read_data);
226+
}
227+
}
228+
}

src/lib.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ pub use event::Event;
100100
use event::{EventHandler, EventQueue};
101101
use gossip::GossipSource;
102102
use io::fs_store::FilesystemStore;
103+
use io::sqlite_store::SqliteStore;
103104
use io::{KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE};
104105
use payment_store::PaymentStore;
105106
pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
@@ -312,11 +313,21 @@ impl Builder {
312313
self
313314
}
314315

316+
/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
317+
/// previously configured.
318+
pub fn build(&self) -> Arc<Node<SqliteStore>> {
319+
let storage_dir = self.config.storage_dir_path.clone();
320+
fs::create_dir_all(storage_dir.clone()).expect("Failed to create LDK data directory");
321+
let kv_store = Arc::new(SqliteStore::new(storage_dir.into()).into());
322+
self.build_with_store(kv_store)
323+
}
324+
315325
/// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options
316326
/// previously configured.
317-
pub fn build(&self) -> Arc<Node<FilesystemStore>> {
318-
let ldk_data_dir = format!("{}/ldk", self.config.storage_dir_path);
319-
let kv_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into()));
327+
pub fn build_with_fs_store(&self) -> Arc<Node<FilesystemStore>> {
328+
let storage_dir = self.config.storage_dir_path.clone();
329+
fs::create_dir_all(storage_dir.clone()).expect("Failed to create LDK data directory");
330+
let kv_store = Arc::new(FilesystemStore::new(storage_dir.into()));
320331
self.build_with_store(kv_store)
321332
}
322333

@@ -326,12 +337,6 @@ impl Builder {
326337
) -> Arc<Node<K>> {
327338
let config = Arc::new(self.config.clone());
328339

329-
let ldk_data_dir = format!("{}/ldk", config.storage_dir_path);
330-
fs::create_dir_all(ldk_data_dir.clone()).expect("Failed to create LDK data directory");
331-
332-
let bdk_data_dir = format!("{}/bdk", config.storage_dir_path);
333-
fs::create_dir_all(bdk_data_dir.clone()).expect("Failed to create BDK data directory");
334-
335340
// Initialize the Logger
336341
let log_file_path = format!("{}/ldk_node.log", config.storage_dir_path);
337342
let logger = Arc::new(FilesystemLogger::new(log_file_path));
@@ -366,7 +371,8 @@ impl Builder {
366371
)
367372
.expect("Failed to derive on-chain wallet name");
368373

369-
let database_path = format!("{}/{}.sqlite", bdk_data_dir, wallet_name);
374+
let database_path =
375+
format!("{}/bdk_wallet_{}.sqlite", config.storage_dir_path, wallet_name);
370376
let database = SqliteDatabase::new(database_path);
371377

372378
let bdk_wallet = bdk::Wallet::new(
@@ -465,8 +471,12 @@ impl Builder {
465471
) {
466472
Ok(monitors) => monitors,
467473
Err(e) => {
468-
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
469-
panic!("Failed to read channel monitors: {}", e.to_string());
474+
if e.kind() == std::io::ErrorKind::NotFound {
475+
Vec::new()
476+
} else {
477+
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
478+
panic!("Failed to read channel monitors: {}", e.to_string());
479+
}
470480
}
471481
};
472482

@@ -663,7 +673,7 @@ impl Builder {
663673

664674
/// This type alias is required as Uniffi doesn't support generics, i.e., we can only expose the
665675
/// concretized types via this aliasing hack.
666-
type LDKNode = Node<FilesystemStore>;
676+
type LDKNode = Node<SqliteStore>;
667677

668678
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
669679
///

src/test/functional_tests.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,55 @@ fn start_stop_reinit() {
312312
reinitialized_node.stop().unwrap();
313313
}
314314

315+
#[test]
316+
fn start_stop_reinit_fs_store() {
317+
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
318+
let esplora_url = electrsd.esplora_url.as_ref().unwrap();
319+
let config = random_config(&esplora_url);
320+
let node = Builder::from_config(config.clone()).build_with_fs_store();
321+
let expected_node_id = node.node_id();
322+
323+
let funding_address = node.new_funding_address().unwrap();
324+
let expected_amount = Amount::from_sat(100000);
325+
326+
premine_and_distribute_funds(&bitcoind, &electrsd, vec![funding_address], expected_amount);
327+
assert_eq!(node.onchain_balance().unwrap().get_total(), 0);
328+
329+
node.start().unwrap();
330+
assert_eq!(node.start(), Err(Error::AlreadyRunning));
331+
332+
node.sync_wallets().unwrap();
333+
assert_eq!(node.onchain_balance().unwrap().get_spendable(), expected_amount.to_sat());
334+
335+
node.stop().unwrap();
336+
assert_eq!(node.stop(), Err(Error::NotRunning));
337+
338+
node.start().unwrap();
339+
assert_eq!(node.start(), Err(Error::AlreadyRunning));
340+
341+
node.stop().unwrap();
342+
assert_eq!(node.stop(), Err(Error::NotRunning));
343+
drop(node);
344+
345+
let reinitialized_node = Builder::from_config(config).build_with_fs_store();
346+
assert_eq!(reinitialized_node.node_id(), expected_node_id);
347+
348+
reinitialized_node.start().unwrap();
349+
350+
assert_eq!(
351+
reinitialized_node.onchain_balance().unwrap().get_spendable(),
352+
expected_amount.to_sat()
353+
);
354+
355+
reinitialized_node.sync_wallets().unwrap();
356+
assert_eq!(
357+
reinitialized_node.onchain_balance().unwrap().get_spendable(),
358+
expected_amount.to_sat()
359+
);
360+
361+
reinitialized_node.stop().unwrap();
362+
}
363+
315364
#[test]
316365
fn onchain_spend_receive() {
317366
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();

0 commit comments

Comments
 (0)