Skip to content

Commit 9e6a1c3

Browse files
authoredJan 3, 2023
common/mclock: add Alarm (ethereum#26333)
Alarm is a timer utility that simplifies code where a timer needs to be rescheduled over and over. Doing this can be tricky with time.Timer or time.AfterFunc because the channel requires draining in some cases. Alarm is optimized for use cases where items are tracked in a heap according to their expiry time, and a goroutine with a for/select loop wants to be woken up whenever the next item expires. In this application, the timer needs to be rescheduled when an item is added or removed from the heap. Using a timer naively, these updates will always require synchronization with the global runtime timer datastructure to update the timer using Reset. Alarm avoids this by tracking the next expiry time and only modifies the timer if it would need to fire earlier than already scheduled. As an example use, I have converted p2p.dialScheduler to use Alarm instead of AfterFunc.
1 parent c6a2f77 commit 9e6a1c3

File tree

3 files changed

+244
-35
lines changed

3 files changed

+244
-35
lines changed
 

‎common/mclock/alarm.go

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2022 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package mclock
18+
19+
import (
20+
"time"
21+
)
22+
23+
// Alarm sends timed notifications on a channel. This is very similar to a regular timer,
24+
// but is easier to use in code that needs to re-schedule the same timer over and over.
25+
//
26+
// When scheduling an Alarm, the channel returned by C() will receive a value no later
27+
// than the scheduled time. An Alarm can be reused after it has fired and can also be
28+
// canceled by calling Stop.
29+
type Alarm struct {
30+
ch chan struct{}
31+
clock Clock
32+
timer Timer
33+
deadline AbsTime
34+
}
35+
36+
// NewAlarm creates an Alarm.
37+
func NewAlarm(clock Clock) *Alarm {
38+
if clock == nil {
39+
panic("nil clock")
40+
}
41+
return &Alarm{
42+
ch: make(chan struct{}, 1),
43+
clock: clock,
44+
}
45+
}
46+
47+
// C returns the alarm notification channel. This channel remains identical for
48+
// the entire lifetime of the alarm, and is never closed.
49+
func (e *Alarm) C() <-chan struct{} {
50+
return e.ch
51+
}
52+
53+
// Stop cancels the alarm and drains the channel.
54+
// This method is not safe for concurrent use.
55+
func (e *Alarm) Stop() {
56+
// Clear timer.
57+
if e.timer != nil {
58+
e.timer.Stop()
59+
}
60+
e.deadline = 0
61+
62+
// Drain the channel.
63+
select {
64+
case <-e.ch:
65+
default:
66+
}
67+
}
68+
69+
// Schedule sets the alarm to fire no later than the given time. If the alarm was already
70+
// scheduled but has not fired yet, it may fire earlier than the newly-scheduled time.
71+
func (e *Alarm) Schedule(time AbsTime) {
72+
now := e.clock.Now()
73+
e.schedule(now, time)
74+
}
75+
76+
func (e *Alarm) schedule(now, newDeadline AbsTime) {
77+
if e.timer != nil {
78+
if e.deadline > now && e.deadline <= newDeadline {
79+
// Here, the current timer can be reused because it is already scheduled to
80+
// occur earlier than the new deadline.
81+
//
82+
// The e.deadline > now part of the condition is important. If the old
83+
// deadline lies in the past, we assume the timer has already fired and needs
84+
// to be rescheduled.
85+
return
86+
}
87+
e.timer.Stop()
88+
}
89+
90+
// Set the timer.
91+
d := time.Duration(0)
92+
if newDeadline < now {
93+
newDeadline = now
94+
} else {
95+
d = newDeadline.Sub(now)
96+
}
97+
e.timer = e.clock.AfterFunc(d, e.send)
98+
e.deadline = newDeadline
99+
}
100+
101+
func (e *Alarm) send() {
102+
select {
103+
case e.ch <- struct{}{}:
104+
default:
105+
}
106+
}

‎common/mclock/alarm_test.go

+116
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
// Copyright 2022 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package mclock
18+
19+
import "testing"
20+
21+
// This test checks basic functionality of Alarm.
22+
func TestAlarm(t *testing.T) {
23+
clk := new(Simulated)
24+
clk.Run(20)
25+
a := NewAlarm(clk)
26+
27+
a.Schedule(clk.Now() + 10)
28+
if recv(a.C()) {
29+
t.Fatal("Alarm fired before scheduled deadline")
30+
}
31+
if ntimers := clk.ActiveTimers(); ntimers != 1 {
32+
t.Fatal("clock has", ntimers, "active timers, want", 1)
33+
}
34+
clk.Run(5)
35+
if recv(a.C()) {
36+
t.Fatal("Alarm fired too early")
37+
}
38+
39+
clk.Run(5)
40+
if !recv(a.C()) {
41+
t.Fatal("Alarm did not fire")
42+
}
43+
if recv(a.C()) {
44+
t.Fatal("Alarm fired twice")
45+
}
46+
if ntimers := clk.ActiveTimers(); ntimers != 0 {
47+
t.Fatal("clock has", ntimers, "active timers, want", 0)
48+
}
49+
50+
a.Schedule(clk.Now() + 5)
51+
if recv(a.C()) {
52+
t.Fatal("Alarm fired before scheduled deadline when scheduling the second event")
53+
}
54+
55+
clk.Run(5)
56+
if !recv(a.C()) {
57+
t.Fatal("Alarm did not fire when scheduling the second event")
58+
}
59+
if recv(a.C()) {
60+
t.Fatal("Alarm fired twice when scheduling the second event")
61+
}
62+
}
63+
64+
// This test checks that scheduling an Alarm to an earlier time than the
65+
// one already scheduled works properly.
66+
func TestAlarmScheduleEarlier(t *testing.T) {
67+
clk := new(Simulated)
68+
clk.Run(20)
69+
a := NewAlarm(clk)
70+
71+
a.Schedule(clk.Now() + 50)
72+
clk.Run(5)
73+
a.Schedule(clk.Now() + 1)
74+
clk.Run(3)
75+
if !recv(a.C()) {
76+
t.Fatal("Alarm did not fire")
77+
}
78+
}
79+
80+
// This test checks that scheduling an Alarm to a later time than the
81+
// one already scheduled works properly.
82+
func TestAlarmScheduleLater(t *testing.T) {
83+
clk := new(Simulated)
84+
clk.Run(20)
85+
a := NewAlarm(clk)
86+
87+
a.Schedule(clk.Now() + 50)
88+
clk.Run(5)
89+
a.Schedule(clk.Now() + 100)
90+
clk.Run(50)
91+
if !recv(a.C()) {
92+
t.Fatal("Alarm did not fire")
93+
}
94+
}
95+
96+
// This test checks that scheduling an Alarm in the past makes it fire immediately.
97+
func TestAlarmNegative(t *testing.T) {
98+
clk := new(Simulated)
99+
clk.Run(50)
100+
a := NewAlarm(clk)
101+
102+
a.Schedule(-1)
103+
clk.Run(1) // needed to process timers
104+
if !recv(a.C()) {
105+
t.Fatal("Alarm did not fire for negative time")
106+
}
107+
}
108+
109+
func recv(ch <-chan struct{}) bool {
110+
select {
111+
case <-ch:
112+
return true
113+
default:
114+
return false
115+
}
116+
}

‎p2p/dial.go

+22-35
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,8 @@ type dialScheduler struct {
117117
staticPool []*dialTask
118118

119119
// The dial history keeps recently dialed nodes. Members of history are not dialed.
120-
history expHeap
121-
historyTimer mclock.Timer
122-
historyTimerTime mclock.AbsTime
120+
history expHeap
121+
historyTimer *mclock.Alarm
123122

124123
// for logStats
125124
lastStatsLog mclock.AbsTime
@@ -160,18 +159,20 @@ func (cfg dialConfig) withDefaults() dialConfig {
160159
}
161160

162161
func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupFunc) *dialScheduler {
162+
cfg := config.withDefaults()
163163
d := &dialScheduler{
164-
dialConfig: config.withDefaults(),
165-
setupFunc: setupFunc,
166-
dialing: make(map[enode.ID]*dialTask),
167-
static: make(map[enode.ID]*dialTask),
168-
peers: make(map[enode.ID]struct{}),
169-
doneCh: make(chan *dialTask),
170-
nodesIn: make(chan *enode.Node),
171-
addStaticCh: make(chan *enode.Node),
172-
remStaticCh: make(chan *enode.Node),
173-
addPeerCh: make(chan *conn),
174-
remPeerCh: make(chan *conn),
164+
dialConfig: cfg,
165+
historyTimer: mclock.NewAlarm(cfg.clock),
166+
setupFunc: setupFunc,
167+
dialing: make(map[enode.ID]*dialTask),
168+
static: make(map[enode.ID]*dialTask),
169+
peers: make(map[enode.ID]struct{}),
170+
doneCh: make(chan *dialTask),
171+
nodesIn: make(chan *enode.Node),
172+
addStaticCh: make(chan *enode.Node),
173+
remStaticCh: make(chan *enode.Node),
174+
addPeerCh: make(chan *conn),
175+
remPeerCh: make(chan *conn),
175176
}
176177
d.lastStatsLog = d.clock.Now()
177178
d.ctx, d.cancel = context.WithCancel(context.Background())
@@ -222,8 +223,7 @@ func (d *dialScheduler) peerRemoved(c *conn) {
222223
// loop is the main loop of the dialer.
223224
func (d *dialScheduler) loop(it enode.Iterator) {
224225
var (
225-
nodesCh chan *enode.Node
226-
historyExp = make(chan struct{}, 1)
226+
nodesCh chan *enode.Node
227227
)
228228

229229
loop:
@@ -236,7 +236,7 @@ loop:
236236
} else {
237237
nodesCh = nil
238238
}
239-
d.rearmHistoryTimer(historyExp)
239+
d.rearmHistoryTimer()
240240
d.logStats()
241241

242242
select {
@@ -297,7 +297,7 @@ loop:
297297
}
298298
}
299299

300-
case <-historyExp:
300+
case <-d.historyTimer.C():
301301
d.expireHistory()
302302

303303
case <-d.ctx.Done():
@@ -306,7 +306,7 @@ loop:
306306
}
307307
}
308308

