Skip to content

Commit b324166

Browse files
committed
Simplify KVStore interface
Previously `KVStore::write` would return a `TransactionalWrite` that would allow `Writeables` to be written directly and only be persisted upon calling `TransactionalWrite::commit()`. We however received feedback from implementors that this API is considered unnecessarily complex for a K-V store. Therefore we here refactor `KVStore::write` to take a `buf: &[u8]` directly and serialize the data into corresponding buffers before calling `KVStore::write`, thereby reducing the complexity for any implementation of the `KVStore` interface.
1 parent 8ba0604 commit b324166

File tree

7 files changed

+138
-297
lines changed

7 files changed

+138
-297
lines changed

src/event.rs

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ use crate::payment_store::{
77
PaymentDetails, PaymentDetailsUpdate, PaymentDirection, PaymentStatus, PaymentStore,
88
};
99

10-
use crate::io::{
11-
KVStore, TransactionalWrite, EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_NAMESPACE,
12-
};
10+
use crate::io::{KVStore, EVENT_QUEUE_PERSISTENCE_KEY, EVENT_QUEUE_PERSISTENCE_NAMESPACE};
1311
use crate::logger::{log_error, log_info, Logger};
1412

1513
use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
@@ -134,7 +132,7 @@ where
134132
{
135133
let mut locked_queue = self.queue.lock().unwrap();
136134
locked_queue.push_back(event);
137-
self.write_queue_and_commit(&locked_queue)?;
135+
self.persist_queue(&locked_queue)?;
138136
}
139137

140138
self.notifier.notify_one();
@@ -156,46 +154,26 @@ where
156154
{
157155
let mut locked_queue = self.queue.lock().unwrap();
158156
locked_queue.pop_front();
159-
self.write_queue_and_commit(&locked_queue)?;
157+
self.persist_queue(&locked_queue)?;
160158
}
161159
self.notifier.notify_one();
162160
Ok(())
163161
}
164162

165-
fn write_queue_and_commit(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
166-
let mut writer = self
167-
.kv_store
168-
.write(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY)
163+
fn persist_queue(&self, locked_queue: &VecDeque<Event>) -> Result<(), Error> {
164+
let data = EventQueueSerWrapper(locked_queue).encode();
165+
self.kv_store
166+
.write(EVENT_QUEUE_PERSISTENCE_NAMESPACE, EVENT_QUEUE_PERSISTENCE_KEY, &data)
169167
.map_err(|e| {
170168
log_error!(
171169
self.logger,
172-
"Getting writer for key {}/{} failed due to: {}",
170+
"Write for key {}/{} failed due to: {}",
173171
EVENT_QUEUE_PERSISTENCE_NAMESPACE,
174172
EVENT_QUEUE_PERSISTENCE_KEY,
175173
e
176174
);
177175
Error::PersistenceFailed
178176
})?;
179-
EventQueueSerWrapper(locked_queue).write(&mut writer).map_err(|e| {
180-
log_error!(
181-
self.logger,
182-
"Writing event queue data to key {}/{} failed due to: {}",
183-
EVENT_QUEUE_PERSISTENCE_NAMESPACE,
184-
EVENT_QUEUE_PERSISTENCE_KEY,
185-
e
186-
);
187-
Error::PersistenceFailed
188-
})?;
189-
writer.commit().map_err(|e| {
190-
log_error!(
191-
self.logger,
192-
"Committing event queue data to key {}/{} failed due to: {}",
193-
EVENT_QUEUE_PERSISTENCE_NAMESPACE,
194-
EVENT_QUEUE_PERSISTENCE_KEY,
195-
e
196-
);
197-
Error::PersistenceFailed
198-
})?;
199177
Ok(())
200178
}
201179
}

src/io/fs_store.rs

Lines changed: 88 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#[cfg(target_os = "windows")]
22
extern crate winapi;
33

4-
use super::{KVStore, TransactionalWrite};
4+
use super::KVStore;
55

