@@ -123,7 +123,7 @@ impl LeftoversStore for EthereumLedgerRedisStore {
123
123
uncredited_settlement_amount
124
124
) ;
125
125
let mut pipe = redis:: pipe ( ) ;
126
- pipe. set (
126
+ pipe. lpush (
127
127
format ! ( "uncredited_settlement_amount:{}" , account_id) ,
128
128
uncredited_settlement_amount. to_string ( ) ,
129
129
)
@@ -147,24 +147,36 @@ impl LeftoversStore for EthereumLedgerRedisStore {
147
147
trace ! ( "Loading uncredited_settlement_amount {:?}" , account_id) ;
148
148
let mut pipe = redis:: pipe ( ) ;
149
149
// Loads the value and resets it to 0
150
- pipe. getset ( format ! ( "uncredited_settlement_amount:{}" , account_id) , 0 ) ;
150
+ pipe. lrange (
151
+ format ! ( "uncredited_settlement_amount:{}" , account_id) ,
152
+ 0 ,
153
+ -1 ,
154
+ ) ;
155
+ pipe. del ( format ! ( "uncredited_settlement_amount:{}" , account_id) )
156
+ . ignore ( ) ;
151
157
Box :: new (
152
158
pipe. query_async ( self . connection . clone ( ) )
153
159
. map_err ( move |err| {
154
160
error ! ( "Error loading uncredited_settlement_amount {:?}: " , err)
155
161
} )
156
162
. and_then (
157
- move |( _conn, uncredited_settlement_amount) : ( _ , Vec < String > ) | {
158
- // redis.rs returns a bulk value for some reason, length is
159
- // always 1
160
- if uncredited_settlement_amount. len ( ) == 1 {
161
- if let Ok ( uncredited_settlement_amount) =
162
- BigUint :: from_str ( & uncredited_settlement_amount[ 0 ] )
163
- {
164
- Box :: new ( ok ( uncredited_settlement_amount) )
165
- } else {
166
- Box :: new ( ok ( Zero :: zero ( ) ) )
163
+ move |( _conn, uncredited_settlement_amounts) : ( _ , Vec < Vec < String > > ) | {
164
+ if uncredited_settlement_amounts. len ( ) == 1 {
165
+ let uncredited_settlement_amounts =
166
+ uncredited_settlement_amounts[ 0 ] . clone ( ) ;
167
+ let mut total_amount = BigUint :: zero ( ) ;
168
+ for uncredited_settlement_amount in uncredited_settlement_amounts {
169
+ let amount = if let Ok ( amount) =
170
+ BigUint :: from_str ( & uncredited_settlement_amount)
171
+ {
172
+ amount
173
+ } else {
174
+ // could not convert to bigint
175
+ return Box :: new ( err ( ( ) ) ) ;
176
+ } ;
177
+ total_amount += amount;
167
178
}
179
+ Box :: new ( ok ( total_amount) )
168
180
} else {
169
181
Box :: new ( ok ( Zero :: zero ( ) ) )
170
182
}
@@ -351,27 +363,31 @@ mod tests {
351
363
block_on, test_eth_store as test_store,
352
364
} ;
353
365
use super :: * ;
366
+ use futures:: future:: join_all;
354
367
use std:: iter:: FromIterator ;
355
368
use std:: str:: FromStr ;
356
369
357
370
#[ test]
358
371
fn saves_and_pops_uncredited_settlement_amount_properly ( ) {
359
- let amount = BigUint :: from ( 100u64 ) ;
360
- let acc = "0" . to_string ( ) ;
361
372
block_on ( test_store ( ) . and_then ( |( store, context) | {
362
- store
363
- . save_uncredited_settlement_amount ( acc. clone ( ) , amount. clone ( ) )
364
- . map_err ( |err| eprintln ! ( "Redis error: {:?}" , err) )
365
- . and_then ( move |_| {
366
- store
367
- . load_uncredited_settlement_amount ( acc)
368
- . map_err ( |err| eprintln ! ( "Redis error: {:?}" , err) )
369
- . and_then ( move |ret| {
370
- assert_eq ! ( amount, ret) ;
371
- let _ = context;
372
- Ok ( ( ) )
373
- } )
374
- } )
373
+ let amount = BigUint :: from ( 100u64 ) ;
374
+ let acc = "0" . to_string ( ) ;
375
+ join_all ( vec ! [
376
+ store. save_uncredited_settlement_amount( acc. clone( ) , amount. clone( ) ) ,
377
+ store. save_uncredited_settlement_amount( acc. clone( ) , amount. clone( ) ) ,
378
+ store. save_uncredited_settlement_amount( acc. clone( ) , amount. clone( ) ) ,
379
+ ] )
380
+ . map_err ( |err| eprintln ! ( "Redis error: {:?}" , err) )
381
+ . and_then ( move |_| {
382
+ store
383
+ . load_uncredited_settlement_amount ( acc)
384
+ . map_err ( |err| eprintln ! ( "Redis error: {:?}" , err) )
385
+ . and_then ( move |ret| {
386
+ assert_eq ! ( ret, BigUint :: from( 300u64 ) ) ;
387
+ let _ = context;
388
+ Ok ( ( ) )
389
+ } )
390
+ } )
375
391
} ) )
376
392
. unwrap ( )
377
393
}
0 commit comments