Skip to content

Commit db7766e

Browse files
author
Josh Cox
committed
Merge pull request #37 from seh/make-app-monitoring-safe
Provide a new means to update a Eureka application record periodically
2 parents 196ad6d + c9259b5 commit db7766e

File tree

2 files changed

+186
-1
lines changed

2 files changed

+186
-1
lines changed

connection.go

+133-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package fargo
44

55
import (
66
"math/rand"
7+
"sync"
78
"time"
89
)
910

@@ -80,7 +81,7 @@ func NewConn(address ...string) (e EurekaConnection) {
8081
}
8182

8283
// UpdateApp creates a goroutine that continues to keep an application updated
83-
// with its status in Eureka
84+
// with its status in Eureka.
8485
func (e *EurekaConnection) UpdateApp(app *Application) {
8586
go func() {
8687
for {
@@ -93,3 +94,134 @@ func (e *EurekaConnection) UpdateApp(app *Application) {
9394
}
9495
}()
9596
}
97+
98+
// AppUpdate is the outcome of an attempt to get a fresh snapshot of a Eureka
99+
// application's state, together with an error that may have occurred in that
100+
// attempt. If the Err field is nil, the App field will be non-nil.
101+
type AppUpdate struct {
102+
App *Application
103+
Err error
104+
}
105+
106+
func sendAppUpdatesEvery(d time.Duration, produce func() AppUpdate, c chan<- AppUpdate, done <-chan struct{}) {
107+
t := time.NewTicker(d)
108+
defer t.Stop()
109+
for {
110+
select {
111+
case <-done:
112+
close(c)
113+
return
114+
case <-t.C:
115+
// Drop attempted sends when the consumer hasn't received the last buffered update.
116+
select {
117+
case c <- produce():
118+
default:
119+
}
120+
}
121+
}
122+
}
123+
124+
// ScheduleAppUpdates starts polling for updates to the Eureka application with
125+
// the given name, using the connection's configured polling interval as its
126+
// period. It sends the outcome of each update attempt to the returned channel,
127+
// and continues until the supplied done channel is either closed or has a value
128+
// available. Once done sending updates to the returned channel, it closes it.
129+
//
130+
// If await is true, it sends at least one application update outcome to the
131+
// returned channel before returning.
132+
func (e *EurekaConnection) ScheduleAppUpdates(name string, await bool, done <-chan struct{}) <-chan AppUpdate {
133+
produce := func() AppUpdate {
134+
app, err := e.GetApp(name)
135+
return AppUpdate{app, err}
136+
}
137+
c := make(chan AppUpdate, 1)
138+
if await {
139+
c <- produce()
140+
}
141+
go sendAppUpdatesEvery(time.Duration(e.PollInterval)*time.Second, produce, c, done)
142+
return c
143+
}
144+
145+
// An AppSource holds a periodically updated copy of a Eureka application.
146+
type AppSource struct {
147+
m *sync.RWMutex
148+
app *Application
149+
done chan<- struct{}
150+
}
151+
152+
// NewAppSource returns a new AppSource that offers a periodically updated copy
153+
// of the Eureka application with the given name, using the connection's
154+
// configured polling interval as its period.
155+
//
156+
// If await is true, it waits for the first application update to complete
157+
// before returning, though it's possible that that first update attempt could
158+
// fail, so that a subsequent call to CopyLatestAppTo would return false.
159+
func (e *EurekaConnection) NewAppSource(name string, await bool) *AppSource {
160+
done := make(chan struct{})
161+
updates := e.ScheduleAppUpdates(name, await, done)
162+
s := &AppSource{
163+
done: done,
164+
}
165+
if await {
166+
if u := <-updates; u.Err != nil {
167+
s.app = u.App
168+
}
169+
}
170+
go func() {
171+
for u := range updates {
172+
if u.Err != nil {
173+
s.m.Lock()
174+
s.app = u.App
175+
s.m.Unlock()
176+
}
177+
}
178+
}()
179+
return s
180+
}
181+
182+
// Latest returns the most recently acquired Eureke application, if any. If the
183+
// most recent update attempt failed, or if no update attempt has yet to
184+
// complete, it returns nil.
185+
func (s *AppSource) Latest() *Application {
186+
if s == nil {
187+
return nil
188+
}
189+
s.m.RLock()
190+
defer s.m.RUnlock()
191+
return s.app
192+
}
193+
194+
// CopyLatestTo copies the most recently acquired Eureka application to dst, if
195+
// any, and returns true if such an application was available. If no preceding
196+
// update attempt had succeeded, such that no application is available to be
197+
// copied, it returns false.
198+
func (s *AppSource) CopyLatestTo(dst *Application) bool {
199+
if s == nil {
200+
return false
201+
}
202+
s.m.RLock()
203+
defer s.m.RUnlock()
204+
if s.app == nil {
205+
return false
206+
}
207+
*dst = *s.app
208+
return true
209+
}
210+
211+
// Stop turns off an AppSource, so that it will no longer attempt to update its
212+
// latest application.
213+
//
214+
// It is safe to call Latest or CopyLatestTo on a stopped source.
215+
func (s *AppSource) Stop() {
216+
if s == nil {
217+
return
218+
}
219+
// Allow multiple calls to Stop by precluding repeated attempts to close an
220+
// already closed channel.
221+
s.m.Lock()
222+
defer s.m.Unlock()
223+
if s.done != nil {
224+
close(s.done)
225+
s.done = nil
226+
}
227+
}

example_appupdate_test.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package fargo
2+
3+
// MIT Licensed (see README.md) - Copyright (c) 2013 Hudl <@Hudl>
4+
5+
import (
6+
"fmt"
7+
"time"
8+
)
9+
10+
func ExampleEurekaConnection_ScheduleAppUpdates(e *EurekaConnection) {
11+
done := make(chan struct{})
12+
time.AfterFunc(2*time.Minute, func() {
13+
close(done)
14+
})
15+
name := "my_app"
16+
fmt.Printf("Monitoring application %q.\n", name)
17+
for update := range e.ScheduleAppUpdates(name, true, done) {
18+
if update.Err != nil {
19+
fmt.Printf("Most recent request for application %q failed: %v\n", name, update.Err)
20+
continue
21+
}
22+
fmt.Printf("Application %q has %d instances.\n", name, len(update.App.Instances))
23+
}
24+
fmt.Printf("Done monitoring application %q.\n", name)
25+
}
26+
27+
func ExampleAppSource_Latest(e *EurekaConnection) {
28+
name := "my_app"
29+
source := e.NewAppSource(name, false)
30+
defer source.Stop()
31+
time.Sleep(30 * time.Second)
32+
if app := source.Latest(); app != nil {
33+
fmt.Printf("Application %q has %d instances\n.", name, len(app.Instances))
34+
}
35+
time.Sleep(time.Minute)
36+
if app := source.Latest(); app == nil {
37+
fmt.Printf("No application named %q is available.\n", name)
38+
}
39+
}
40+
41+
func ExampleAppSource_CopyLatestTo(e *EurekaConnection) {
42+
name := "my_app"
43+
source := e.NewAppSource(name, true)
44+
defer source.Stop()
45+
var app Application
46+
if !source.CopyLatestTo(&app) {
47+
fmt.Printf("No application named %q is available.\n", name)
48+
}
49+
time.Sleep(time.Minute)
50+
if source.CopyLatestTo(&app) {
51+
fmt.Printf("Application %q has %d instances\n.", name, len(app.Instances))
52+
}
53+
}

0 commit comments

Comments
 (0)