Skip to content

Commit 470ac64

Browse files
committed
Merge branch 'master' into parser-options
2 parents 6c23101 + 783cfcb commit 470ac64

File tree

6 files changed

+413
-94
lines changed

6 files changed

+413
-94
lines changed

cron.go

+52-9
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
package cron
44

55
import (
6+
"log"
7+
"runtime"
68
"sort"
79
"time"
810
)
@@ -16,6 +18,8 @@ type Cron struct {
1618
add chan *Entry
1719
snapshot chan []*Entry
1820
running bool
21+
ErrorLog *log.Logger
22+
location *time.Location
1923
}
2024

2125
// Job is an interface for submitted cron jobs.
@@ -66,14 +70,21 @@ func (s byTime) Less(i, j int) bool {
6670
return s[i].Next.Before(s[j].Next)
6771
}
6872

69-
// New returns a new Cron job runner.
73+
// New returns a new Cron job runner, in the Local time zone.
7074
func New() *Cron {
75+
return NewWithLocation(time.Now().Location())
76+
}
77+
78+
// NewWithLocation returns a new Cron job runner.
79+
func NewWithLocation(location *time.Location) *Cron {
7180
return &Cron{
7281
entries: nil,
7382
add: make(chan *Entry),
7483
stop: make(chan struct{}),
7584
snapshot: make(chan []*Entry),
7685
running: false,
86+
ErrorLog: nil,
87+
location: location,
7788
}
7889
}
7990

@@ -87,7 +98,7 @@ func (c *Cron) AddFunc(spec string, cmd func()) error {
8798
return c.AddJob(spec, FuncJob(cmd))
8899
}
89100

