Skip to content

Commit 309881f

Browse files
committed
Add SqliteStore backend
We here add an `KVStore` implementation on SQLite, which becomes the new default for `Node::build`. The `FilesystemStore` is still configurable via `Node::build_with_fs_store`
1 parent 98b4bc1 commit 309881f

File tree

5 files changed

+304
-14
lines changed

5 files changed

+304
-14
lines changed

src/io/fs_store.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ pub struct FilesystemStore {
4545
}
4646

4747
impl FilesystemStore {
48-
pub(crate) fn new(dest_dir: PathBuf) -> Self {
48+
pub(crate) fn new(mut dest_dir: PathBuf) -> Self {
49+
dest_dir.push("fs_store");
4950
let locks = Mutex::new(HashMap::new());
5051
Self { dest_dir, locks }
5152
}

src/io/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
//! Objects and traits for data persistence.
22
33
pub(crate) mod fs_store;
4+
pub(crate) mod sqlite_store;
45
pub(crate) mod utils;
56

67
pub use fs_store::FilesystemStore;
8+
pub use sqlite_store::SqliteStore;
79

810
use lightning::util::persist::KVStorePersister;
911

src/io/sqlite_store.rs

Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
use super::*;
2+
3+
use lightning::util::persist::KVStorePersister;
4+
use lightning::util::ser::Writeable;
5+
6+
use rusqlite::{named_params, Connection};
7+
8+
use std::fs;
9+
use std::io::Cursor;
10+
use std::path::PathBuf;
11+
use std::str::FromStr;
12+
use std::sync::{Arc, Mutex};
13+
14+
// The database file name.
15+
const SQLITE_DB_FILE: &str = "ldk_node.sqlite";
16+
17+
// The table in which we store all data.
18+
const KV_TABLE_NAME: &str = "ldk_node_data";
19+
20+
// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
21+
const SCHEMA_USER_VERSION: u16 = 1;
22+
23+
/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
24+
///
25+
/// [SQLite]: https://sqlite.org
26+
pub struct SqliteStore {
27+
connection: Arc<Mutex<Connection>>,
28+
}
29+
30+
impl SqliteStore {
31+
pub(crate) fn new(dest_dir: PathBuf) -> Self {
32+
let msg =
33+
format!("Failed to create database destination directory: {}", dest_dir.display());
34+
fs::create_dir_all(dest_dir.clone()).expect(&msg);
35+
let mut db_file_path = dest_dir.clone();
36+
db_file_path.push(SQLITE_DB_FILE);
37+
38+
let msg = format!("Failed to open/create database file: {}", db_file_path.display());
39+
let connection = Connection::open(db_file_path).expect(&msg);
40+
41+
let msg = format!("Failed to set PRAGMA user_version");
42+
connection
43+
.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| {
44+
Ok(())
45+
})
46+
.expect(&msg);
47+
48+
let sql = format!(
49+
"CREATE TABLE IF NOT EXISTS {} (
50+
namespace TEXT NOT NULL,
51+
key TEXT NOT NULL,
52+
value BLOB, PRIMARY KEY ( namespace, key )
53+
);",
54+
KV_TABLE_NAME
55+
);
56+
let msg = format!("Failed to create table: {}", KV_TABLE_NAME);
57+
connection.execute(&sql, []).expect(&msg);
58+
59+
let connection = Arc::new(Mutex::new(connection));
60+
Self { connection }
61+
}
62+
}
63+
64+
impl KVStore for SqliteStore {
65+
type Reader = Cursor<Vec<u8>>;
66+
67+
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
68+
let sql =
69+
format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);
70+
71+
let msg = format!("Failed to read from key: {}/{}", namespace, key);
72+
let res = self
73+
.connection
74+
.lock()
75+
.unwrap()
76+
.query_row(
77+
&sql,
78+
named_params! {
79+
":namespace": namespace,
80+
":key": key,
81+
},
82+
|row| row.get(0),
83+
)
84+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::NotFound, msg))?;
85+
Ok(Cursor::new(res))
86+
}
87+
88+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
89+
let mut locked_conn = self.connection.lock().unwrap();
90+
91+
let msg = format!("Failed to start transaction");
92+
let sql_tx = locked_conn
93+
.transaction()
94+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
95+
96+
let sql = format!(
97+
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :data);",
98+
KV_TABLE_NAME
99+
);
100+
let msg = format!("Failed to write to key: {}/{}", namespace, key);
101+
sql_tx
102+
.execute(
103+
&sql,
104+
named_params! {
105+
":namespace": namespace,
106+
":key": key,
107+
":data": buf,
108+
},
109+
)
110+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
111+
112+
let msg = format!("Failed to commit transaction");
113+
sql_tx.commit().map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
114+
115+
Ok(())
116+
}
117+
118+
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
119+
let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);
120+
let msg = format!("Failed to delete key: {}/{}", namespace, key);
121+
let changes = self
122+
.connection
123+
.lock()
124+
.unwrap()
125+
.execute(
126+
&sql,
127+
named_params! {
128+
":namespace": namespace,
129+
":key": key,
130+
},
131+
)
132+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
133+
134+
let was_present = changes != 0;
135+
136+
Ok(was_present)
137+
}
138+
139+
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
140+
let locked_conn = self.connection.lock().unwrap();
141+
142+
let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", KV_TABLE_NAME);
143+
let msg = format!("Failed to prepare statement");
144+
let mut stmt = locked_conn
145+
.prepare(&sql)
146+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
147+
148+
let mut keys = Vec::new();
149+
150+
let msg = format!("Failed to retrieve queried rows");
151+
let rows_iter = stmt
152+
.query_map(named_params! {":namespace": namespace, }, |row| row.get(0))
153+
.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?;
154+
155+
for k in rows_iter {
156+
let msg = format!("Failed to retrieve queried rows");
157+
keys.push(k.map_err(|_| std::io::Error::new(std::io::ErrorKind::Other, msg))?);
158+
}
159+
160+
Ok(keys)
161+
}
162+
}
163+
164+
impl KVStorePersister for SqliteStore {
165+
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
166+
let msg = format!("Could not persist file for key {}.", prefixed_key);
167+
let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| {
168+
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg.clone())
169+
})?;
170+
171+
let parent_directory = dest_file_path.parent().ok_or(lightning::io::Error::new(
172+
lightning::io::ErrorKind::InvalidInput,
173+
msg.clone(),
174+
))?;
175+
let namespace = parent_directory.display().to_string();
176+
177+
let dest_without_namespace = dest_file_path
178+
.strip_prefix(&namespace)
179+
.map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?;
180+
let key = dest_without_namespace.display().to_string();
181+
182+
self.write(&namespace, &key, &object.encode())?;
183+
Ok(())
184+
}
185+
}
186+
187+
#[cfg(test)]
188+
mod tests {
189+
use super::*;
190+
use crate::test::utils::random_storage_path;
191+
use lightning::util::persist::KVStorePersister;
192+
use lightning::util::ser::Readable;
193+
194+
use proptest::prelude::*;
195+
proptest! {
196+
#[test]
197+
fn read_write_remove_list_persist(data in any::<[u8; 32]>()) {
198+
let rand_dir = random_storage_path();
199+
200+
let sqlite_store = SqliteStore::new(rand_dir.into());
201+
let namespace = "testspace";
202+
let key = "testkey";
203+
204+
// Test the basic KVStore operations.
205+
sqlite_store.write(namespace, key, &data).unwrap();
206+
207+
let listed_keys = sqlite_store.list(namespace).unwrap();
208+
assert_eq!(listed_keys.len(), 1);
209+
assert_eq!(listed_keys[0], "testkey");
210+
211+
let mut reader = sqlite_store.read(namespace, key).unwrap();
212+
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
213+
assert_eq!(data, read_data);
214+
215+
sqlite_store.remove(namespace, key).unwrap();
216+
217+
let listed_keys = sqlite_store.list(namespace).unwrap();
218+
assert_eq!(listed_keys.len(), 0);
219+
220+
// Test KVStorePersister
221+
let prefixed_key = format!("{}/{}", namespace, key);
222+
sqlite_store.persist(&prefixed_key, &data).unwrap();
223+
let mut reader = sqlite_store.read(namespace, key).unwrap();
224+
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
225+
assert_eq!(data, read_data);
226+
}
227+
}
228+
}

