Skip to content

Commit

Permalink
Add ability to drain to 3rd party services
Browse files Browse the repository at this point in the history
Add support for papertrail
Resolves #18
  • Loading branch information
glinton committed Aug 22, 2017
1 parent 42b3fdc commit af788bf
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 8 deletions.
24 changes: 24 additions & 0 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package api
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
Expand All @@ -38,6 +39,11 @@ func Start(collector http.HandlerFunc) error {

router := pat.New()

router.Delete("/drains/{drainType}", handleRequest(deleteDrain))
router.Put("/drains/{drainType}", handleRequest(updateDrain))
router.Get("/drains", handleRequest(listDrains))
router.Post("/drains", handleRequest(addDrain))

router.Get("/add-token", handleRequest(addKey))
router.Get("/remove-token", handleRequest(removeKey))
router.Add("OPTIONS", "/", handleRequest(cors))
Expand Down Expand Up @@ -183,3 +189,21 @@ func GenerateArchiveEndpoint(archive drain.ArchiverDrain) http.HandlerFunc {
res.Write(append(body, byte('\n')))
}
}

// parseBody parses the request into v
func parseBody(req *http.Request, v interface{}) error {

b, err := ioutil.ReadAll(req.Body)
if err != nil {
return err
}
defer req.Body.Close()

config.Log.Trace("Parsed body - %s", b)

if err := json.Unmarshal(b, v); err != nil {
return err
}

return nil
}
83 changes: 83 additions & 0 deletions api/drains.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package api

import (
"fmt"
"net/http"

"github.com/nanopack/logvac/core"
"github.com/nanopack/logvac/drain"
)

func addDrain(rw http.ResponseWriter, req *http.Request) {
drainer := logvac.Drain{}

err := parseBody(req, &drainer)
if err != nil {
rw.WriteHeader(400)
rw.Write([]byte(fmt.Sprintf("Failed to parse drain - %s", err.Error())))
return
}

err = drain.AddDrain(drainer)
if err != nil {
rw.WriteHeader(400)
rw.Write([]byte(fmt.Sprintf("Failed to add drain - %s", err.Error())))
return
}

rw.WriteHeader(200)
rw.Write([]byte("success!\n"))
}

func updateDrain(rw http.ResponseWriter, req *http.Request) {
drainType := req.URL.Query().Get(":drainType")

drainer := logvac.Drain{}

err := parseBody(req, &drainer)
if err != nil {
rw.WriteHeader(400)
rw.Write([]byte(fmt.Sprintf("Failed to parse drain - %s", err.Error())))
return
}

if drainer.Type != drainType {
err = drain.RemoveDrain(drainType)
if err != nil {
rw.WriteHeader(400)
rw.Write([]byte(fmt.Sprintf("Failed to remove drain - %s", err.Error())))
return
}
}

err = drain.AddDrain(drainer)
if err != nil {
rw.WriteHeader(400)
rw.Write([]byte(fmt.Sprintf("Failed to add drain - %s", err.Error())))
return
}

rw.WriteHeader(200)
rw.Write([]byte("success!\n"))
}

func deleteDrain(rw http.ResponseWriter, req *http.Request) {
drainType := req.URL.Query().Get(":drainType")

err := drain.RemoveDrain(drainType)
if err != nil {
rw.WriteHeader(400)
rw.Write([]byte(fmt.Sprintf("Failed to remove drain - %s", err.Error())))
return
}

rw.WriteHeader(200)
rw.Write([]byte("success!\n"))
}

func listDrains(rw http.ResponseWriter, req *http.Request) {
drains := drain.ListDrains()

rw.WriteHeader(200)
rw.Write([]byte(fmt.Sprintf("%q\n", drains)))
}
1 change: 1 addition & 0 deletions collector/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func parseMessage(b []byte) (msg logvac.Message) {
msg.Tag = append([]string{parsedData["tag"].(string)}, tTag...)
msg.Priority = adjust[parsedData["severity"].(int)] // parser guarantees [0,7]
msg.Content = parsedData["content"].(string)
msg.Raw = b
return
}
}
Expand Down
47 changes: 43 additions & 4 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package logvac