309-
d.stopHistoryTimer(historyExp)
309+
d.historyTimer.Stop()
310310
for range d.dialing {
311311
<-d.doneCh
312312
}
@@ -343,28 +343,15 @@ func (d *dialScheduler) logStats() {
343343

344344
// rearmHistoryTimer configures d.historyTimer to fire when the
345345
// next item in d.history expires.
346-
func (d *dialScheduler) rearmHistoryTimer(ch chan struct{}) {
347-
if len(d.history) == 0 || d.historyTimerTime == d.history.nextExpiry() {
346+
func (d *dialScheduler) rearmHistoryTimer() {
347+
if len(d.history) == 0 {
348348
return
349349
}
350-
d.stopHistoryTimer(ch)
351-
d.historyTimerTime = d.history.nextExpiry()
352-
timeout := time.Duration(d.historyTimerTime - d.clock.Now())
353-
d.historyTimer = d.clock.AfterFunc(timeout, func() { ch <- struct{}{} })
354-
}
355-
356-
// stopHistoryTimer stops the timer and drains the channel it sends on.
357-
func (d *dialScheduler) stopHistoryTimer(ch chan struct{}) {
358-
if d.historyTimer != nil && !d.historyTimer.Stop() {
359-
<-ch
360-
}
350+
d.historyTimer.Schedule(d.history.nextExpiry())
361351
}
362352

363353
// expireHistory removes expired items from d.history.
364354
func (d *dialScheduler) expireHistory() {
365-
d.historyTimer.Stop()
366-
d.historyTimer = nil
367-
d.historyTimerTime = 0
368355
d.history.expire(d.clock.Now(), func(hkey string) {
369356
var id enode.ID
370357
copy(id[:], hkey)

0 commit comments

Comments
 (0)
Please sign in to comment.