Skip to content

Commit a2cee86

Browse files
committed
sweepbatcher/test: fix races in require.Eventually
The code inside require.Eventually runs in parallel with the event loops of the batcher and its batches. Accessing fields of the batcher and batches must be done within an event loop. To address this, testRunInEventLoop methods were added to the Batcher and batch types. Unit tests were then rewritten to use this approach when accessing batcher and batch fields. Additionally, in many cases, receive operations from RegisterSpendChannel were moved before require.Eventually. This prevents testRunInEventLoop from getting stuck in an event loop while blocked on a RegisterSpendChannel send operation.
1 parent 026890a commit a2cee86

File tree

3 files changed

+386
-198
lines changed

3 files changed

+386
-198
lines changed

sweepbatcher/sweep_batch.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,12 @@ type batch struct {
215215
// reorgChan is the channel over which reorg notifications are received.
216216
reorgChan chan struct{}
217217

218+
// testReqs is a channel where test requests are received.
219+
// This is used only in unit tests! The reason to have this is to
220+
// avoid data races in require.Eventually calls running in parallel
221+
// to the event loop. See method testRunInEventLoop().
222+
testReqs chan *testRequest
223+
218224
// errChan is the channel over which errors are received.
219225
errChan chan error
220226

@@ -352,6 +358,7 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
352358
spendChan: make(chan *chainntnfs.SpendDetail),
353359
confChan: make(chan *chainntnfs.TxConfirmation, 1),
354360
reorgChan: make(chan struct{}, 1),
361+
testReqs: make(chan *testRequest),
355362
errChan: make(chan error, 1),
356363
callEnter: make(chan struct{}),
357364
callLeave: make(chan struct{}),
@@ -396,6 +403,7 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
396403
spendChan: make(chan *chainntnfs.SpendDetail),
397404
confChan: make(chan *chainntnfs.TxConfirmation, 1),
398405
reorgChan: make(chan struct{}, 1),
406+
testReqs: make(chan *testRequest),
399407
errChan: make(chan error, 1),
400408
callEnter: make(chan struct{}),
401409
callLeave: make(chan struct{}),
@@ -756,6 +764,10 @@ func (b *batch) Run(ctx context.Context) error {
756764
return err
757765
}
758766

767+
case testReq := <-b.testReqs:
768+
testReq.handler()
769+
close(testReq.quit)
770+
759771
case err := <-blockErrChan:
760772
return err
761773

@@ -768,6 +780,36 @@ func (b *batch) Run(ctx context.Context) error {
768780
}
769781
}
770782

783+
// testRunInEventLoop runs a function in the event loop blocking until
784+
// the function returns. For unit tests only!
785+
func (b *batch) testRunInEventLoop(ctx context.Context, handler func()) {
786+
// If the event loop is finished, run the function.
787+
select {
788+
case <-b.stopping:
789+
handler()
790+
791+
return
792+
default:
793+
}
794+
795+
quit := make(chan struct{})
796+
req := &testRequest{
797+
handler: handler,
798+
quit: quit,
799+
}
800+
801+
select {
802+
case b.testReqs <- req:
803+
case <-ctx.Done():
804+
return
805+
}
806+
807+
select {
808+
case <-quit:
809+
case <-ctx.Done():
810+
}
811+
}
812+
771813
// timeout returns minimum timeout as block height among sweeps of the batch.
772814
// If the batch is empty, return -1.
773815
func (b *batch) timeout() int32 {

sweepbatcher/sweep_batcher.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,16 @@ var (
225225
ErrBatcherShuttingDown = errors.New("batcher shutting down")
226226
)
227227

228+
// testRequest is a function passed to an event loop and a channel used to
229+
// wait until the function is executed. This is used in unit tests only!
230+
type testRequest struct {
231+
// handler is the function to an event loop.
232+
handler func()
233+
234+
// quit is closed when the handler completes.
235+
quit chan struct{}
236+
}
237+
228238
// Batcher is a system that is responsible for accepting sweep requests and
229239
// placing them in appropriate batches. It will spin up new batches as needed.
230240
type Batcher struct {
@@ -234,6 +244,12 @@ type Batcher struct {
234244
// sweepReqs is a channel where sweep requests are received.
235245
sweepReqs chan SweepRequest
236246

247+
// testReqs is a channel where test requests are received.
248+
// This is used only in unit tests! The reason to have this is to
249+
// avoid data races in require.Eventually calls running in parallel
250+
// to the event loop. See method testRunInEventLoop().
251+
testReqs chan *testRequest
252+
237253
// errChan is a channel where errors are received.
238254
errChan chan error
239255

@@ -461,6 +477,7 @@ func NewBatcher(wallet lndclient.WalletKitClient,
461477
return &Batcher{
462478
batches: make(map[int32]*batch),
463479
sweepReqs: make(chan SweepRequest),
480+
testReqs: make(chan *testRequest),
464481
errChan: make(chan error, 1),
465482
quit: make(chan struct{}),
466483
initDone: make(chan struct{}),
@@ -528,6 +545,10 @@ func (b *Batcher) Run(ctx context.Context) error {
528545
return err
529546
}
530547

548+
case testReq := <-b.testReqs:
549+
testReq.handler()
550+
close(testReq.quit)
551+
531552
case err := <-b.errChan:
532553
log.Warnf("Batcher received an error: %v.", err)
533554
return err
@@ -551,6 +572,36 @@ func (b *Batcher) AddSweep(sweepReq *SweepRequest) error {
551572
}
552573
}
553574

575+
// testRunInEventLoop runs a function in the event loop blocking until
576+
// the function returns. For unit tests only!
577+
func (b *Batcher) testRunInEventLoop(ctx context.Context, handler func()) {
578+
// If the event loop is finished, run the function.
579+
select {
580+
case <-b.quit:
581+
handler()
582+
583+
return
584+
default:
585+
}
586+
587+
quit := make(chan struct{})
588+
req := &testRequest{
589+
handler: handler,
590+
quit: quit,
591+
}
592+
593+
select {
594+
case b.testReqs <- req:
595+
case <-ctx.Done():
596+
return
597+
}
598+
599+
select {
600+
case <-quit:
601+
case <-ctx.Done():
602+
}
603+
}
604+
554605
// handleSweep handles a sweep request by either placing it in an existing
555606
// batch, or by spinning up a new batch for it.
556607
func (b *Batcher) handleSweep(ctx context.Context, sweep *sweep,

0 commit comments

Comments
 (0)