@@ -2,6 +2,7 @@ package sweepbatcher
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"fmt"
6
7
"sync"
7
8
"time"
@@ -46,36 +47,35 @@ const (
46
47
type BatcherStore interface {
47
48
// FetchUnconfirmedSweepBatches fetches all the batches from the
48
49
// database that are not in a confirmed state.
49
- FetchUnconfirmedSweepBatches (ctx context.Context ) ([]* dbBatch ,
50
- error )
50
+ FetchUnconfirmedSweepBatches (ctx context.Context ) ([]* dbBatch , error )
51
51
52
52
// InsertSweepBatch inserts a batch into the database, returning the id
53
53
// of the inserted batch.
54
- InsertSweepBatch (ctx context.Context ,
55
- batch * dbBatch ) (int32 , error )
54
+ InsertSweepBatch (ctx context.Context , batch * dbBatch ) (int32 , error )
55
+
56
+ // DropBatch drops a batch from the database. This should only be used
57
+ // when a batch is empty.
58
+ DropBatch (ctx context.Context , id int32 ) error
56
59
57
60
// UpdateSweepBatch updates a batch in the database.
58
- UpdateSweepBatch (ctx context.Context ,
59
- batch * dbBatch ) error
61
+ UpdateSweepBatch (ctx context.Context , batch * dbBatch ) error
60
62
61
63
// ConfirmBatch confirms a batch by setting its state to confirmed.
62
64
ConfirmBatch (ctx context.Context , id int32 ) error
63
65
64
66
// FetchBatchSweeps fetches all the sweeps that belong to a batch.
65
- FetchBatchSweeps (ctx context.Context ,
66
- id int32 ) ([]* dbSweep , error )
67
+ FetchBatchSweeps (ctx context.Context , id int32 ) ([]* dbSweep , error )
67
68
68
69
// UpsertSweep inserts a sweep into the database, or updates an existing
69
70
// sweep if it already exists.
70
71
UpsertSweep (ctx context.Context , sweep * dbSweep ) error
71
72
72
73
// GetSweepStatus returns the completed status of the sweep.
73
- GetSweepStatus (ctx context.Context , swapHash lntypes.Hash ) (
74
- bool , error )
74
+ GetSweepStatus (ctx context.Context , swapHash lntypes.Hash ) (bool , error )
75
75
76
76
// GetParentBatch returns the parent batch of a (completed) sweep.
77
- GetParentBatch (ctx context.Context , swapHash lntypes.Hash ) (
78
- * dbBatch , error )
77
+ GetParentBatch (ctx context.Context , swapHash lntypes.Hash ) (* dbBatch ,
78
+ error )
79
79
80
80
// TotalSweptAmount returns the total amount swept by a (confirmed)
81
81
// batch.
@@ -135,7 +135,7 @@ type SpendNotifier struct {
135
135
}
136
136
137
137
var (
138
- ErrBatcherShuttingDown = fmt . Errorf ("batcher shutting down" )
138
+ ErrBatcherShuttingDown = errors . New ("batcher shutting down" )
139
139
)
140
140
141
141
// Batcher is a system that is responsible for accepting sweep requests and
@@ -306,7 +306,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
306
306
307
307
if batch .sweepExists (sweep .swapHash ) {
308
308
accepted , err := batch .addSweep (ctx , sweep )
309
- if err != nil {
309
+ if err != nil && ! errors . Is ( err , ErrBatchShuttingDown ) {
310
310
return err
311
311
}
312
312
@@ -321,7 +321,7 @@ func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,
321
321
// If one of the batches accepts the sweep, we provide it to that batch.
322
322
for _ , batch := range b .batches {
323
323
accepted , err := batch .addSweep (ctx , sweep )
324
- if err != nil && err != ErrBatchShuttingDown {
324
+ if err != nil && ! errors . Is ( err , ErrBatchShuttingDown ) {
325
325
return err
326
326
}
327
327
@@ -407,23 +407,23 @@ func (b *Batcher) spinUpBatch(ctx context.Context) (*batch, error) {
407
407
// spinUpBatchDB spins up a batch that already existed in storage, then
408
408
// returns it.
409
409
func (b * Batcher ) spinUpBatchFromDB (ctx context.Context , batch * batch ) error {
410
- cfg := batchConfig {
411
- maxTimeoutDistance : batch .cfg .maxTimeoutDistance ,
412
- batchConfTarget : defaultBatchConfTarget ,
413
- }
414
-
415
- rbfCache := rbfCache {
416
- LastHeight : batch .rbfCache .LastHeight ,
417
- FeeRate : batch .rbfCache .FeeRate ,
418
- }
419
-
420
410
dbSweeps , err := b .store .FetchBatchSweeps (ctx , batch .id )
421
411
if err != nil {
422
412
return err
423
413
}
424
414
425
415
if len (dbSweeps ) == 0 {
426
- return fmt .Errorf ("batch %d has no sweeps" , batch .id )
416
+ log .Infof ("skipping restored batch %d as it has no sweeps" ,
417
+ batch .id )
418
+
419
+ // It is safe to drop this empty batch as it has no sweeps.
420
+ err := b .store .DropBatch (ctx , batch .id )
421
+ if err != nil {
422
+ log .Warnf ("unable to drop empty batch %d: %v" ,
423
+ batch .id , err )
424
+ }
425
+
426
+ return nil
427
427
}
428
428
429
429
primarySweep := dbSweeps [0 ]
@@ -439,6 +439,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
439
439
sweeps [sweep .swapHash ] = * sweep
440
440
}
441
441
442
+ rbfCache := rbfCache {
443
+ LastHeight : batch .rbfCache .LastHeight ,
444
+ FeeRate : batch .rbfCache .FeeRate ,
445
+ }
446
+
442
447
batchKit := batchKit {
443
448
id : batch .id ,
444
449
batchTxid : batch .batchTxid ,
@@ -458,6 +463,11 @@ func (b *Batcher) spinUpBatchFromDB(ctx context.Context, batch *batch) error {
458
463
log : batchPrefixLogger (fmt .Sprintf ("%d" , batch .id )),
459
464
}
460
465
466
+ cfg := batchConfig {
467
+ maxTimeoutDistance : batch .cfg .maxTimeoutDistance ,
468
+ batchConfTarget : defaultBatchConfTarget ,
469
+ }
470
+
461
471
newBatch := NewBatchFromDB (cfg , batchKit )
462
472
463
473
// We add the batch to our map of batches and start it.
0 commit comments