Skip to content

Commit

Permalink
Resolve memory leak in expire function
Browse files Browse the repository at this point in the history
Resolves #10
  • Loading branch information
glinton committed Feb 3, 2017
1 parent dee08d9 commit a372185
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 35 deletions.
112 changes: 78 additions & 34 deletions drain/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func (a BoltArchive) Write(msg logvac.Message) {
}
}

// Expire cleans up old logs
// Expire cleans up old logs by date or volume of logs
func (a BoltArchive) Expire() {
// if log-keep is "" expire is disabled
if config.LogKeep == "" {
Expand All @@ -189,8 +189,8 @@ func (a BoltArchive) Expire() {
for {
select {
case <-tick:
for k, v := range logKeep { // todo: maybe rather/also loop through buckets
switch v.(type) {
for bucketName, saveAmt := range logKeep { // todo: maybe rather/also loop through buckets
switch saveAmt.(type) {
case string:
var expireTime = time.Now().UnixNano()

Expand All @@ -205,7 +205,7 @@ func (a BoltArchive) Expire() {
NANO_SEC int64 = NANO_MIN / 60
)

match := r.FindStringSubmatch(v.(string)) // "2w"
match := r.FindStringSubmatch(saveAmt.(string)) // "2w"
if len(match) == 3 {
number, err := strconv.ParseInt(match[1], 0, 64)
if err != nil {
Expand All @@ -226,63 +226,107 @@ func (a BoltArchive) Expire() {
case "y": // year
duration = NANO_YEAR * number
default: // 2 weeks
config.Log.Debug("Keeping '%s' logs for 2 weeks", k)
config.Log.Debug("Keeping '%s' logs for 2 weeks", bucketName)
duration = NANO_WEEK * 2
}
}

expireTime = expireTime - duration

a.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(k))
a.db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(bucketName))
if bucket == nil {
config.Log.Trace("No logs of type '%s' found", k)
return fmt.Errorf("No logs of type '%s' found", k)
config.Log.Trace("No logs of type '%s' found", bucketName)
return fmt.Errorf("No logs of type '%s' found", bucketName)
}

c := bucket.Cursor()

// loop through and remove outdated logs
for kk, vv := c.First(); kk != nil; kk, vv = c.Next() {
for k, v := c.First(); k != nil; k, v = c.Next() {
var logMessage logvac.Message
err := json.Unmarshal([]byte(vv), &logMessage)
err := json.Unmarshal([]byte(v), &logMessage)
if err != nil {
config.Log.Fatal("Bad JSON syntax in log message - %s", err)
}
if logMessage.UTime < expireTime {
config.Log.Trace("Deleting expired log of type '%s'...", k)
config.Log.Trace("Deleting expired log of type '%s'...", bucketName)
err = c.Delete()
if err != nil {
config.Log.Trace("Failed to delete expired log - %s", err)
}
} else { // don't continue looping through newer logs (resource/file-lock hog)
break
}
}

config.Log.Trace("=======================================")
config.Log.Trace("= DONE CHECKING/DELETING EXPIRED LOGS =")
config.Log.Trace("=======================================")
return nil
}) // db.Batch
case float64:
// todo: maybe View, then Update within and remove only those marked records?
a.db.Update(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(k))
if bucket == nil {
config.Log.Trace("No logs of type '%s' found", k)
return fmt.Errorf("No logs of type '%s' found", k)
}
})
case float64, int:
records := int(saveAmt.(float64)) // assertion is slow, do it once (casting is fast)
if os.Getenv("LOGVAC_SEQUENTIAL") != "" {
// keep this around for smaller datasets. ...though since bucket.Stats() is so nonperformant maybe we should just get rid of it
a.db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(bucketName))
if bucket == nil {
config.Log.Trace("No logs of type '%s' found", bucketName)
return fmt.Errorf("No logs of type '%s' found", bucketName)
}

// trim the bucket to size
c := bucket.Cursor()
c.First()
// trim the bucket to size
c := bucket.Cursor()
c.First() // if we ever stop ordering by time (oldest first) we'll need to change this

// loop through and remove extra logs
for key_count := float64(bucket.Stats().KeyN); key_count > v.(float64); key_count-- {
config.Log.Trace("Deleting extra log of type '%s'...", k)
err = c.Delete()
if err != nil {
config.Log.Trace("Failed to delete extra log - %s", err)
// loop through and remove extra logs
for key_count := int(bucket.Stats().KeyN); key_count > records; key_count-- {
config.Log.Trace("Deleting extra log of type '%s'...", bucketName)
err = c.Delete()
if err != nil {
config.Log.Trace("Failed to delete extra log - %s", err)
}
c.Next()
}
c.Next()
}
return nil
}) // db.Batch

config.Log.Trace("=======================================")
config.Log.Trace("= DONE CHECKING/DELETING EXPIRED LOGS =")
config.Log.Trace("=======================================")
return nil
})
} else {
a.db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(bucketName))
if bucket == nil {
config.Log.Trace("No logs of type '%s' found", bucketName)
return fmt.Errorf("No logs of type '%s' found", bucketName)
}

// trim the bucket to size
c := bucket.Cursor()

rSaved := 0
// loop through and remove extra logs
// if we ever stop ordering by time (oldest first) we'll need to change cursor placement
for k, v := c.Last(); k != nil && v != nil; k, v = c.Prev() {
rSaved += 1
// if the number of records we've traversed is larger than our limit, delete the current record
if rSaved > records {
config.Log.Trace("Deleting extra log of type '%s'...", bucketName)
err = c.Delete()
if err != nil {
config.Log.Trace("Failed to delete extra log - %s", err)
}
}
}

config.Log.Trace("=======================================")
config.Log.Trace("= DONE CHECKING/DELETING EXPIRED LOGS =")
config.Log.Trace("=======================================")
return nil
})
}
default:
config.Log.Fatal("Bad log-keep value")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ MD5=$(which md5 || which md5sum)

# for versioning
getCurrCommit() {
echo `git rev-parse HEAD | tr -d "[ \r\n\']"`
echo `git rev-parse --short HEAD | tr -d "[ \r\n\']"`
}

# for versioning
Expand Down

0 comments on commit a372185

Please sign in to comment.