From af788bf0106f14d5c5894da031bde5cfe4e7c1c8 Mon Sep 17 00:00:00 2001 From: Greg Linton Date: Tue, 22 Aug 2017 14:58:11 -0600 Subject: [PATCH] Add ability to drain to 3rd party services Add support for papertrail Resolves #18 --- api/api.go | 24 ++++++++++++ api/drains.go | 83 ++++++++++++++++++++++++++++++++++++++++ collector/syslog.go | 1 + core/core.go | 47 +++++++++++++++++++++-- core/core_test.go | 2 +- drain/drain.go | 58 +++++++++++++++++++++++++++- drain/mist.go | 3 +- drain/papertrail.go | 51 ++++++++++++++++++++++++ drain/papertrail_test.go | 41 ++++++++++++++++++++ 9 files changed, 302 insertions(+), 8 deletions(-) create mode 100644 api/drains.go create mode 100644 drain/papertrail.go create mode 100644 drain/papertrail_test.go diff --git a/api/api.go b/api/api.go index 94d9ca4..e6a30a8 100644 --- a/api/api.go +++ b/api/api.go @@ -19,6 +19,7 @@ package api import ( "encoding/json" "fmt" + "io/ioutil" "net/http" "regexp" "strconv" @@ -38,6 +39,11 @@ func Start(collector http.HandlerFunc) error { router := pat.New() + router.Delete("/drains/{drainType}", handleRequest(deleteDrain)) + router.Put("/drains/{drainType}", handleRequest(updateDrain)) + router.Get("/drains", handleRequest(listDrains)) + router.Post("/drains", handleRequest(addDrain)) + router.Get("/add-token", handleRequest(addKey)) router.Get("/remove-token", handleRequest(removeKey)) router.Add("OPTIONS", "/", handleRequest(cors)) @@ -183,3 +189,21 @@ func GenerateArchiveEndpoint(archive drain.ArchiverDrain) http.HandlerFunc { res.Write(append(body, byte('\n'))) } } + +// parseBody parses the request into v +func parseBody(req *http.Request, v interface{}) error { + + b, err := ioutil.ReadAll(req.Body) + if err != nil { + return err + } + defer req.Body.Close() + + config.Log.Trace("Parsed body - %s", b) + + if err := json.Unmarshal(b, v); err != nil { + return err + } + + return nil +} diff --git a/api/drains.go b/api/drains.go new file mode 100644 index 0000000..c91d3d2 --- /dev/null +++ b/api/drains.go @@ -0,0 +1,83 @@ +package api + +import ( + "fmt" + "net/http" + + "github.com/nanopack/logvac/core" + "github.com/nanopack/logvac/drain" +) + +func addDrain(rw http.ResponseWriter, req *http.Request) { + drainer := logvac.Drain{} + + err := parseBody(req, &drainer) + if err != nil { + rw.WriteHeader(400) + rw.Write([]byte(fmt.Sprintf("Failed to parse drain - %s", err.Error()))) + return + } + + err = drain.AddDrain(drainer) + if err != nil { + rw.WriteHeader(400) + rw.Write([]byte(fmt.Sprintf("Failed to add drain - %s", err.Error()))) + return + } + + rw.WriteHeader(200) + rw.Write([]byte("success!\n")) +} + +func updateDrain(rw http.ResponseWriter, req *http.Request) { + drainType := req.URL.Query().Get(":drainType") + + drainer := logvac.Drain{} + + err := parseBody(req, &drainer) + if err != nil { + rw.WriteHeader(400) + rw.Write([]byte(fmt.Sprintf("Failed to parse drain - %s", err.Error()))) + return + } + + if drainer.Type != drainType { + err = drain.RemoveDrain(drainType) + if err != nil { + rw.WriteHeader(400) + rw.Write([]byte(fmt.Sprintf("Failed to remove drain - %s", err.Error()))) + return + } + } + + err = drain.AddDrain(drainer) + if err != nil { + rw.WriteHeader(400) + rw.Write([]byte(fmt.Sprintf("Failed to add drain - %s", err.Error()))) + return + } + + rw.WriteHeader(200) + rw.Write([]byte("success!\n")) +} + +func deleteDrain(rw http.ResponseWriter, req *http.Request) { + drainType := req.URL.Query().Get(":drainType") + + err := drain.RemoveDrain(drainType) + if err != nil { + rw.WriteHeader(400) + rw.Write([]byte(fmt.Sprintf("Failed to remove drain - %s", err.Error()))) + return + } + + rw.WriteHeader(200) + rw.Write([]byte("success!\n")) +} + +func listDrains(rw http.ResponseWriter, req *http.Request) { + drains := drain.ListDrains() + + rw.WriteHeader(200) + rw.Write([]byte(fmt.Sprintf("%q\n", drains))) +} diff --git a/collector/syslog.go b/collector/syslog.go index 497c736..31f86f5 100644 --- a/collector/syslog.go +++ b/collector/syslog.go @@ -141,6 +141,7 @@ func parseMessage(b []byte) (msg logvac.Message) { msg.Tag = append([]string{parsedData["tag"].(string)}, tTag...) msg.Priority = adjust[parsedData["severity"].(int)] // parser guarantees [0,7] msg.Content = parsedData["content"].(string) + msg.Raw = b return } } diff --git a/core/core.go b/core/core.go index 52f5f2c..d6fbc68 100644 --- a/core/core.go +++ b/core/core.go @@ -3,6 +3,7 @@ package logvac import ( + "io" "sync" "time" @@ -43,6 +44,7 @@ type ( Type string `json:"type"` // Can be set if logs are submitted via http (deploy logs) Priority int `json:"priority"` Content string `json:"message"` + Raw []byte `json:"raw"` } // Logvac defines the structure for the default logvac object @@ -50,8 +52,16 @@ type ( drains map[string]drainChannels } - // Drain is a function that "drains a Message" - Drain func(Message) + // Drain defines a third party log drain endpoint (generally, only raw logs get drained) + Drain struct { + Type string `json:"type"` // type of service ("papertrail") + URI string `json:"endpoint"` // uri of endpoint "log6.papertrailapp.com:199900" + AuthKey string `json:"key,omitempty"` // key or user for authentication + AuthSecret string `json:"secret,omitempty"` // password or secret for authentication + } + + // DrainFunc is a function that "drains a Message" + DrainFunc func(Message) drainChannels struct { send chan Message @@ -83,11 +93,11 @@ func (l *Logvac) close() { } // AddDrain adds a drain to the listeners and sets its logger -func AddDrain(tag string, drain Drain) { +func AddDrain(tag string, drain DrainFunc) { Vac.addDrain(tag, drain) } -func (l *Logvac) addDrain(tag string, drain Drain) { +func (l *Logvac) addDrain(tag string, drain DrainFunc) { channels := drainChannels{ done: make(chan bool), send: make(chan Message), @@ -143,3 +153,32 @@ func (l *Logvac) writeMessage(msg Message) { } group.Wait() } + +func (m Message) eof() bool { + return len(m.Raw) == 0 +} + +func (m *Message) readByte() byte { + // this function assumes that eof() check was done before + b := m.Raw[0] + m.Raw = m.Raw[1:] + return b +} + +func (m *Message) Read(p []byte) (n int, err error) { + if m.eof() { + err = io.EOF + return + } + + if c := cap(p); c > 0 { + for n < c { + p[n] = m.readByte() + n++ + if m.eof() { + break + } + } + } + return +} diff --git a/core/core_test.go b/core/core_test.go index 34282b3..0f13ee3 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -89,7 +89,7 @@ func TestClose(t *testing.T) { } // writeDrain creates a drain from an io.Writer -func writeDrain(writer io.Writer) logvac.Drain { +func writeDrain(writer io.Writer) logvac.DrainFunc { return func(msg logvac.Message) { data, err := json.Marshal(msg) if err != nil { diff --git a/drain/drain.go b/drain/drain.go index 081a385..e098f3d 100644 --- a/drain/drain.go +++ b/drain/drain.go @@ -28,14 +28,21 @@ type ( Init() error // Publish publishes the tagged data Publish(msg logvac.Message) + // Close closes the drain + Close() error } ) var ( - Publisher PublisherDrain // default publish drain - Archiver ArchiverDrain // default archive drain + Publisher PublisherDrain // default publish drain + Archiver ArchiverDrain // default archive drain + drains map[string]PublisherDrain // contains the third party drains configured (todo: need to persist) ) +func init() { + drains = make(map[string]PublisherDrain, 0) +} + // Init initializes the archiver and publisher drains if configured func Init() error { // initialize archiver @@ -130,3 +137,50 @@ func publishInit() error { } return nil } + +// AddDrain starts draining to a third party log service. +func AddDrain(d logvac.Drain) error { + switch d.Type { + case "papertrail": + // if it already exists, close it and create a new one + if _, ok := drains["papertrail"]; ok { + drains["papertrail"].Close() + } + // pTrail, err := NewPapertrailClient("logs6.papertrailapp.com:19900") + pTrail, err := NewPapertrailClient(d.URI) + if err != nil { + return fmt.Errorf("Failed to create papertrail client - %s", err) + } + err = pTrail.Init() + if err != nil { + return fmt.Errorf("Papertrail failed to initialize - %s", err) + } + drains["papertrail"] = pTrail + default: + return fmt.Errorf("Drain type not supported") + } + + return nil +} + +// RemoveDrain stops draining to a third party log service. +func RemoveDrain(drainType string) error { + if _, ok := drains[drainType]; !ok { + return nil + } + return drains[drainType].Close() +} + +// GetDrain shows the drain information. +func GetDrain(d logvac.Drain) (*PublisherDrain, error) { + drain, ok := drains[d.Type] + if !ok { + return nil, fmt.Errorf("Drain not found") + } + return &drain, nil +} + +// ListDrains shows all the drains configured. +func ListDrains() map[string]PublisherDrain { + return drains +} diff --git a/drain/mist.go b/drain/mist.go index 7666eac..69a2370 100644 --- a/drain/mist.go +++ b/drain/mist.go @@ -90,6 +90,7 @@ func (m *Mist) retryPublish(tags []string, data string) error { } // Close cleanly closes the mist client -func (m *Mist) Close() { +func (m *Mist) Close() error { m.mist.Close() + return nil } diff --git a/drain/papertrail.go b/drain/papertrail.go new file mode 100644 index 0000000..82fadad --- /dev/null +++ b/drain/papertrail.go @@ -0,0 +1,51 @@ +package drain + +import ( + "fmt" + "io" + "net" + + "github.com/nanopack/logvac/core" +) + +// Papertrail drain implements the publisher interface for publishing logs to papertrail. +type Papertrail struct { + Conn io.WriteCloser +} + +// NewPapertrailClient creates a new mist publisher +func NewPapertrailClient(uri string) (*Papertrail, error) { + addr, err := net.ResolveUDPAddr("udp", uri) + if err != nil { + return nil, fmt.Errorf("Failed to resolve papertrail address - %s", err.Error()) + } + + Conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + return nil, fmt.Errorf("Failed to dial papertrail - %s", err.Error()) + } + + return &Papertrail{Conn}, nil +} + +// Init initializes a connection to mist +func (p Papertrail) Init() error { + + // add drain + logvac.AddDrain("papertrail", p.Publish) + + return nil +} + +// Publish utilizes mist's Publish to "drain" a log message +func (p Papertrail) Publish(msg logvac.Message) { + p.Conn.Write(msg.Raw) +} + +// Close closes the connection to papertrail. +func (p *Papertrail) Close() error { + if p.Conn == nil { + return nil + } + return p.Conn.Close() +} diff --git a/drain/papertrail_test.go b/drain/papertrail_test.go new file mode 100644 index 0000000..2844db0 --- /dev/null +++ b/drain/papertrail_test.go @@ -0,0 +1,41 @@ +package drain_test + +import ( + "bytes" + "testing" + + "github.com/nanopack/logvac/core" + "github.com/nanopack/logvac/drain" +) + +// Test adding drain. +func TestPTrailInit(t *testing.T) { + trailTest := &drain.Papertrail{} + trailTest.Close() + trailTest.Init() +} + +// Test writing and reading data, as well as closing. +func TestPTrailPublish(t *testing.T) { + var b WriteCloseBuffer + trailTest := &drain.Papertrail{Conn: &b} + if trailTest.Conn == nil { + t.Fatal("Failed to create a thing") + } + + msg := logvac.Message{Raw: []byte("This is a message\n")} + + trailTest.Publish(msg) + if b.String() != string(msg.Raw) { + t.Fatal("Failed to publish - '%s'", b.String()) + } + trailTest.Close() +} + +type WriteCloseBuffer struct { + bytes.Buffer +} + +func (cb WriteCloseBuffer) Close() error { + return nil +}