src/lib.rs

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub use types::NetAddress;
102102
use event::{EventHandler, EventQueue};
103103
use gossip::GossipSource;
104104
use io::fs_store::FilesystemStore;
105+
use io::sqlite_store::SqliteStore;
105106
use io::{KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_NAMESPACE};
106107
use payment_store::PaymentStore;
107108
pub use payment_store::{PaymentDetails, PaymentDirection, PaymentStatus};
@@ -317,11 +318,21 @@ impl Builder {
317318
self
318319
}
319320

321+
/// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options
322+
/// previously configured.
323+
pub fn build(&self) -> Arc<Node<SqliteStore>> {
324+
let storage_dir = self.config.storage_dir_path.clone();
325+
fs::create_dir_all(storage_dir.clone()).expect("Failed to create LDK data directory");
326+
let kv_store = Arc::new(SqliteStore::new(storage_dir.into()).into());
327+
self.build_with_store(kv_store)
328+
}
329+
320330
/// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options
321331
/// previously configured.
322-
pub fn build(&self) -> Arc<Node<FilesystemStore>> {
323-
let ldk_data_dir = format!("{}/ldk", self.config.storage_dir_path);
324-
let kv_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into()));
332+
pub fn build_with_fs_store(&self) -> Arc<Node<FilesystemStore>> {
333+
let storage_dir = self.config.storage_dir_path.clone();
334+
fs::create_dir_all(storage_dir.clone()).expect("Failed to create LDK data directory");
335+
let kv_store = Arc::new(FilesystemStore::new(storage_dir.into()));
325336
self.build_with_store(kv_store)
326337
}
327338

