Skip to content

Commit 65ce0ce

Browse files
committed
Poll the instances for a VIP address periodically
Following the pattern established by EurekaConnection's ScheduleAppUpdates method, attempt to retrieve the latest set of instances for a given VIP address periodically, feeding the outcome into a supplied channel until told to stop.
1 parent 694413e commit 65ce0ce

File tree

3 files changed

+112
-34
lines changed

3 files changed

+112
-34
lines changed

net.go

+108-30
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"net/http"
1212
"strconv"
1313
"strings"
14+
"time"
1415
)
1516

1617
func (e *EurekaConnection) generateURL(slugs ...string) string {
@@ -168,28 +169,20 @@ func WithStatus(status StatusType) InstanceQueryOption {
168169
}
169170
}
170171

171-
func makeUncheckedStatusMatchingOption(status StatusType) InstanceQueryOption {
172-
return func(o *instanceQueryOptions) error {
173-
retainIfStatusIs(status, o)
174-
return nil
175-
}
176-
}
177-
178172
// ThatAreUp restricts the set of instances returned to only those with status UP.
179173
//
180-
// Combining the options produced by this function with those produced by WithStatus applies their
181-
// logical disjunction.
182-
func ThatAreUp() InstanceQueryOption {
183-
return makeUncheckedStatusMatchingOption(UP)
174+
// Combining this function with the options produced by WithStatus applies their logical
175+
// disjunction.
176+
func ThatAreUp(o *instanceQueryOptions) error {
177+
retainIfStatusIs(UP, o)
178+
return nil
184179
}
185180

186181
// Shuffled requests randomizing the order of the sequence of instances returned, using the default
187182
// shared rand.Source.
188-
func Shuffled() InstanceQueryOption {
189-
return func(o *instanceQueryOptions) error {
190-
o.intn = rand.Intn
191-
return nil
192-
}
183+
func Shuffled(o *instanceQueryOptions) error {
184+
o.intn = rand.Intn
185+
return nil
193186
}
194187

195188
// ShuffledWith requests randomizing the order of the sequence of instances returned, using the
@@ -374,36 +367,121 @@ func (e *EurekaConnection) getInstancesByVIPAddress(addr string, opts instanceQu
374367
return instances, nil
375368
}
376369

370+
func mergeInstanceQueryOptions(defaults instanceQueryOptions, opts []InstanceQueryOption) (instanceQueryOptions, error) {
371+
for _, o := range opts {
372+
if o != nil {
373+
if err := o(&defaults); err != nil {
374+
return instanceQueryOptions{}, err
375+
}
376+
}
377+
}
378+
return defaults, nil
379+
}
380+
381+
func collectInstanceQueryOptions(opts []InstanceQueryOption) (instanceQueryOptions, error) {
382+
return mergeInstanceQueryOptions(instanceQueryOptions{}, opts)
383+
}
384+
377385
// GetInstancesByVIPAddress returns the set of instances registered with the given VIP address,
378386
// potentially filtered per the constraints supplied as options.
379387
//
380388
// NB: The VIP address is case-sensitive, and must match the address used at registration time.
381389
func (e *EurekaConnection) GetInstancesByVIPAddress(addr string, opts ...InstanceQueryOption) ([]*Instance, error) {
382-
var mergedOptions instanceQueryOptions
383-
for _, o := range opts {
384-
if o != nil {
385-
if err := o(&mergedOptions); err != nil {
386-
return nil, err
387-
}
388-
}
390+
options, err := collectInstanceQueryOptions(opts)
391+
if err != nil {
392+
return nil, err
389393
}
390-
return e.getInstancesByVIPAddress(addr, mergedOptions)
394+
return e.getInstancesByVIPAddress(addr, options)
391395
}
392396

393397
// GetInstancesBySecureVIPAddress returns the set of instances registered with the given secure
394398
// VIP address, potentially filtered per the constraints supplied as options.
395399
//
396400
// NB: The secure VIP address is case-sensitive, and must match the address used at registration time.
397401
func (e *EurekaConnection) GetInstancesBySecureVIPAddress(addr string, opts ...InstanceQueryOption) ([]*Instance, error) {
398-
mergedOptions := instanceQueryOptions{secure: true}
399-
for _, o := range opts {
400-
if o != nil {
401-
if err := o(&mergedOptions); err != nil {
402-
return nil, err
402+
options, err := mergeInstanceQueryOptions(instanceQueryOptions{secure: true}, opts)
403+
if err != nil {
404+
return nil, err
405+
}
406+
return e.getInstancesByVIPAddress(addr, options)
407+
}
408+
409+
// InstanceSetUpdate is the outcome of an attempt to get a fresh snapshot of a Eureka VIP address's
410+
// set of instances, together with an error that may have occurred in that attempt. If the Err field
411+
// is nil, the Instances field will be populated—though possibly with an empty set.
412+
type InstanceSetUpdate struct {
413+
Instances []*Instance
414+
Err error
415+
}
416+
417+
func sendVIPAddressUpdatesEvery(d time.Duration, produce func() InstanceSetUpdate, c chan<- InstanceSetUpdate, done <-chan struct{}) {
418+
t := time.NewTicker(d)
419+
defer t.Stop()
420+
for {
421+
select {
422+
case <-done:
423+
close(c)
424+
return
425+
case <-t.C:
426+
// Drop attempted sends when the consumer hasn't received the last buffered update.
427+
select {
428+
case c <- produce():
429+
default:
403430
}
404431
}
405432
}
406-
return e.getInstancesByVIPAddress(addr, mergedOptions)
433+
}
434+
435+
func (e *EurekaConnection) scheduleVIPAddressUpdates(addr string, await bool, done <-chan struct{}, opts instanceQueryOptions) <-chan InstanceSetUpdate {
436+
produce := func() InstanceSetUpdate {
437+
instances, err := e.getInstancesByVIPAddress(addr, opts)
438+
return InstanceSetUpdate{instances, err}
439+
}
440+
c := make(chan InstanceSetUpdate, 1)
441+
if await {
442+
c <- produce()
443+
}
444+
go sendVIPAddressUpdatesEvery(time.Duration(e.PollInterval)*time.Second, produce, c, done)
445+
return c
446+
}
447+
448+
// ScheduleVIPAddressUpdates starts polling for updates to the set of instances registered with the
449+
// given Eureka VIP address, potentially filtered per the constraints supplied as options, using the
450+
// connection's configured polling interval as its period. It sends the outcome of each update
451+
// attempt to the returned channel, and continues until the supplied done channel is either closed
452+
// or has a value available. Once done sending updates to the returned channel, it closes it.
453+
//
454+
// If await is true, it sends at least one instance set update outcome to the returned channel
455+
// before returning.
456+
//
457+
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
458+
// intended updates.
459+
func (e *EurekaConnection) ScheduleVIPAddressUpdates(addr string, await bool, done <-chan struct{}, opts ...InstanceQueryOption) (<-chan InstanceSetUpdate, error) {
460+
options, err := collectInstanceQueryOptions(opts)
461+
if err != nil {
462+
return nil, err
463+
}
464+
return e.scheduleVIPAddressUpdates(addr, await, done, options), nil
465+
}
466+
467+
// ScheduleSecureVIPAddressUpdates starts polling for updates to the set of instances registered
468+
// with the given secure Eureka VIP address, potentially filtered per the constraints supplied as
469+
// options, using the connection's configured polling interval as its period. It sends the outcome
470+
// of each update attempt to the returned channel, and continues until the supplied done channel is
471+
// either closed or has a value available. Once done sending updates to the returned channel, it
472+
// closes it.
473+
//
474+
// If await is true, it sends at least one instance set update outcome to the returned channel
475+
// before returning.
476+
//
477+
// It returns an error if any of the supplied options are invalid, precluding it from scheduling the
478+
// intended updates.
479+
func (e *EurekaConnection) ScheduleSecureVIPAddressUpdates(addr string, await bool, done <-chan struct{}, opts ...InstanceQueryOption) (<-chan InstanceSetUpdate, error) {
480+
options, err := mergeInstanceQueryOptions(instanceQueryOptions{secure: true}, opts)
481+
if err != nil {
482+
return nil, err
483+
}
484+
return e.scheduleVIPAddressUpdates(addr, await, done, options), nil
407485
}
408486

409487
// RegisterInstance will register the given Instance with eureka if it is not already registered,

net_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestInstanceQueryOptions(t *testing.T) {
8686
Convey("A shuffling directive", t, func() {
8787
Convey("using the global Rand instance", func() {
8888
var opts instanceQueryOptions
89-
err := Shuffled()(&opts)
89+
err := Shuffled(&opts)
9090
So(err, ShouldBeNil)
9191
So(opts.intn, ShouldNotBeNil)
9292
So(opts.intn(1), ShouldEqual, 0)
@@ -107,7 +107,7 @@ func TestInstanceQueryOptions(t *testing.T) {
107107
func TestFilterInstancesInApps(t *testing.T) {
108108
Convey("A predicate should preserve only those instances", t, func() {
109109
Convey("with status UP", func() {
110-
areUp := instancePredicateFrom(t, ThatAreUp())
110+
areUp := instancePredicateFrom(t, ThatAreUp)
111111
Convey("from an empty set of applications", func() {
112112
So(filterInstancesInApps(nil, areUp), ShouldBeEmpty)
113113
})

tests/net_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func TestGetSingleInstanceByVIPAddress(t *testing.T) {
149149
So(instances, ShouldHaveLength, 1)
150150
So(instances[0].VipAddress, ShouldEqual, vipAddress)
151151
Convey("requesting the instances by that VIP address with status UP should provide that one", func() {
152-
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp())
152+
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp)
153153
So(err, ShouldBeNil)
154154
So(instances, ShouldHaveLength, 1)
155155
So(instances[0].VipAddress, ShouldEqual, vipAddress)
@@ -165,7 +165,7 @@ func TestGetSingleInstanceByVIPAddress(t *testing.T) {
165165
So(instances, ShouldHaveLength, 1)
166166
Convey("And selecting instances with status UP should provide none", func() {
167167
// Ensure that we tolerate a nil option safely.
168-
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp(), nil)
168+
instances, err := e.GetInstancesByVIPAddress(vipAddress, fargo.ThatAreUp, nil)
169169
So(err, ShouldBeNil)
170170
So(instances, ShouldBeEmpty)
171171
})

0 commit comments

Comments
 (0)