Skip to content

Commit 4a28052

Browse files
committed
services: improve Notary service locking
1. Allow parallel handling of notary requests with different main transaction. 2. Move expired request removal to PostBlock. Close #4077. Signed-off-by: Anna Shaleva <[email protected]>
1 parent c00829e commit 4a28052

File tree

1 file changed

+46
-16
lines changed

1 file changed

+46
-16
lines changed

pkg/services/notary/notary.go

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ type (
5252
// started is a status bool to protect from double start/shutdown.
5353
started atomic.Bool
5454

55-
// reqMtx protects requests list.
55+
// reqMtx protects the request list from concurrent requests addition/removal.
56+
// Use per-request locks instead of this one to perform request-changing operations.
5657
reqMtx sync.RWMutex
5758
// requests represents a map of main transactions which needs to be completed
5859
// with the associated fallback transactions grouped by the main transaction hash
@@ -89,6 +90,7 @@ const defaultTxChannelCapacity = 100
8990
type (
9091
// request represents Notary service request.
9192
request struct {
93+
lock sync.RWMutex
9294
// isSent indicates whether the main transaction was successfully sent to the network.
9395
isSent bool
9496
main *transaction.Transaction
@@ -117,7 +119,8 @@ type (
117119
)
118120

119121
// isMainCompleted denotes whether all signatures for the main transaction were collected.
120-
func (r request) isMainCompleted() bool {
122+
// The caller must hold the request RLock.
123+
func (r *request) isMainCompleted() bool {
121124
if r.witnessInfo == nil {
122125
return false
123126
}
@@ -254,12 +257,14 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
254257
zap.String("verification error", validationErr.Error()))
255258
}
256259
n.reqMtx.Lock()
257-
defer n.reqMtx.Unlock()
258260
r, exists := n.requests[payload.MainTransaction.Hash()]
259261
if exists {
262+
r.lock.Lock() // RLock doesn't fit here since we modify r.minNotValidBefore below.
260263
if slices.ContainsFunc(r.fallbacks, func(fb *transaction.Transaction) bool {
261264
return fb.Hash().Equals(payload.FallbackTransaction.Hash())
262265
}) {
266+
r.lock.Unlock()
267+
n.reqMtx.Unlock()
263268
return // then we already have processed this request
264269
}
265270
r.minNotValidBefore = min(r.minNotValidBefore, nvbFallback)
@@ -270,8 +275,10 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
270275
main: payload.MainTransaction.Copy(),
271276
minNotValidBefore: nvbFallback,
272277
}
278+
r.lock.Lock()
273279
n.requests[payload.MainTransaction.Hash()] = r
274280
}
281+
n.reqMtx.Unlock()
275282
if r.witnessInfo == nil && validationErr == nil {
276283
r.witnessInfo = newInfo
277284
}
@@ -282,8 +289,10 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
282289
// the copy.
283290
r.fallbacks = append(r.fallbacks, payload.FallbackTransaction.Copy())
284291
if exists && r.isMainCompleted() || validationErr != nil {
292+
r.lock.Unlock()
285293
return
286294
}
295+
287296
mainHash := hash.NetSha256(uint32(n.Network), r.main).BytesBE()
288297
for i, w := range payload.MainTransaction.Scripts {
289298
if len(w.InvocationScript) == 0 || // check that signature for this witness was provided
@@ -338,6 +347,7 @@ func (n *Notary) OnNewRequest(payload *payload.P2PNotaryRequest) {
338347
zap.Error(err))
339348
}
340349
}
350+
r.lock.Unlock()
341351
}
342352

343353
// OnRequestRemoval is a callback which is called after fallback transaction is removed
@@ -348,19 +358,31 @@ func (n *Notary) OnRequestRemoval(pld *payload.P2PNotaryRequest) {
348358
}
349359

