Skip to content

Commit

Permalink
Refactor code, update README
Browse files Browse the repository at this point in the history
  • Loading branch information
x-cray committed Apr 15, 2016
1 parent c134f0a commit ec61538
Show file tree
Hide file tree
Showing 8 changed files with 74 additions and 48 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,3 @@ _testmain.go

# Built binary
marathon-registrator

22 changes: 21 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,31 @@
[![Docker Pulls](https://img.shields.io/docker/pulls/xcray/marathon-registrator.svg)](https://hub.docker.com/r/xcray/marathon-registrator/)
[![](https://badge.imagelayers.io/xcray/marathon-registrator:latest.svg)](https://imagelayers.io/?images=xcray/marathon-registrator:latest 'Get your own badge on imagelayers.io')

Consul service registry bridge for Marathon.
Consul service registry bridge for Marathon. It monitors services running by Marathon and syncs them to Consul. Heavily inspired by [registrator](https://github.com/gliderlabs/registrator).

# Features
* Automatically registers/deregisters Marathon tasks as services in Consul.
* Uses new Marathon [Event Stream](https://mesosphere.github.io/marathon/docs/rest-api.html#event-stream) (e.g. /v2/events) for getting service updates. No need to reconfigure Marathon to use webhooks.
* Automatic cleanup of dangling services from service registry.
* Designed with extensibility in mind: service scheduler and service registry are abstractions which may have different implementations. Currently, there is only Marathon scheduler and Consul service registry implemented.

# Usage

# Development
Install dependencies:
```shell
$ make deps
```

Run tests:
```shell
$ make test
```

Generate mocks for interfaces (run when you modify mocked object interface):
```shell
$ make mocks
```

# License
MIT
16 changes: 8 additions & 8 deletions consul/consul_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
consulAPI "github.com/hashicorp/consul/api"
)

type consulAdapter struct {
type Adapter struct {
client *consulAPI.Client
dryRun bool
}

func New(uri *url.URL, dryRun bool) (*consulAdapter, error) {
func New(uri *url.URL, dryRun bool) (*Adapter, error) {
config := consulAPI.DefaultConfig()
config.Address = uri.Host
config.Scheme = uri.Scheme
Expand All @@ -27,14 +27,14 @@ func New(uri *url.URL, dryRun bool) (*consulAdapter, error) {
return nil, err
}

return &consulAdapter{
return &Adapter{
client: client,
dryRun: dryRun,
}, nil
}

// Ping will try to connect to consul by attempting to retrieve the current leader.
func (r *consulAdapter) Ping() error {
func (r *Adapter) Ping() error {
status := r.client.Status()
leader, err := status.Leader()
if err != nil {
Expand All @@ -45,7 +45,7 @@ func (r *consulAdapter) Ping() error {
return nil
}

func (r *consulAdapter) Register(group *types.ServiceGroup) error {
func (r *Adapter) Register(group *types.ServiceGroup) error {
for _, service := range group.Services {
if r.dryRun {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -82,7 +82,7 @@ func (r *consulAdapter) Register(group *types.ServiceGroup) error {
return nil
}

func (r *consulAdapter) Deregister(group *types.ServiceGroup) error {
func (r *Adapter) Deregister(group *types.ServiceGroup) error {
for _, service := range group.Services {
if r.dryRun {
log.WithFields(log.Fields{
Expand Down Expand Up @@ -112,7 +112,7 @@ func (r *consulAdapter) Deregister(group *types.ServiceGroup) error {
return nil
}

func (r *consulAdapter) AdvertiseAddr() (string, error) {
func (r *Adapter) AdvertiseAddr() (string, error) {
info, err := r.client.Agent().Self()
if err != nil {
return "", err
Expand All @@ -138,7 +138,7 @@ func groupID(serviceID string) string {
return serviceID
}

func (r *consulAdapter) Services() ([]*types.ServiceGroup, error) {
func (r *Adapter) Services() ([]*types.ServiceGroup, error) {
services, err := r.client.Agent().Services()
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions marathon/address_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
log "github.com/Sirupsen/logrus"
)

// AddressResolver is the interface for address resolver implementations.
type AddressResolver interface {
Resolve(hostname string) (string, error)
}
Expand Down
27 changes: 16 additions & 11 deletions marathon/marathon_adapter.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
package marathon

import (
"errors"
"fmt"
"net/url"
"strconv"
"strings"

"github.com/x-cray/marathon-registrator/types"

"errors"
log "github.com/Sirupsen/logrus"
marathonClient "github.com/gambol99/go-marathon"
)
Expand All @@ -26,12 +26,14 @@ var (
}
)

type marathonAdapter struct {
client MarathonClient
// Adapter is the implementation of RegistryAdapter for Marathon.
type Adapter struct {
client Client
resolver AddressResolver
}

func New(marathonURL string) (*marathonAdapter, error) {
// New creates a new Adapter.
func New(marathonURL string) (*Adapter, error) {
config := marathonClient.NewDefaultConfig()
config.URL = marathonURL
config.RequestTimeout = 60 // 60 seconds
Expand All @@ -43,15 +45,17 @@ func New(marathonURL string) (*marathonAdapter, error) {
return nil, err
}

return &marathonAdapter{
return &Adapter{
client: client,
resolver: &defaultAddressResolver{},
}, nil
}

func (m *marathonAdapter) ListenForEvents(channel types.EventsChannel) error {
// ListenForEvents subscribes to Marathon events and publishes them to channel.
func (m *Adapter) ListenForEvents(channel types.EventsChannel) error {
update := make(marathonClient.EventsChannel, 5)
if err := m.client.AddEventsListener(update, marathonClient.EVENTS_APPLICATIONS|marathonClient.EVENT_FRAMEWORK_MESSAGE); err != nil {
eventTypes := marathonClient.EVENTS_APPLICATIONS | marathonClient.EVENT_FRAMEWORK_MESSAGE
if err := m.client.AddEventsListener(update, eventTypes); err != nil {
return err
}

Expand All @@ -66,13 +70,13 @@ func (m *marathonAdapter) ListenForEvents(channel types.EventsChannel) error {
return nil
}

func (m *marathonAdapter) toServiceHealthCheck(marathonHealthCheck *marathonClient.HealthCheck) (result *types.ServiceHealthCheck) {
func (m *Adapter) toServiceHealthCheck(marathonHealthCheck *marathonClient.HealthCheck) (result *types.ServiceHealthCheck) {
result = &types.ServiceHealthCheck{}

return
}

func (m *marathonAdapter) toServiceEvent(marathonEvent *marathonClient.Event) (result *types.ServiceEvent) {
func (m *Adapter) toServiceEvent(marathonEvent *marathonClient.Event) (result *types.ServiceEvent) {
// Instantiate result object.
result = &types.ServiceEvent{
OriginalEvent: marathonEvent.Event,
Expand Down Expand Up @@ -185,7 +189,7 @@ func isHealthy(task *marathonClient.Task, app *marathonClient.Application) bool
return true
}

func (m *marathonAdapter) toServiceGroup(task *marathonClient.Task, app *marathonClient.Application) (*types.ServiceGroup, error) {
func (m *Adapter) toServiceGroup(task *marathonClient.Task, app *marathonClient.Application) (*types.ServiceGroup, error) {
taskIP, err := m.resolver.Resolve(task.Host)
if err != nil {
return nil, err
Expand Down Expand Up @@ -227,7 +231,8 @@ func (m *marathonAdapter) toServiceGroup(task *marathonClient.Task, app *maratho
return serviceGroup, nil
}

func (m *marathonAdapter) Services() ([]*types.ServiceGroup, error) {
// Services returns the list of registered services.
func (m *Adapter) Services() ([]*types.ServiceGroup, error) {
params := make(url.Values)
params.Add("embed", "apps.tasks")
applications, err := m.client.Applications(params)
Expand Down
20 changes: 10 additions & 10 deletions marathon/marathon_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestMarathonAdapter(t *testing.T) {
var _ = Describe("MarathonAdapter", func() {
var (
mockCtrl *gomock.Controller
client *MockMarathonClient
client *MockClient
resolver *MockAddressResolver
)

Expand Down Expand Up @@ -218,7 +218,7 @@ var _ = Describe("MarathonAdapter", func() {

BeforeEach(func() {
mockCtrl = gomock.NewController(GinkgoT())
client = NewMockMarathonClient(mockCtrl)
client = NewMockClient(mockCtrl)
resolver = NewMockAddressResolver(mockCtrl)
})

Expand All @@ -230,7 +230,7 @@ var _ = Describe("MarathonAdapter", func() {
It("Should forward Marathon client errors", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(nil, errors.New("marathon-error"))
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
_, err := marathonAdapter.Services()
Expand All @@ -243,7 +243,7 @@ var _ = Describe("MarathonAdapter", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(singlePortApplications, nil)
resolver.EXPECT().Resolve("web.eu-west-1.internal").Return("", errors.New("resolve-error"))
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
_, err := marathonAdapter.Services()
Expand All @@ -256,7 +256,7 @@ var _ = Describe("MarathonAdapter", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(inconsistentPortsApplications, nil)
resolver.EXPECT().Resolve(gomock.Any()).Return("10.10.10.20", nil).AnyTimes()
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
_, err := marathonAdapter.Services()
Expand All @@ -269,7 +269,7 @@ var _ = Describe("MarathonAdapter", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(unhealthyApplications, nil)
resolver.EXPECT().Resolve(gomock.Any()).Return("10.10.10.20", nil).AnyTimes()
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
services, err := marathonAdapter.Services()
Expand All @@ -293,7 +293,7 @@ var _ = Describe("MarathonAdapter", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(singlePortApplications, nil)
resolver.EXPECT().Resolve("web.eu-west-1.internal").Return("10.10.10.20", nil).AnyTimes()
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
services, err := marathonAdapter.Services()
Expand Down Expand Up @@ -321,7 +321,7 @@ var _ = Describe("MarathonAdapter", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(singlePortApplicationsWithLabels, nil)
resolver.EXPECT().Resolve("web.eu-west-1.internal").Return("10.10.10.20", nil).AnyTimes()
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
services, err := marathonAdapter.Services()
Expand Down Expand Up @@ -349,7 +349,7 @@ var _ = Describe("MarathonAdapter", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(multiPortSimpleApplications, nil)
resolver.EXPECT().Resolve("web.eu-west-1.internal").Return("10.10.10.20", nil).AnyTimes()
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
services, err := marathonAdapter.Services()
Expand Down Expand Up @@ -386,7 +386,7 @@ var _ = Describe("MarathonAdapter", func() {
// Arrange.
client.EXPECT().Applications(gomock.Any()).Return(multiPortComplexDockerApplications, nil)
resolver.EXPECT().Resolve("web.eu-west-1.internal").Return("10.10.10.20", nil).AnyTimes()
marathonAdapter := &marathonAdapter{client: client, resolver: resolver}
marathonAdapter := &Adapter{client: client, resolver: resolver}

// Act.
services, err := marathonAdapter.Services()
Expand Down
3 changes: 2 additions & 1 deletion marathon/marathon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
marathonClient "github.com/gambol99/go-marathon"
)

type MarathonClient interface {
// Client is the excerpt interface from Marathon API client to generate mocks.
type Client interface {
Applications(url.Values) (*marathonClient.Applications, error)
AddEventsListener(channel marathonClient.EventsChannel, filter int) error
RemoveEventsListener(channel marathonClient.EventsChannel)
Expand Down
32 changes: 16 additions & 16 deletions marathon/marathon_client_mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,52 @@ import (
url "net/url"
)

// Mock of MarathonClient interface
type MockMarathonClient struct {
// Mock of Client interface
type MockClient struct {
ctrl *gomock.Controller
recorder *_MockMarathonClientRecorder
recorder *_MockClientRecorder
}

// Recorder for MockMarathonClient (not exported)
type _MockMarathonClientRecorder struct {
mock *MockMarathonClient
// Recorder for MockClient (not exported)
type _MockClientRecorder struct {
mock *MockClient
}

func NewMockMarathonClient(ctrl *gomock.Controller) *MockMarathonClient {
mock := &MockMarathonClient{ctrl: ctrl}
mock.recorder = &_MockMarathonClientRecorder{mock}
func NewMockClient(ctrl *gomock.Controller) *MockClient {
mock := &MockClient{ctrl: ctrl}
mock.recorder = &_MockClientRecorder{mock}
return mock
}

func (_m *MockMarathonClient) EXPECT() *_MockMarathonClientRecorder {
func (_m *MockClient) EXPECT() *_MockClientRecorder {
return _m.recorder
}

func (_m *MockMarathonClient) Applications(_param0 url.Values) (*go_marathon.Applications, error) {
func (_m *MockClient) Applications(_param0 url.Values) (*go_marathon.Applications, error) {
ret := _m.ctrl.Call(_m, "Applications", _param0)
ret0, _ := ret[0].(*go_marathon.Applications)
ret1, _ := ret[1].(error)
return ret0, ret1
}

func (_mr *_MockMarathonClientRecorder) Applications(arg0 interface{}) *gomock.Call {
func (_mr *_MockClientRecorder) Applications(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Applications", arg0)
}

func (_m *MockMarathonClient) AddEventsListener(channel go_marathon.EventsChannel, filter int) error {
func (_m *MockClient) AddEventsListener(channel go_marathon.EventsChannel, filter int) error {
ret := _m.ctrl.Call(_m, "AddEventsListener", channel, filter)
ret0, _ := ret[0].(error)
return ret0
}

func (_mr *_MockMarathonClientRecorder) AddEventsListener(arg0, arg1 interface{}) *gomock.Call {
func (_mr *_MockClientRecorder) AddEventsListener(arg0, arg1 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "AddEventsListener", arg0, arg1)
}

func (_m *MockMarathonClient) RemoveEventsListener(channel go_marathon.EventsChannel) {
func (_m *MockClient) RemoveEventsListener(channel go_marathon.EventsChannel) {
_m.ctrl.Call(_m, "RemoveEventsListener", channel)
}

func (_mr *_MockMarathonClientRecorder) RemoveEventsListener(arg0 interface{}) *gomock.Call {
func (_mr *_MockClientRecorder) RemoveEventsListener(arg0 interface{}) *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "RemoveEventsListener", arg0)
}

0 comments on commit ec61538

Please sign in to comment.