Skip to content

Commit

Permalink
Fix deregistration of services with colliding exposed port numbers. A…
Browse files Browse the repository at this point in the history
…dd docs, fix grammar
  • Loading branch information
x-cray committed Aug 30, 2016
1 parent faca2c1 commit b92f07a
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 14 deletions.
26 changes: 13 additions & 13 deletions bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Bridge struct {
scheduler types.SchedulerAdapter
schedulerServiceGroups map[string]*types.ServiceGroup
registry types.RegistryAdapter
registryAdvertizeAddr string
registryAdvertiseAddr string
config *types.Config
}

Expand Down Expand Up @@ -78,7 +78,7 @@ func (b *Bridge) processServiceEvent(event *types.ServiceEvent) error {
case types.ServiceStopped:
// Service stopped, deregister and remove it from cache.
// Only consider services registered on current registry's advertized address.
if event.IP == b.registryAdvertizeAddr {
if event.IP == b.registryAdvertiseAddr {
if group := b.cachedServiceGroup(event.ServiceID, "deregister"); group != nil {
b.registry.Deregister(group)
delete(b.schedulerServiceGroups, event.ServiceID)
Expand All @@ -90,7 +90,7 @@ func (b *Bridge) processServiceEvent(event *types.ServiceEvent) error {
// Service went up, register it.
if group := b.cachedServiceGroup(event.ServiceID, "register"); group != nil {
// Only consider services registered on current registry's advertized address.
if group.IP == b.registryAdvertizeAddr {
if group.IP == b.registryAdvertiseAddr {
b.registry.Register(group)
} else {
logSkipMessage(group.IP)
Expand Down Expand Up @@ -147,7 +147,7 @@ func (b *Bridge) Sync() error {

log.WithField("prefix", "bridge").Infof("Received %d services from registry", len(registryServiceGroups))

// Build ip:port-indexed service map.
// Build service:ip:port-indexed service map.
registryServicesMap := make(map[string]*serviceGroupPair)
for _, group := range registryServiceGroups {
for _, service := range group.Services {
Expand All @@ -169,8 +169,8 @@ func (b *Bridge) Sync() error {
service := schedulerService.service

// If service is not yet registered we need to register it.
// Only consider healthy services registered on current registry's advertized address.
if group.IP == b.registryAdvertizeAddr && service.Healthy && registryServicesMap[group.ServiceKey(service)] == nil {
// Only consider healthy services registered on current registry's advertised address.
if group.IP == b.registryAdvertiseAddr && service.Healthy && registryServicesMap[group.ServiceKey(service)] == nil {
err := b.registry.Register(group)
if err != nil {
return err
Expand Down Expand Up @@ -204,30 +204,30 @@ func (b *Bridge) Sync() error {
func (b *Bridge) refreshSchedulerServices() (map[string]*serviceGroupPair, error) {
log.WithField("prefix", "bridge").Info("Refreshing scheduler services")

// Get registry's advertize address.
// Get registry's advertise address.
addr, err := b.registry.AdvertiseAddr()
if err != nil {
return nil, err
}

b.registryAdvertizeAddr = addr
b.registryAdvertiseAddr = addr

// Get services from scheduler.
schedulerServiceGroups, err := b.scheduler.Services()
if err != nil {
return nil, err
}

log.WithField("prefix", "bridge").Infof("Registry advertize address is %s", b.registryAdvertizeAddr)
log.WithField("prefix", "bridge").Infof("Registry advertise address is %s", b.registryAdvertiseAddr)

// Build 2 maps of services:
// ServiceID-indexed and ip:port-indexed
ipPortServices := make(map[string]*serviceGroupPair)
// ServiceID-indexed and service:ip:port-indexed
servicesMap := make(map[string]*serviceGroupPair)
b.schedulerServiceGroups = make(map[string]*types.ServiceGroup)
for _, group := range schedulerServiceGroups {
b.schedulerServiceGroups[group.ID] = group
for _, service := range group.Services {
ipPortServices[group.ServiceKey(service)] = &serviceGroupPair{
servicesMap[group.ServiceKey(service)] = &serviceGroupPair{
service: service,
group: group,
}
Expand All @@ -239,5 +239,5 @@ func (b *Bridge) refreshSchedulerServices() (map[string]*serviceGroupPair, error
len(schedulerServiceGroups),
)

return ipPortServices, nil
return servicesMap, nil
}
64 changes: 64 additions & 0 deletions bridge/bridge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,70 @@ var _ = Describe("Bridge", func() {
bridge.Sync()
})

It("Should deregister 1 service (with colliding exposed port number) absent from scheduler but present in registry", func() {
// Arrange.
schedulerServices := []*types.ServiceGroup{
&types.ServiceGroup{
ID: "app_server_5877d4d2-7b4b-11e5-b945-56847afe9799",
IP: "10.10.10.10",
Services: []*types.Service{
&types.Service{
ID: "app_server_5877d4d2-7b4b-11e5-b945-56847afe9799:3000",
Name: "app-server",
Healthy: true,
OriginalPort: 3000,
ExposedPort: 31046,
},
},
},
}
registryServices := []*types.ServiceGroup{
&types.ServiceGroup{
ID: "db_server_2c033893-7993-11e5-8878-56847afe9799",
IP: "10.10.10.10",
Services: []*types.Service{
&types.Service{
ID: "db_server_2c033893-7993-11e5-8878-56847afe9799:27017",
Name: "db-server",
ExposedPort: 31046,
},
},
},
&types.ServiceGroup{
ID: "app_server_5877d4d2-7b4b-11e5-b945-56847afe9799",
IP: "10.10.10.10",
Services: []*types.Service{
&types.Service{
ID: "app_server_5877d4d2-7b4b-11e5-b945-56847afe9799:3000",
Name: "app-server",
ExposedPort: 31046,
},
},
},
}
schedulerAdapter.EXPECT().Services().Return(schedulerServices, nil)
registryAdapter.EXPECT().Services().Return(registryServices, nil)
registryAdapter.EXPECT().AdvertiseAddr().Return("10.10.10.10", nil)
registryAdapter.EXPECT().Deregister(gomock.Any()).Do(func(group *types.ServiceGroup) {
Ω(group.IP).Should(Equal("10.10.10.10"))
Ω(group.Services).Should(HaveLen(1))
service := group.Services[0]

Ω(service.ID).Should(Equal("db_server_2c033893-7993-11e5-8878-56847afe9799:27017"))
Ω(service.Name).Should(Equal("db-server"))
Ω(service.ExposedPort).Should(Equal(31046))
}).Return(nil).Times(1)
registryAdapter.EXPECT().Register(gomock.Any()).Times(0)

bridge := &Bridge{
scheduler: schedulerAdapter,
registry: registryAdapter,
}

// Act.
bridge.Sync()
})

It("Should register 2 services absent from registry but present in scheduler", func() {
// Arrange.
schedulerServices := []*types.ServiceGroup{
Expand Down
7 changes: 6 additions & 1 deletion types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,17 @@ type RegistryAdapter interface {
AdvertiseAddr() (string, error)
}

// ServiceGroup represents the collection of services which expose multiple ports.
// Most of the time it will hold the single Service instance, but if the service exposes multiple ports, it will contain
// multiple services named by appending exposed port number to them, i.e. foo-service-3000, foo-service-4001, etc.
type ServiceGroup struct {
ID string
IP string
Services []*Service
HealthChecks []*ServiceHealthCheck
}

// Service represents a single entry in the service registry.
type Service struct {
ID string
Name string
Expand All @@ -35,6 +39,7 @@ type Service struct {
ExposedPort int
}

// ServiceHealthCheck represents health check definition.
type ServiceHealthCheck struct {
ID string
Name string
Expand All @@ -45,7 +50,7 @@ type ServiceHealthCheck struct {
}

func (group *ServiceGroup) ServiceKey(service *Service) string {
return fmt.Sprintf("%s:%d", group.IP, service.ExposedPort)
return fmt.Sprintf("%s:%s:%d", service.Name, group.IP, service.ExposedPort)
}

type ServiceAction int
Expand Down

0 comments on commit b92f07a

Please sign in to comment.