350360
n.reqMtx.Lock()
351-
defer n.reqMtx.Unlock()
352361
r, ok := n.requests[pld.MainTransaction.Hash()]
362+
n.reqMtx.Unlock()
353363
if !ok {
354364
return
355365
}
366+
367+
var h util.Uint256
368+
r.lock.Lock()
356369
for i, fb := range r.fallbacks {
357370
if fb.Hash().Equals(pld.FallbackTransaction.Hash()) {
358371
r.fallbacks = append(r.fallbacks[:i], r.fallbacks[i+1:]...)
359372
break
360373
}
361374
}
362375
if len(r.fallbacks) == 0 {
363-
delete(n.requests, r.main.Hash())
376+
h = r.main.Hash()
377+
}
378+
r.lock.Unlock()
379+
380+
if !h.Equals(util.Uint256{}) {
381+
// Technically, there's a tiny race here, but it's not that important if all
382+
// previous fallbacks already accepted to the chain.
383+
n.reqMtx.Lock()
384+
delete(n.requests, h)
385+
n.reqMtx.Unlock()
364386
}
365387
}
366388

@@ -379,12 +401,18 @@ func (n *Notary) PostPersist() {
379401
defer n.reqMtx.Unlock()
380402
currHeight := n.Config.Chain.BlockHeight()
381403
for h, r := range n.requests {
404+
r.lock.Lock()
405+
if len(r.fallbacks) == 0 {
406+
delete(n.requests, r.main.Hash())
407+
continue
408+
}
382409
if !r.isSent && r.isMainCompleted() && r.minNotValidBefore > currHeight {
383410
if err := n.finalize(acc, r.main, h); err != nil {
384411
n.Config.Log.Error("failed to finalize main transaction after PostPersist, waiting for the next block to retry",
385412
zap.String("hash", r.main.Hash().StringLE()),
386413
zap.Error(err))
387414
}
415+
r.lock.Unlock()
388416
continue
389417
}
390418
if r.minNotValidBefore <= currHeight { // then at least one of the fallbacks can already be sent.
@@ -400,6 +428,7 @@ func (n *Notary) PostPersist() {
400428
}
401429
}
402430
}
431+
r.lock.Unlock()
403432
}
404433
}
405434

@@ -448,33 +477,37 @@ func (n *Notary) newTxCallbackLoop() {
448477
case tx := <-n.newTxs:
449478
isMain := tx.tx.Hash() == tx.mainHash
450479

451-
n.reqMtx.Lock()
480+
n.reqMtx.RLock()
452481
r, ok := n.requests[tx.mainHash]
453-
if !ok || isMain && (r.isSent || r.minNotValidBefore <= n.Config.Chain.BlockHeight()) {
454-
n.reqMtx.Unlock()
482+
n.reqMtx.RUnlock()
483+
if !ok {
484+
continue
485+
}
486+
r.lock.Lock()
487+
if isMain && (r.isSent || r.minNotValidBefore <= n.Config.Chain.BlockHeight()) {
488+
r.lock.Unlock()
455489
continue
456490
}
491+
457492
if !isMain {
458493
// Ensure that fallback was not already completed.
459494
var isPending = slices.ContainsFunc(r.fallbacks, func(fb *transaction.Transaction) bool {
460495
return fb.Hash() == tx.tx.Hash()
461496
})
462497
if !isPending {
463-
n.reqMtx.Unlock()
498+
r.lock.Unlock()
464499
continue
465500
}
466501
}
467502

468-
n.reqMtx.Unlock()
469503
err := n.onTransaction(tx.tx)
470504
if err != nil {
471505
n.Config.Log.Error("new transaction callback finished with error",
472506
zap.Error(err),
473507
zap.Bool("is main", isMain))
508+
r.lock.Unlock()
474509
continue
475510
}
476-
477-
n.reqMtx.Lock()
478511
if isMain {
479512
r.isSent = true
480513
} else {
@@ -484,11 +517,8 @@ func (n *Notary) newTxCallbackLoop() {
484517
break
485518
}
486519
}
487-
if len(r.fallbacks) == 0 {
488-
delete(n.requests, tx.mainHash)
489-
}
490520
}
491-
n.reqMtx.Unlock()
521+
r.lock.Unlock()
492522
case <-n.stopCh:
493523
return
494524
}

0 commit comments

Comments
 (0)