@@ -331,12 +342,6 @@ impl Builder {
331342
) -> Arc<Node<K>> {
332343
let config = Arc::new(self.config.clone());
333344

334-
let ldk_data_dir = format!("{}/ldk", config.storage_dir_path);
335-
fs::create_dir_all(ldk_data_dir.clone()).expect("Failed to create LDK data directory");
336-
337-
let bdk_data_dir = format!("{}/bdk", config.storage_dir_path);
338-
fs::create_dir_all(bdk_data_dir.clone()).expect("Failed to create BDK data directory");
339-
340345
// Initialize the Logger
341346
let log_file_path = format!("{}/ldk_node.log", config.storage_dir_path);
342347
let logger = Arc::new(FilesystemLogger::new(log_file_path));
@@ -371,7 +376,8 @@ impl Builder {
371376
)
372377
.expect("Failed to derive on-chain wallet name");
373378

374-
let database_path = format!("{}/{}.sqlite", bdk_data_dir, wallet_name);
379+
let database_path =
380+
format!("{}/bdk_wallet_{}.sqlite", config.storage_dir_path, wallet_name);
375381
let database = SqliteDatabase::new(database_path);
376382

377383
let bdk_wallet = bdk::Wallet::new(
@@ -470,8 +476,12 @@ impl Builder {
470476
) {
471477
Ok(monitors) => monitors,
472478
Err(e) => {
473-
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
474-
panic!("Failed to read channel monitors: {}", e.to_string());
479+
if e.kind() == std::io::ErrorKind::NotFound {
480+
Vec::new()
481+
} else {
482+
log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
483+
panic!("Failed to read channel monitors: {}", e.to_string());
484+
}
475485
}
476486
};
477487

@@ -669,7 +679,7 @@ impl Builder {
669679

670680
/// This type alias is required as Uniffi doesn't support generics, i.e., we can only expose the
671681
/// concretized types via this aliasing hack.
672-
type LDKNode = Node<FilesystemStore>;
682+
type LDKNode = Node<SqliteStore>;
673683

674684
/// The main interface object of LDK Node, wrapping the necessary LDK and BDK functionalities.
675685
///

src/test/functional_tests.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,55 @@ fn start_stop_reinit() {
321321
reinitialized_node.stop().unwrap();
322322
}
323323

324+
#[test]
325+
fn start_stop_reinit_fs_store() {
326+
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();
327+
let esplora_url = electrsd.esplora_url.as_ref().unwrap();
328+
let config = random_config(&esplora_url);
329+
let node = Builder::from_config(config.clone()).build_with_fs_store();
330+
let expected_node_id = node.node_id();
331+
332+
let funding_address = node.new_funding_address().unwrap();
333+
let expected_amount = Amount::from_sat(100000);
334+
335+
premine_and_distribute_funds(&bitcoind, &electrsd, vec![funding_address], expected_amount);
336+
assert_eq!(node.onchain_balance().unwrap().get_total(), 0);
337+
338+
node.start().unwrap();
339+
assert_eq!(node.start(), Err(Error::AlreadyRunning));
340+
341+
node.sync_wallets().unwrap();
342+
assert_eq!(node.onchain_balance().unwrap().get_spendable(), expected_amount.to_sat());
343+
344+
node.stop().unwrap();
345+
assert_eq!(node.stop(), Err(Error::NotRunning));
346+
347+
node.start().unwrap();
348+
assert_eq!(node.start(), Err(Error::AlreadyRunning));
349+
350+
node.stop().unwrap();
351+
assert_eq!(node.stop(), Err(Error::NotRunning));
352+
drop(node);
353+
354+
let reinitialized_node = Builder::from_config(config).build_with_fs_store();
355+
assert_eq!(reinitialized_node.node_id(), expected_node_id);
356+
357+
reinitialized_node.start().unwrap();
358+
359+
assert_eq!(
360+
reinitialized_node.onchain_balance().unwrap().get_spendable(),
361+
expected_amount.to_sat()
362+
);
363+
364+
reinitialized_node.sync_wallets().unwrap();
365+
assert_eq!(
366+
reinitialized_node.onchain_balance().unwrap().get_spendable(),
367+
expected_amount.to_sat()
368+
);
369+
370+
reinitialized_node.stop().unwrap();
371+
}
372+
324373
#[test]
325374
fn onchain_spend_receive() {
326375
let (bitcoind, electrsd) = setup_bitcoind_and_electrsd();

0 commit comments

Comments
 (0)