import (
"io"
"sync"
"time"

Expand Down Expand Up @@ -43,15 +44,24 @@ type (
Type string `json:"type"` // Can be set if logs are submitted via http (deploy logs)
Priority int `json:"priority"`
Content string `json:"message"`
Raw []byte `json:"raw"`
}

// Logvac defines the structure for the default logvac object
Logvac struct {
drains map[string]drainChannels
}

// Drain is a function that "drains a Message"
Drain func(Message)
// Drain defines a third party log drain endpoint (generally, only raw logs get drained)
Drain struct {
Type string `json:"type"` // type of service ("papertrail")
URI string `json:"endpoint"` // uri of endpoint "log6.papertrailapp.com:199900"
AuthKey string `json:"key,omitempty"` // key or user for authentication
AuthSecret string `json:"secret,omitempty"` // password or secret for authentication
}

// DrainFunc is a function that "drains a Message"
DrainFunc func(Message)

drainChannels struct {
send chan Message
Expand Down Expand Up @@ -83,11 +93,11 @@ func (l *Logvac) close() {
}

// AddDrain adds a drain to the listeners and sets its logger
func AddDrain(tag string, drain Drain) {
func AddDrain(tag string, drain DrainFunc) {
Vac.addDrain(tag, drain)
}

func (l *Logvac) addDrain(tag string, drain Drain) {
func (l *Logvac) addDrain(tag string, drain DrainFunc) {
channels := drainChannels{
done: make(chan bool),
send: make(chan Message),
Expand Down Expand Up @@ -143,3 +153,32 @@ func (l *Logvac) writeMessage(msg Message) {
}
group.Wait()
}

func (m Message) eof() bool {
return len(m.Raw) == 0
}

func (m *Message) readByte() byte {
// this function assumes that eof() check was done before
b := m.Raw[0]
m.Raw = m.Raw[1:]
return b
}

func (m *Message) Read(p []byte) (n int, err error) {
if m.eof() {
err = io.EOF
return
}

if c := cap(p); c > 0 {
for n < c {
p[n] = m.readByte()
n++
if m.eof() {
break
}
}
}
return
}
2 changes: 1 addition & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestClose(t *testing.T) {
}

// writeDrain creates a drain from an io.Writer
func writeDrain(writer io.Writer) logvac.Drain {
func writeDrain(writer io.Writer) logvac.DrainFunc {
return func(msg logvac.Message) {
data, err := json.Marshal(msg)
if err != nil {
Expand Down
58 changes: 56 additions & 2 deletions drain/drain.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@ type (
Init() error
// Publish publishes the tagged data
Publish(msg logvac.Message)
// Close closes the drain
Close() error
}
)

var (
Publisher PublisherDrain // default publish drain
Archiver ArchiverDrain // default archive drain
Publisher PublisherDrain // default publish drain
Archiver ArchiverDrain // default archive drain
drains map[string]PublisherDrain // contains the third party drains configured (todo: need to persist)
)

func init() {
drains = make(map[string]PublisherDrain, 0)
}

// Init initializes the archiver and publisher drains if configured
func Init() error {
// initialize archiver
Expand Down Expand Up @@ -130,3 +137,50 @@ func publishInit() error {
}
return nil
}

// AddDrain starts draining to a third party log service.
func AddDrain(d logvac.Drain) error {
switch d.Type {
case "papertrail":
// if it already exists, close it and create a new one
if _, ok := drains["papertrail"]; ok {
drains["papertrail"].Close()
}
// pTrail, err := NewPapertrailClient("logs6.papertrailapp.com:19900")
pTrail, err := NewPapertrailClient(d.URI)
if err != nil {
return fmt.Errorf("Failed to create papertrail client - %s", err)
}
err = pTrail.Init()
if err != nil {
return fmt.Errorf("Papertrail failed to initialize - %s", err)
}
drains["papertrail"] = pTrail
default:
return fmt.Errorf("Drain type not supported")
}

return nil
}

// RemoveDrain stops draining to a third party log service.
func RemoveDrain(drainType string) error {
if _, ok := drains[drainType]; !ok {
return nil
}
return drains[drainType].Close()
}

// GetDrain shows the drain information.
func GetDrain(d logvac.Drain) (*PublisherDrain, error) {
drain, ok := drains[d.Type]
if !ok {
return nil, fmt.Errorf("Drain not found")
}
return &drain, nil
}

// ListDrains shows all the drains configured.
func ListDrains() map[string]PublisherDrain {
return drains
}
3 changes: 2 additions & 1 deletion drain/mist.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (m *Mist) retryPublish(tags []string, data string) error {
}

// Close cleanly closes the mist client
func (m *Mist) Close() {
func (m *Mist) Close() error {
m.mist.Close()
return nil
}
51 changes: 51 additions & 0 deletions drain/papertrail.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package drain

import (
"fmt"
"io"
"net"

"github.com/nanopack/logvac/core"
)

// Papertrail drain implements the publisher interface for publishing logs to papertrail.
type Papertrail struct {
Conn io.WriteCloser
}

// NewPapertrailClient creates a new mist publisher
func NewPapertrailClient(uri string) (*Papertrail, error) {
addr, err := net.ResolveUDPAddr("udp", uri)
if err != nil {
return nil, fmt.Errorf("Failed to resolve papertrail address - %s", err.Error())
}

Conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return nil, fmt.Errorf("Failed to dial papertrail - %s", err.Error())
}

return &Papertrail{Conn}, nil
}

// Init initializes a connection to mist
func (p Papertrail) Init() error {

// add drain
logvac.AddDrain("papertrail", p.Publish)

return nil
}

// Publish utilizes mist's Publish to "drain" a log message
func (p Papertrail) Publish(msg logvac.Message) {
p.Conn.Write(msg.Raw)
}

// Close closes the connection to papertrail.
func (p *Papertrail) Close() error {
if p.Conn == nil {
return nil
}
return p.Conn.Close()
}
Loading

0 comments on commit af788bf

Please sign in to comment.