Skip to content

Commit

Permalink
Merge pull request #15 from nanopack/feature/multi-tag
Browse files Browse the repository at this point in the history
Add ability to specify multiple tags for a log
  • Loading branch information
glinton authored Aug 14, 2017
2 parents 27e49a8 + 8f17c38 commit 42b3fdc
Show file tree
Hide file tree
Showing 14 changed files with 425 additions and 54 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,5 @@ logvac
logvac.json
*.cover*
db
vendor/*/

3 changes: 2 additions & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,11 @@ func verify(fn http.HandlerFunc) http.HandlerFunc {
// note: javascript number precision may cause unexpected results (missing logs within 100 nanosecond window)
func GenerateArchiveEndpoint(archive drain.ArchiverDrain) http.HandlerFunc {
return func(res http.ResponseWriter, req *http.Request) {
// /logs?id=&type=app&start=0&end=0&limit=50
query := req.URL.Query()

host := query.Get("id")
tag := query.Get("tag")
tag := query["tag"]

kind := query.Get("type")
if kind == "" {
Expand Down
3 changes: 3 additions & 0 deletions boxfile.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ deploy.config:
- go get github.com/nanopack/mist
- cp $(which mist) $APP_DIR/mist
- cp $CODE_DIR/config.json $APP_DIR/config.json
- cp $CODE_DIR/example-narc.conf $APP_DIR/narc.conf

data.storage:
image: nanobox/unfs
Expand All @@ -31,3 +32,5 @@ web.logvac:
start:
mist: './mist --server --listeners "tcp://0.0.0.0:1445"'
logvac: './logvac -c config.json'
debug-mist: './mist subscribe --tags log' # for debugging, send logs to `/var/log/gonano/logvac/current` to view in `deploy dry-run` output
debug-narc: 'narcd narc.conf' # for debugging, send logs to `/var/log/gonano/logvac/current` to view in `deploy dry-run` output
2 changes: 1 addition & 1 deletion collector/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func GenerateHttpCollector() http.HandlerFunc {
// keep body as "message" and make up priority
msg.Content = string(body)
msg.Priority = 2
msg.Tag = "http-raw"
msg.Tag = []string{"http-raw"}
}

if msg.Type == "" {
Expand Down
7 changes: 5 additions & 2 deletions collector/syslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ func parseMessage(b []byte) (msg logvac.Message) {
// config.Log.Trace("Parsed data: %s", parsedData)
msg.Time = time.Now()
msg.UTime = msg.Time.UnixNano()
msg.Id = parsedData["hostname"].(string)
msg.Tag = parsedData["tag"].(string)
// if setting multiple tags in id, set hostname first
tTag := strings.Split(parsedData["hostname"].(string), ",")
msg.Id = tTag[0]
// combine all id's (split on ',') and add as tags
msg.Tag = append([]string{parsedData["tag"].(string)}, tTag...)
msg.Priority = adjust[parsedData["severity"].(int)] // parser guarantees [0,7]
msg.Content = parsedData["content"].(string)
return
Expand Down
19 changes: 16 additions & 3 deletions core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,26 @@ type (
Trace(string, ...interface{})
}

// DEPRECATED: Use Message. OldMessage defines the structure of an old log message
// I did what I hate most about docker, changed an exported struct definition. Sorry any client
// using this.. at least I left it around?
OldMessage struct {
Time time.Time `json:"time"`
UTime int64 `json:"utime"`
Id string `json:"id"` // If setting multiple tags in id (syslog), set hostname first
Tag string `json:"tag"`
Type string `json:"type"` // Can be set if logs are submitted via http (deploy logs)
Priority int `json:"priority"`
Content string `json:"message"`
}

// Message defines the structure of a log message
Message struct {
Time time.Time `json:"time"`
UTime int64 `json:"utime"`
Id string `json:"id"` // ignoreifempty?
Tag string `json:"tag"` // ignoreifempty? // []string?
Type string `json:"type"`
Id string `json:"id"` // ignoreifempty? // If setting multiple tags in id (syslog), set hostname first
Tag []string `json:"tag"` // ignoreifempty?
Type string `json:"type"` // Can be set if logs are submitted via http (deploy logs)
Priority int `json:"priority"`
Content string `json:"message"`
}
Expand Down
2 changes: 1 addition & 1 deletion core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestAddDrain(t *testing.T) {
Time: time.Now(),
UTime: time.Now().UnixNano(),
Id: "myhost",
Tag: "test[drains]",
Tag: []string{"test[drains]"},
Type: "app",
Priority: 4,
Content: "This is quite important",
Expand Down
127 changes: 90 additions & 37 deletions drain/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ func NewBoltArchive(path string) (*BoltArchive, error) {
}

// Init initializes the archiver drain
func (a BoltArchive) Init() error {
func (a *BoltArchive) Init() error {
// add drain
logvac.AddDrain("historical", a.Write)

return nil
}

// Close closes the bolt db
func (a BoltArchive) Close() {
func (a *BoltArchive) Close() {
a.db.Close()
}

// Slice returns a slice of logs based on the name, offset, limit, and log-level
func (a BoltArchive) Slice(name, host, tag string, offset, end, limit int64, level int) ([]logvac.Message, error) {
func (a *BoltArchive) Slice(name, host string, tag []string, offset, end, limit int64, level int) ([]logvac.Message, error) {
var messages []logvac.Message

err := a.db.View(func(tx *bolt.Tx) error {
Expand Down Expand Up @@ -105,20 +105,51 @@ func (a BoltArchive) Slice(name, host, tag string, offset, end, limit int64, lev
// todo: make limit be len(bucket)? if limit < 0
for ; k != nil && limit > 0; k, v = c.Prev() {
msg := logvac.Message{}
if err := json.Unmarshal(v, &msg); err != nil {
return fmt.Errorf("Couldn't unmarshal message - %s", err)
}
oMsg := logvac.OldMessage{} // old message (type has changed for multi-tenancy)
// if specified end is reached, be done
if string(k) == final.String() {
limit = 0
}

// unmarshal to check if match.. seems expensive
if err := json.Unmarshal(v, &msg); err != nil {
// for backwards compatibility (needed for approx 2 weeks only until old logs get cleaned up)
if err2 := json.Unmarshal(v, &oMsg); err2 != nil {
return fmt.Errorf("Couldn't unmarshal message - %s - %s", err, err2)
}
// convert old message to new message for saving
msg.Time = oMsg.Time
msg.UTime = oMsg.UTime
msg.Id = oMsg.Id
msg.Tag = []string{oMsg.Tag}
msg.Type = oMsg.Type
msg.Priority = oMsg.Priority
msg.Content = oMsg.Content

// return fmt.Errorf("Couldn't unmarshal message - %s", err)
}

if msg.Priority >= level {
if msg.Id == host || host == "" {
// todo: negate here if tag[0] == "!"
if msg.Tag == tag || tag == "" {
if host == "" || msg.Id == host {
// todo: negate here if tag starts with "!"
if len(tag) == 0 {
limit--

// prepend messages with new message (display newest last)
messages = append([]logvac.Message{msg}, messages...)
} else {
for x := range msg.Tag {
for y := range tag {
if tag[y] == "" || msg.Tag[x] == tag[y] {
limit--

// prepend messages with new message (display newest last)
messages = append([]logvac.Message{msg}, messages...)

return nil
}
}
}
}
}
}
Expand All @@ -135,7 +166,7 @@ func (a BoltArchive) Slice(name, host, tag string, offset, end, limit int64, lev
}

// Write writes the message to database
func (a BoltArchive) Write(msg logvac.Message) {
func (a *BoltArchive) Write(msg logvac.Message) {
config.Log.Trace("Bolt archive writing...")
err := a.db.Batch(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists([]byte(msg.Type))
Expand Down Expand Up @@ -166,9 +197,10 @@ func (a BoltArchive) Write(msg logvac.Message) {
}

// Expire cleans up old logs by date or volume of logs
func (a BoltArchive) Expire() {
func (a *BoltArchive) Expire() {
// if log-keep is "" expire is disabled
if config.LogKeep == "" {
config.Log.Debug("Log expiration disabled")
return
}

Expand All @@ -183,28 +215,33 @@ func (a BoltArchive) Expire() {
config.CleanFreq = 60
}

config.Log.Trace("LogKeep - %v; CleanFreq - %d", logKeep, config.CleanFreq)

// clean up every minute // todo: maybe 5mins?
tick := time.Tick(time.Duration(config.CleanFreq) * time.Second)

r, _ := regexp.Compile("([0-9]+)([a-za-z]+)")
var (
NANO_MIN int64 = 60000000000
NANO_HOUR int64 = NANO_MIN * 60
NANO_DAY int64 = NANO_HOUR * 24
NANO_WEEK int64 = NANO_DAY * 7
NANO_YEAR int64 = NANO_WEEK * 52
duration int64 = NANO_WEEK * 2
NANO_SEC int64 = NANO_MIN / 60
)

for {
select {
case <-tick:
for bucketName, saveAmt := range logKeep { // todo: maybe rather/also loop through buckets
config.Log.Trace("bucketName - %s; saveAmt - %v", bucketName, saveAmt)
switch saveAmt.(type) {
case string:
var expireTime = time.Now().UnixNano()

r, _ := regexp.Compile("([0-9]+)([a-zA-Z]+)")
var (
NANO_MIN int64 = 60000000000
NANO_HOUR int64 = NANO_MIN * 60
NANO_DAY int64 = NANO_HOUR * 24
NANO_WEEK int64 = NANO_DAY * 7
NANO_YEAR int64 = NANO_WEEK * 52
duration int64 = NANO_WEEK * 2
NANO_SEC int64 = NANO_MIN / 60
)

match := r.FindStringSubmatch(saveAmt.(string)) // "2w"
config.Log.Trace("SaveAmt - %v, match - %v", saveAmt, match)
if len(match) == 3 {
number, err := strconv.ParseInt(match[1], 0, 64)
if err != nil {
Expand All @@ -213,16 +250,22 @@ func (a BoltArchive) Expire() {
}
switch match[2] {
case "s": // second // for testing
config.Log.Debug("Keeping logs for %d seconds", number)
duration = NANO_SEC * number
case "m": // minute
config.Log.Debug("Keeping logs for %d minutes", number)
duration = NANO_MIN * number
case "h": // hour
config.Log.Debug("Keeping logs for %d hours", number)
duration = NANO_HOUR * number
case "d": // day
config.Log.Debug("Keeping logs for %d days", number)
duration = NANO_DAY * number
case "w": // week
config.Log.Debug("Keeping logs for %d weeks", number)
duration = NANO_WEEK * number
case "y": // year
config.Log.Debug("Keeping logs for %d years", number)
duration = NANO_YEAR * number
default: // 2 weeks
config.Log.Debug("Keeping '%s' logs for 2 weeks", bucketName)
Expand All @@ -231,42 +274,50 @@ func (a BoltArchive) Expire() {
}

expireTime = expireTime - duration
eTime := &bytes.Buffer{}
if err := binary.Write(eTime, binary.BigEndian, expireTime); err != nil {
config.Log.Error("Failed to convert expire time to binary - %s", err.Error())
continue
}

config.Log.Debug("Starting age cleanup batch...")
a.db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(bucketName))
if bucket == nil {
config.Log.Trace("No logs of type '%s' found", bucketName)
config.Log.Debug("No logs of type '%s' found", bucketName)
return fmt.Errorf("No logs of type '%s' found", bucketName)
}

c := bucket.Cursor()

var err error

// loop through and remove outdated logs
for k, v := c.First(); k != nil; k, v = c.Next() {
var logMessage logvac.Message
err := json.Unmarshal([]byte(v), &logMessage)
if err != nil {
config.Log.Fatal("Bad JSON syntax in log message - %s", err)
}
if logMessage.UTime < expireTime {
for k, _ := c.First(); k != nil; k, _ = c.Next() {
// if logMessage.UTime < expireTime {
if bytes.Compare(k, eTime.Bytes()) == -1 {
config.Log.Trace("Deleting expired log of type '%s'...", bucketName)
err = c.Delete()
if err != nil {
config.Log.Trace("Failed to delete expired log - %s", err)
config.Log.Debug("Failed to delete expired log - %s", err)
}
config.Log.Trace("Deleted log")
} else { // don't continue looping through newer logs (resource/file-lock hog)
config.Log.Trace("Done with old logs")
break
}
}

config.Log.Trace("=======================================")
config.Log.Trace("= DONE CHECKING/DELETING EXPIRED LOGS =")
config.Log.Trace("=======================================")
config.Log.Debug("=======================================")
config.Log.Debug("= DONE CHECKING/DELETING EXPIRED LOGS =")
config.Log.Debug("=======================================")
return nil
})
config.Log.Trace("Done defining batch")
case float64, int:
records := int(saveAmt.(float64)) // assertion is slow, do it once (casting is fast)

config.Log.Debug("Starting record cleanup batch...")
a.db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(bucketName))
if bucket == nil {
Expand All @@ -292,17 +343,19 @@ func (a BoltArchive) Expire() {
}
}

config.Log.Trace("=======================================")
config.Log.Trace("= DONE CHECKING/DELETING EXPIRED LOGS =")
config.Log.Trace("=======================================")
config.Log.Debug("=======================================")
config.Log.Debug("= DONE CHECKING/DELETING EXPIRED LOGS =")
config.Log.Debug("=======================================")
return nil
})
default:
// todo: we should pre-parse these values and exit on startup, not x minutes into running
config.Log.Fatal("Bad log-keep value")
os.Exit(1)
}
} // range logKeep
case <-a.Done:
config.Log.Debug("Done recieved on channel. (Cleanup halting)")
return
}
}
Expand Down
10 changes: 5 additions & 5 deletions drain/boltdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestWrite(t *testing.T) {
Time: time.Now(),
UTime: time.Now().UnixNano(),
Id: "myhost",
Tag: "test[bolt]",
Tag: []string{"test[bolt]"},
Type: "app",
Priority: 4,
Content: "This is a test message",
Expand All @@ -49,7 +49,7 @@ func TestWrite(t *testing.T) {
Time: time.Now(),
UTime: time.Now().UnixNano(),
Id: "myhost",
Tag: "test[expire]",
Tag: []string{"test[expire]"},
Type: "deploy",
Priority: 4,
Content: "This is another test message",
Expand All @@ -60,7 +60,7 @@ func TestWrite(t *testing.T) {
drain.Archiver.Write(messages[1])

// test successful write
appMsgs, err := drain.Archiver.Slice("app", "", "", 0, 0, 100, 0)
appMsgs, err := drain.Archiver.Slice("app", "", []string{""}, 0, 0, 100, 0)
if err != nil {
t.Error(err)
t.FailNow()
Expand All @@ -81,7 +81,7 @@ func TestExpire(t *testing.T) {
drain.Archiver.(*drain.BoltArchive).Done <- true

// test successful clean
appMsgs, err := drain.Archiver.Slice("app", "", "", 0, 0, 100, 0)
appMsgs, err := drain.Archiver.Slice("app", "", []string{""}, 0, 0, 100, 0)
if err != nil {
t.Error(err)
t.FailNow()
Expand All @@ -93,7 +93,7 @@ func TestExpire(t *testing.T) {
}

// test successful clean
depMsgs, err := drain.Archiver.Slice("deploy", "", "", 0, 0, 100, 0)
depMsgs, err := drain.Archiver.Slice("deploy", "", []string{""}, 0, 0, 100, 0)
if err != nil {
t.Error(err)
t.FailNow()
Expand Down
Loading

0 comments on commit 42b3fdc

Please sign in to comment.