66
use std::collections::HashMap;
77
use std::fs;
8-
use std::io::{BufReader, BufWriter, Read, Write};
8+
use std::io::{BufReader, Read, Write};
99
use std::path::{Path, PathBuf};
1010
use std::str::FromStr;
1111
use std::sync::{Arc, Mutex, RwLock};
@@ -52,28 +52,84 @@ impl FilesystemStore {
5252

5353
impl KVStore for FilesystemStore {
5454
type Reader = FilesystemReader;
55-
type Writer = FilesystemWriter;
5655

5756
fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
5857
let mut outer_lock = self.locks.lock().unwrap();
5958
let lock_key = (namespace.to_string(), key.to_string());
6059
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
6160

62-
let mut dest_file = self.dest_dir.clone();
63-
dest_file.push(namespace);
64-
dest_file.push(key);
65-
FilesystemReader::new(dest_file, inner_lock_ref)
61+
let mut dest_file_path = self.dest_dir.clone();
62+
dest_file_path.push(namespace);
63+
dest_file_path.push(key);
64+
FilesystemReader::new(dest_file_path, inner_lock_ref)
6665
}
6766

68-
fn write(&self, namespace: &str, key: &str) -> std::io::Result<Self::Writer> {
67+
fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
6968
let mut outer_lock = self.locks.lock().unwrap();
7069
let lock_key = (namespace.to_string(), key.to_string());
7170
let inner_lock_ref = Arc::clone(&outer_lock.entry(lock_key).or_default());
71+
let _guard = inner_lock_ref.write().unwrap();
72+
73+
let mut dest_file_path = self.dest_dir.clone();
74+
dest_file_path.push(namespace);
75+
dest_file_path.push(key);
76+
77+
let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display());
78+
let parent_directory = dest_file_path
79+
.parent()
80+
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg))?
81+
.to_path_buf();
82+
fs::create_dir_all(parent_directory.clone())?;
83+
84+
// Do a crazy dance with lots of fsync()s to be overly cautious here...
85+
// We never want to end up in a state where we've lost the old data, or end up using the
86+
// old data on power loss after we've returned.
87+
// The way to atomically write a file on Unix platforms is:
88+
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
89+
let mut tmp_file_path = dest_file_path.clone();
90+
let mut rng = thread_rng();
91+
let rand_str: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
92+
let ext = format!("{}.tmp", rand_str);
93+
tmp_file_path.set_extension(ext);
94+
95+
let mut tmp_file = fs::File::create(&tmp_file_path)?;
96+
tmp_file.write_all(&buf)?;
97+
tmp_file.sync_all()?;
7298

73-
let mut dest_file = self.dest_dir.clone();
74-
dest_file.push(namespace);
75-
dest_file.push(key);
76-
FilesystemWriter::new(dest_file, inner_lock_ref)
99+
#[cfg(not(target_os = "windows"))]
100+
{
101+
fs::rename(&tmp_file_path, &dest_file_path)?;
102+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory.clone())?;
103+
unsafe {
104+
libc::fsync(dir_file.as_raw_fd());
105+
}
106+
}
107+
108+
#[cfg(target_os = "windows")]
109+
{
110+
if dest_file_path.exists() {
111+
unsafe {
112+
winapi::um::winbase::ReplaceFileW(
113+
path_to_windows_str(dest_file_path).as_ptr(),
114+
path_to_windows_str(tmp_file_path).as_ptr(),
115+
std::ptr::null(),
116+
winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS,
117+
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
118+
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
119+
)
120+
};
121+
} else {
122+
call!(unsafe {
123+
winapi::um::winbase::MoveFileExW(
124+
path_to_windows_str(tmp_file_path).as_ptr(),
125+
path_to_windows_str(dest_file_path).as_ptr(),
126+
winapi::um::winbase::MOVEFILE_WRITE_THROUGH
127+
| winapi::um::winbase::MOVEFILE_REPLACE_EXISTING,
128+
)
129+
});
130+
}
131+
}
132+
Ok(())
77133
}
78134

