@@ -82,30 +82,33 @@ impl_writeable_tlv_based_enum!(Event,
82
82
} ;
83
83
) ;
84
84
85
- pub struct EventQueue < K : Deref >
85
+ pub struct EventQueue < K : Deref , L : Deref >
86
86
where
87
87
K :: Target : KVStore ,
88
+ L :: Target : Logger ,
88
89
{
89
90
queue : Mutex < VecDeque < Event > > ,
90
91
notifier : Condvar ,
91
92
kv_store : K ,
93
+ logger : L ,
92
94
}
93
95
94
- impl < K : Deref > EventQueue < K >
96
+ impl < K : Deref , L : Deref > EventQueue < K , L >
95
97
where
96
98
K :: Target : KVStore ,
99
+ L :: Target : Logger ,
97
100
{
98
- pub ( crate ) fn new ( kv_store : K ) -> Self {
101
+ pub ( crate ) fn new ( kv_store : K , logger : L ) -> Self {
99
102
let queue: Mutex < VecDeque < Event > > = Mutex :: new ( VecDeque :: new ( ) ) ;
100
103
let notifier = Condvar :: new ( ) ;
101
- Self { queue, notifier, kv_store }
104
+ Self { queue, notifier, kv_store, logger }
102
105
}
103
106
104
107
pub ( crate ) fn add_event ( & self , event : Event ) -> Result < ( ) , Error > {
105
108
{
106
109
let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
107
110
locked_queue. push_back ( event) ;
108
- self . persist_queue ( & locked_queue) ?;
111
+ self . write_queue_and_commit ( & locked_queue) ?;
109
112
}
110
113
111
114
self . notifier . notify_one ( ) ;
@@ -122,37 +125,64 @@ where
122
125
{
123
126
let mut locked_queue = self . queue . lock ( ) . unwrap ( ) ;
124
127
locked_queue. pop_front ( ) ;
125
- self . persist_queue ( & locked_queue) ?;
128
+ self . write_queue_and_commit ( & locked_queue) ?;
126
129
}
127
130
self . notifier . notify_one ( ) ;
128
131
Ok ( ( ) )
129
132
}
130
133
131
- fn persist_queue ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
134
+ fn write_queue_and_commit ( & self , locked_queue : & VecDeque < Event > ) -> Result < ( ) , Error > {
132
135
let mut writer = self
133
136
. kv_store
134
137
. write ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
135
- . map_err ( |_| Error :: PersistenceFailed ) ?;
136
- EventQueueSerWrapper ( locked_queue)
137
- . write ( & mut writer)
138
- . map_err ( |_| Error :: PersistenceFailed ) ?;
139
- writer. commit ( ) . map_err ( |_| Error :: PersistenceFailed ) ?;
138
+ . map_err ( |e| {
139
+ log_error ! (
140
+ self . logger,
141
+ "Getting writer for key {}/{} failed due to: {}" ,
142
+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
143
+ EVENT_QUEUE_PERSISTENCE_KEY ,
144
+ e
145
+ ) ;
146
+ Error :: PersistenceFailed
147
+ } ) ?;
148
+ EventQueueSerWrapper ( locked_queue) . write ( & mut writer) . map_err ( |e| {
149
+ log_error ! (
150
+ self . logger,
151
+ "Writing event queue data to key {}/{} failed due to: {}" ,
152
+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
153
+ EVENT_QUEUE_PERSISTENCE_KEY ,
154
+ e
155
+ ) ;
156
+ Error :: PersistenceFailed
157
+ } ) ?;
158
+ writer. commit ( ) . map_err ( |e| {
159
+ log_error ! (
160
+ self . logger,
161
+ "Committing event queue data to key {}/{} failed due to: {}" ,
162
+ EVENT_QUEUE_PERSISTENCE_NAMESPACE ,
163
+ EVENT_QUEUE_PERSISTENCE_KEY ,
164
+ e
165
+ ) ;
166
+ Error :: PersistenceFailed
167
+ } ) ?;
140
168
Ok ( ( ) )
141
169
}
142
170
}
143
171
144
- impl < K : Deref > ReadableArgs < K > for EventQueue < K >
172
+ impl < K : Deref , L : Deref > ReadableArgs < ( K , L ) > for EventQueue < K , L >
145
173
where
146
174
K :: Target : KVStore ,
175
+ L :: Target : Logger ,
147
176
{
148
177
#[ inline]
149
178
fn read < R : lightning:: io:: Read > (
150
- reader : & mut R , kv_store : K ,
179
+ reader : & mut R , args : ( K , L ) ,
151
180
) -> Result < Self , lightning:: ln:: msgs:: DecodeError > {
181
+ let ( kv_store, logger) = args;
152
182
let read_queue: EventQueueDeserWrapper = Readable :: read ( reader) ?;
153
183
let queue: Mutex < VecDeque < Event > > = Mutex :: new ( read_queue. 0 ) ;
154
184
let notifier = Condvar :: new ( ) ;
155
- Ok ( Self { queue, notifier, kv_store } )
185
+ Ok ( Self { queue, notifier, kv_store, logger } )
156
186
}
157
187
}
158
188
@@ -189,11 +219,11 @@ where
189
219
L :: Target : Logger ,
190
220
{
191
221
wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > ,
192
- event_queue : Arc < EventQueue < K > > ,
222
+ event_queue : Arc < EventQueue < K , L > > ,
193
223
channel_manager : Arc < ChannelManager > ,
194
224
network_graph : Arc < NetworkGraph > ,
195
225
keys_manager : Arc < KeysManager > ,
196
- payment_store : Arc < PaymentInfoStorage < K > > ,
226
+ payment_store : Arc < PaymentInfoStorage < K , L > > ,
197
227
tokio_runtime : Arc < tokio:: runtime:: Runtime > ,
198
228
logger : L ,
199
229
_config : Arc < Config > ,
@@ -205,9 +235,9 @@ where
205
235
L :: Target : Logger ,
206
236
{
207
237
pub fn new (
208
- wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > , event_queue : Arc < EventQueue < K > > ,
238
+ wallet : Arc < Wallet < bdk:: database:: SqliteDatabase > > , event_queue : Arc < EventQueue < K , L > > ,
209
239
channel_manager : Arc < ChannelManager > , network_graph : Arc < NetworkGraph > ,
210
- keys_manager : Arc < KeysManager > , payment_store : Arc < PaymentInfoStorage < K > > ,
240
+ keys_manager : Arc < KeysManager > , payment_store : Arc < PaymentInfoStorage < K , L > > ,
211
241
tokio_runtime : Arc < tokio:: runtime:: Runtime > , logger : L , _config : Arc < Config > ,
212
242
) -> Self {
213
243
Self {
@@ -555,12 +585,13 @@ where
555
585
#[ cfg( test) ]
556
586
mod tests {
557
587
use super :: * ;
558
- use crate :: test:: utils:: TestStore ;
588
+ use crate :: test:: utils:: { TestLogger , TestStore } ;
559
589
560
590
#[ test]
561
591
fn event_queue_persistence ( ) {
562
592
let store = Arc :: new ( TestStore :: new ( ) ) ;
563
- let event_queue = EventQueue :: new ( Arc :: clone ( & store) ) ;
593
+ let logger = Arc :: new ( TestLogger :: new ( ) ) ;
594
+ let event_queue = EventQueue :: new ( Arc :: clone ( & store) , Arc :: clone ( & logger) ) ;
564
595
565
596
let expected_event = Event :: ChannelReady { channel_id : [ 23u8 ; 32 ] , user_channel_id : 2323 } ;
566
597
event_queue. add_event ( expected_event. clone ( ) ) . unwrap ( ) ;
@@ -577,7 +608,7 @@ mod tests {
577
608
. get_persisted_bytes ( EVENT_QUEUE_PERSISTENCE_NAMESPACE , EVENT_QUEUE_PERSISTENCE_KEY )
578
609
. unwrap ( ) ;
579
610
let deser_event_queue =
580
- EventQueue :: read ( & mut & persisted_bytes[ ..] , Arc :: clone ( & store) ) . unwrap ( ) ;
611
+ EventQueue :: read ( & mut & persisted_bytes[ ..] , ( Arc :: clone ( & store) , logger ) ) . unwrap ( ) ;
581
612
assert_eq ! ( deser_event_queue. next_event( ) , expected_event) ;
582
613
assert ! ( !store. get_and_clear_did_persist( ) ) ;
583
614
0 commit comments