Skip to content

Commit 5671c61

Browse files
committed
use a priority lock instead of a semaphore
so that there is no case of infinite accumulation of pending peers in the queue. also adds a connectedness check before adding the peer.
1 parent 14213be commit 5671c61

File tree

2 files changed

+19
-31
lines changed

2 files changed

+19
-31
lines changed

notify.go

+9-15
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,11 @@ func (p *PubSubNotif) Connected(n network.Network, c network.Conn) {
2323
}
2424

2525
go func() {
26-
select {
27-
case <-p.newPeersSema:
28-
case <-p.ctx.Done():
29-
return
30-
}
31-
26+
p.newPeersPrioLk.RLock()
27+
p.newPeersMx.Lock()
3228
p.newPeersPend[c.RemotePeer()] = struct{}{}
33-
p.newPeersSema <- struct{}{}
29+
p.newPeersMx.Unlock()
30+
p.newPeersPrioLk.RUnlock()
3431

3532
select {
3633
case p.newPeers <- struct{}{}:
@@ -59,20 +56,17 @@ func (p *PubSubNotif) Initialize() {
5956
return true
6057
}
6158

62-
select {
63-
case <-p.newPeersSema:
64-
case <-p.ctx.Done():
65-
return
66-
}
67-
59+
p.newPeersPrioLk.RLock()
60+
p.newPeersMx.Lock()
6861
for _, pid := range p.host.Network().Peers() {
6962
if isTransient(pid) {
7063
continue
7164
}
65+
7266
p.newPeersPend[pid] = struct{}{}
7367
}
74-
75-
p.newPeersSema <- struct{}{}
68+
p.newPeersMx.Unlock()
69+
p.newPeersPrioLk.RUnlock()
7670

7771
select {
7872
case p.newPeers <- struct{}{}:

pubsub.go

+10-16
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,10 @@ type PubSub struct {
9090
rmTopic chan *rmTopicReq
9191

9292
// a notification channel for new peer connections accumulated
93-
newPeers chan struct{}
94-
newPeersSema chan struct{}
95-
newPeersPend map[peer.ID]struct{}
93+
newPeers chan struct{}
94+
newPeersPrioLk sync.RWMutex
95+
newPeersMx sync.Mutex
96+
newPeersPend map[peer.ID]struct{}
9697

9798
// a notification channel for new outoging peer streams
9899
newPeerStream chan network.Stream
@@ -234,7 +235,6 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
234235
signPolicy: StrictSign,
235236
incoming: make(chan *RPC, 32),
236237
newPeers: make(chan struct{}, 1),
237-
newPeersSema: make(chan struct{}, 1),
238238
newPeersPend: make(map[peer.ID]struct{}),
239239
newPeerStream: make(chan network.Stream),
240240
newPeerError: make(chan peer.ID),
@@ -264,8 +264,6 @@ func NewPubSub(ctx context.Context, h host.Host, rt PubSubRouter, opts ...Option
264264
counter: uint64(time.Now().UnixNano()),
265265
}
266266

267-
ps.newPeersSema <- struct{}{}
268-
269267
for _, opt := range opts {
270268
err := opt(ps)
271269
if err != nil {
@@ -615,16 +613,8 @@ func (p *PubSub) processLoop(ctx context.Context) {
615613
}
616614

617615
func (p *PubSub) handlePendingPeers() {
618-
select {
619-
case <-p.newPeersSema:
620-
defer func() {
621-
p.newPeersSema <- struct{}{}
622-
}()
623-
624-
default:
625-
// contention, return and wait for the next notification without blocking the event loop
626-
return
627-
}
616+
p.newPeersPrioLk.Lock()
617+
defer p.newPeersPrioLk.Unlock()
628618

629619
if len(p.newPeersPend) == 0 {
630620
return
@@ -634,6 +624,10 @@ func (p *PubSub) handlePendingPeers() {
634624
p.newPeersPend = make(map[peer.ID]struct{})
635625

636626
for pid := range newPeers {
627+
if p.host.Network().Connectedness(pid) != network.Connected {
628+
continue
629+
}
630+
637631
if _, ok := p.peers[pid]; ok {
638632
log.Debug("already have connection to peer: ", pid)
639633
continue

0 commit comments

Comments
 (0)