90-
// AddFunc adds a Job to the Cron to be run on the given schedule.
101+
// AddJob adds a Job to the Cron to be run on the given schedule.
91102
func (c *Cron) AddJob(spec string, cmd Job) error {
92103
schedule, err := Parse(spec)
93104
if err != nil {
@@ -121,17 +132,37 @@ func (c *Cron) Entries() []*Entry {
121132
return c.entrySnapshot()
122133
}
123134

124-
// Start the cron scheduler in its own go-routine.
135+
// Location gets the time zone location
136+
func (c *Cron) Location() *time.Location {
137+
return c.location
138+
}
139+
140+
// Start the cron scheduler in its own go-routine, or no-op if already started.
125141
func (c *Cron) Start() {
142+
if c.running {
143+
return
144+
}
126145
c.running = true
127146
go c.run()
128147
}
129148

149+
func (c *Cron) runWithRecovery(j Job) {
150+
defer func() {
151+
if r := recover(); r != nil {
152+
const size = 64 << 10
153+
buf := make([]byte, size)
154+
buf = buf[:runtime.Stack(buf, false)]
155+
c.logf("cron: panic running job: %v\n%s", r, buf)
156+
}
157+
}()
158+
j.Run()
159+
}
160+
130161
// Run the scheduler.. this is private just due to the need to synchronize
131162
// access to the 'running' state variable.
132163
func (c *Cron) run() {
133164
// Figure out the next activation times for each entry.
134-
now := time.Now().Local()
165+
now := time.Now().In(c.location)
135166
for _, entry := range c.entries {
136167
entry.Next = entry.Schedule.Next(now)
137168
}
@@ -149,32 +180,44 @@ func (c *Cron) run() {
149180
effective = c.entries[0].Next
150181
}
151182

183+
timer := time.NewTimer(effective.Sub(now))
152184
select {
153-
case now = <-time.After(effective.Sub(now)):
185+
case now = <-timer.C:
154186
// Run every entry whose next time was this effective time.
155187
for _, e := range c.entries {
156188
if e.Next != effective {
157189
break
158190
}
159-
go e.Job.Run()
191+
go c.runWithRecovery(e.Job)
160192
e.Prev = e.Next
161-
e.Next = e.Schedule.Next(effective)
193+
e.Next = e.Schedule.Next(now)
162194
}
163195
continue
164196

165197
case newEntry := <-c.add:
166198
c.entries = append(c.entries, newEntry)
167-
newEntry.Next = newEntry.Schedule.Next(now)
199+
newEntry.Next = newEntry.Schedule.Next(time.Now().In(c.location))
168200

169201
case <-c.snapshot:
170202
c.snapshot <- c.entrySnapshot()
171203

172204
case <-c.stop:
205+
timer.Stop()
173206
return
174207
}
175208

176209
// 'now' should be updated after newEntry and snapshot cases.
177-
now = time.Now().Local()
210+
now = time.Now().In(c.location)
211+
timer.Stop()
212+
}
213+
}
214+
215+
// Logs an error to stderr or to the configured error log
216+
func (c *Cron) logf(format string, args ...interface{}) {
217+
if c.ErrorLog != nil {
218+
c.ErrorLog.Printf(format, args...)
219+
} else {
220+
log.Printf(format, args...)
178221
}
179222
}
180223

cron_test.go

+75
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,38 @@ import (
1212
// compensate for a few milliseconds of runtime.
1313
const ONE_SECOND = 1*time.Second + 10*time.Millisecond
1414

15+
func TestFuncPanicRecovery(t *testing.T) {
16+
cron := New()
17+
cron.Start()
18+
defer cron.Stop()
19+
cron.AddFunc("* * * * * ?", func() { panic("YOLO") })
20+
21+
select {
22+
case <-time.After(ONE_SECOND):
23+
return
24+
}
25+
}
26+
27+
type DummyJob struct{}
28+
29+
func (d DummyJob) Run() {
30+
panic("YOLO")
31+
}
32+
33+
func TestJobPanicRecovery(t *testing.T) {
34+
var job DummyJob
35+
36+
cron := New()
37+
cron.Start()
38+
defer cron.Stop()
39+
cron.AddJob("* * * * * ?", job)
40+
41+
select {
42+
case <-time.After(ONE_SECOND):
43+
return
44+
}
45+
}
46+
1547
// Start and stop cron with no entries.
1648
func TestNoEntries(t *testing.T) {
1749
cron := New()
@@ -77,6 +109,22 @@ func TestAddWhileRunning(t *testing.T) {
77109
}
78110
}
79111

112+
// Test for #34. Adding a job after calling start results in multiple job invocations
113+
func TestAddWhileRunningWithDelay(t *testing.T) {
114+
cron := New()
115+
cron.Start()
116+
defer cron.Stop()
117+
time.Sleep(5 * time.Second)
118+
var calls = 0
119+
cron.AddFunc("* * * * * *", func() { calls += 1 })
120+
121+
<-time.After(ONE_SECOND)
122+
if calls != 1 {
123+
fmt.Printf("called %d times, expected 1\n", calls)
124+
t.Fail()
125+
}
126+
}
127+
80128
// Test timing with Entries.
81129
func TestSnapshotEntries(t *testing.T) {
82130
wg := &sync.WaitGroup{}
@@ -189,6 +237,33 @@ func TestLocalTimezone(t *testing.T) {
189237
}
190238
}
191239

240+
// Test that the cron is run in the given time zone (as opposed to local).
241+
func TestNonLocalTimezone(t *testing.T) {
242+
wg := &sync.WaitGroup{}
243+
wg.Add(1)
244+
245+
loc, err := time.LoadLocation("Atlantic/Cape_Verde")
246+
if err != nil {
247+
fmt.Printf("Failed to load time zone Atlantic/Cape_Verde: %+v", err)
248+
t.Fail()
249+
}
250+
251+
now := time.Now().In(loc)
252+
spec := fmt.Sprintf("%d %d %d %d %d ?",
253+
now.Second()+1, now.Minute(), now.Hour(), now.Day(), now.Month())
254+
255+
cron := NewWithLocation(loc)
256+
cron.AddFunc(spec, func() { wg.Done() })
257+
cron.Start()
258+
defer cron.Stop()
259+
260+
select {
261+
case <-time.After(ONE_SECOND):
262+
t.FailNow()
263+
case <-wait(wg):
264+
}
265+
}
266+
192267
// Test that calling stop before start silently returns without
193268
// blocking the stop channel.
194269
func TestStopWithoutStart(t *testing.T) {

0 commit comments

Comments
 (0)