79135
fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
@@ -83,19 +139,20 @@ impl KVStore for FilesystemStore {
83139

84140
let _guard = inner_lock_ref.write().unwrap();
85141

86-
let mut dest_file = self.dest_dir.clone();
87-
dest_file.push(namespace);
88-
dest_file.push(key);
142+
let mut dest_file_path = self.dest_dir.clone();
143+
dest_file_path.push(namespace);
144+
dest_file_path.push(key);
89145

90-
if !dest_file.is_file() {
146+
if !dest_file_path.is_file() {
91147
return Ok(false);
92148
}
93149

94-
fs::remove_file(&dest_file)?;
150+
fs::remove_file(&dest_file_path)?;
95151
#[cfg(not(target_os = "windows"))]
96152
{
97-
let msg = format!("Could not retrieve parent directory of {}.", dest_file.display());
98-
let parent_directory = dest_file
153+
let msg =
154+
format!("Could not retrieve parent directory of {}.", dest_file_path.display());
155+
let parent_directory = dest_file_path
99156
.parent()
100157
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg))?;
101158
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
@@ -110,14 +167,14 @@ impl KVStore for FilesystemStore {
110167
}
111168
}
112169

113-
if dest_file.is_file() {
170+
if dest_file_path.is_file() {
114171
return Err(std::io::Error::new(std::io::ErrorKind::Other, "Removing key failed"));
115172
}
116173

117174
if Arc::strong_count(&inner_lock_ref) == 2 {
118175
// It's safe to remove the lock entry if we're the only one left holding a strong
119176
// reference. Checking this is necessary to ensure we continue to distribute references to the
120-
// same lock as long as some Writers/Readers are around. However, we still want to
177+
// same lock as long as some Readers are around. However, we still want to
121178
// clean up the table when possible.
122179
//
123180
// Note that this by itself is still leaky as lock entries will remain when more Readers/Writers are
@@ -171,8 +228,8 @@ pub struct FilesystemReader {
171228
}
172229

173230
impl FilesystemReader {
174-
fn new(dest_file: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
175-
let f = fs::File::open(dest_file.clone())?;
231+
fn new(dest_file_path: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
232+
let f = fs::File::open(dest_file_path.clone())?;
176233
let inner = BufReader::new(f);
177234
Ok(Self { inner, lock_ref })
178235
}
@@ -185,115 +242,26 @@ impl Read for FilesystemReader {
185242
}
186243
}
187244

188-
pub struct FilesystemWriter {
189-
dest_file: PathBuf,
190-
parent_directory: PathBuf,
191-
tmp_file: PathBuf,
192-
tmp_writer: BufWriter<fs::File>,
193-
lock_ref: Arc<RwLock<()>>,
194-
}
195-
196-
impl FilesystemWriter {
197-
fn new(dest_file: PathBuf, lock_ref: Arc<RwLock<()>>) -> std::io::Result<Self> {
198-
let msg = format!("Could not retrieve parent directory of {}.", dest_file.display());
199-
let parent_directory = dest_file
200-
.parent()
201-
.ok_or(std::io::Error::new(std::io::ErrorKind::InvalidInput, msg))?
202-
.to_path_buf();
203-
fs::create_dir_all(parent_directory.clone())?;
204-
205-
// Do a crazy dance with lots of fsync()s to be overly cautious here...
206-
// We never want to end up in a state where we've lost the old data, or end up using the
207-
// old data on power loss after we've returned.
208-
// The way to atomically write a file on Unix platforms is:
209-
// open(tmpname), write(tmpfile), fsync(tmpfile), close(tmpfile), rename(), fsync(dir)
210-
let mut tmp_file = dest_file.clone();
211-
let mut rng = thread_rng();
212-
let rand_str: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
213-
let ext = format!("{}.tmp", rand_str);
214-
tmp_file.set_extension(ext);
215-
216-
let tmp_writer = BufWriter::new(fs::File::create(&tmp_file)?);
217-
218-
Ok(Self { dest_file, parent_directory, tmp_file, tmp_writer, lock_ref })
219-
}
220-
}
221-
222-
impl Write for FilesystemWriter {
223-
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
224-
Ok(self.tmp_writer.write(buf)?)
225-
}
226-
227-
fn flush(&mut self) -> std::io::Result<()> {
228-
self.tmp_writer.flush()?;
229-
self.tmp_writer.get_ref().sync_all()?;
230-
Ok(())
231-
}
232-
}
233-
234-
impl TransactionalWrite for FilesystemWriter {
235-
fn commit(&mut self) -> std::io::Result<()> {
236-
self.flush()?;
237-
238-
let _guard = self.lock_ref.write().unwrap();
239-
// Fsync the parent directory on Unix.
240-
#[cfg(not(target_os = "windows"))]
241-
{
242-
fs::rename(&self.tmp_file, &self.dest_file)?;
243-
let dir_file = fs::OpenOptions::new().read(true).open(self.parent_directory.clone())?;
244-
unsafe {
245-
libc::fsync(dir_file.as_raw_fd());
246-
}
247-
}
248-
249-
#[cfg(target_os = "windows")]
250-
{
251-
if dest_file.exists() {
252-
unsafe {
253-
winapi::um::winbase::ReplaceFileW(
254-
path_to_windows_str(dest_file).as_ptr(),
255-
path_to_windows_str(tmp_file).as_ptr(),
256-
std::ptr::null(),
257-
winapi::um::winbase::REPLACEFILE_IGNORE_MERGE_ERRORS,
258-
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
259-
std::ptr::null_mut() as *mut winapi::ctypes::c_void,
260-
)
261-
};
262-
} else {
263-
call!(unsafe {
264-
winapi::um::winbase::MoveFileExW(
265-
path_to_windows_str(tmp_file).as_ptr(),
266-
path_to_windows_str(dest_file).as_ptr(),
267-
winapi::um::winbase::MOVEFILE_WRITE_THROUGH
268-
| winapi::um::winbase::MOVEFILE_REPLACE_EXISTING,
269-
)
270-
});
271-
}
272-
}
273-
Ok(())
274-
}
275-
}
276-
277245
impl KVStorePersister for FilesystemStore {
278246
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
279247
let msg = format!("Could not persist file for key {}.", prefixed_key);
280-
let dest_file = PathBuf::from_str(prefixed_key).map_err(|_| {
248+
let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| {
281249
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg.clone())
282250
})?;
283251

284-
let parent_directory = dest_file.parent().ok_or(lightning::io::Error::new(
252+
let parent_directory = dest_file_path.parent().ok_or(lightning::io::Error::new(
285253
lightning::io::ErrorKind::InvalidInput,
286254
msg.clone(),
287255
))?;
288256
let namespace = parent_directory.display().to_string();
289257

290-
let dest_without_namespace = dest_file
258+
let dest_without_namespace = dest_file_path
291259
.strip_prefix(&namespace)
292260
.map_err(|_| lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg))?;
293261
let key = dest_without_namespace.display().to_string();
294-
let mut writer = self.write(&namespace, &key)?;
295-
object.write(&mut writer)?;
296-
Ok(writer.commit()?)
262+
263+
self.write(&namespace, &key, &object.encode())?;
264+
Ok(())
297265
}
298266
}
299267

@@ -302,7 +270,7 @@ mod tests {
302270
use super::*;
303271
use crate::test::utils::random_storage_path;
304272
use lightning::util::persist::KVStorePersister;
305-
use lightning::util::ser::{Readable, Writeable};
273+
use lightning::util::ser::Readable;
306274

307275
use proptest::prelude::*;
308276
proptest! {
@@ -315,9 +283,7 @@ mod tests {
315283
let key = "testkey";
316284

317285
// Test the basic KVStore operations.
318-
let mut writer = fs_store.write(namespace, key).unwrap();
319-
data.write(&mut writer).unwrap();
320-
writer.commit().unwrap();
286+
fs_store.write(namespace, key, &data).unwrap();
321287

322288
let listed_keys = fs_store.list(namespace).unwrap();
323289
assert_eq!(listed_keys.len(), 1);

0 commit comments

Comments
 (0)