Skip to content
This repository has been archived by the owner on Aug 23, 2022. It is now read-only.

Major Optimizations #23

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 3 additions & 17 deletions goforget/decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,41 +7,27 @@ import (
"time"
)

var MAX_ITER = 1000

func Poisson(lambda float64) int {
if lambda == 0.0 {
return 0
}
e := math.Exp(-1.0 * lambda)
if e < 1e-8 {
return math.MaxInt32
}

counter := MAX_ITER
r := rand.Float64()
k := int(0)
p := e
for p < r {
k += 1
e *= lambda / float64(k)
p += e
if counter == 0 {
return -1
}
}
return k
}

func Decay(count, Z, t int, rate float64) int {
return DecayTime(count, Z, t, rate, time.Now())
func Decay(Z, t int, rate float64) int {
return DecayTime(Z, t, rate, time.Now())
}

func DecayTime(count, Z, t int, rate float64, now time.Time) int {
if count < 1 {
return 0.0
}

func DecayTime(Z, t int, rate float64, now time.Time) int {
dt := int(now.Unix()) - t

lambda := rate * float64(dt)
Expand Down
62 changes: 42 additions & 20 deletions goforget/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,30 @@ func (vm ValueMap) MarshalJSON() ([]byte, error) {
}

type Distribution struct {
Name string `json:"distribution"`
Z int `json:"Z"`
T int
Data ValueMap `json:"data"`
Rate float64 `json:"rate"`
Prune bool `json:"prune"`
Name string `json:"distribution"`
Z int `json:"Z"`
T int
Data ValueMap `json:"data"`
Rate float64 `json:"rate"`
Prune bool `json:"prune"`
LastSyncT int `json:"last_sync_time"`
numEntries int

isFull bool
hasDecayed bool
}

func (d *Distribution) GetNMostProbable(N int) error {
data, err := GetNMostProbable(d.Name, N)
if err != nil || len(data) != 3 {
if err != nil || len(data) != 4 {
return fmt.Errorf("Could not fetch data for %s: %s", d.Name, err)
}

d.Z, _ = redis.Int(data[1], nil)
d.T, _ = redis.Int(data[2], nil)
d.numEntries, _ = redis.Int(data[1], nil)
d.Z, _ = redis.Int(data[2], nil)
d.T, _ = redis.Int(data[3], nil)
d.Data = make(map[string]*Value)
d.LastSyncT = d.T

d.addMultiBulkCounts(data[0])
return nil
Expand All @@ -57,15 +61,15 @@ func (d *Distribution) GetField(fields ...string) error {
data, err := GetField(d.Name, fields...)

N := len(fields)
if err != nil || len(data) != 2+N {
if err != nil || len(data) != 3+N {
return fmt.Errorf("Could not retrieve field")
}

Z, _ := redis.Int(data[N], nil)
T, _ := redis.Int(data[N+1], nil)
d.numEntries, _ = redis.Int(data[N], nil)
d.Z, _ = redis.Int(data[N+1], nil)
d.T, _ = redis.Int(data[N+2], nil)
d.LastSyncT = d.T

d.Z = Z
d.T = T
d.Data = make(map[string]*Value)
var count int
for i, field := range fields {
Expand All @@ -91,16 +95,18 @@ func (d *Distribution) Fill() error {
log.Printf("Could not read _T from distribution %s: %s", d.Name, err)
}
d.T = T
d.LastSyncT = d.T

// TODO: don't use the dist map to speed things up!
d.Data = make(map[string]*Value)
d.Rate = *defaultRate
d.numEntries = len(data)

d.addMultiBulkCounts(data[1])
d.isFull = true

d.Normalize()
d.calcProbabilities()

d.isFull = true
return nil
}

Expand Down Expand Up @@ -151,25 +157,41 @@ func (d *Distribution) calcProbabilities() {
}

func (d *Distribution) Decay() {
if len(d.Data) == 0 {
return
}

startingZ := d.Z
now := time.Now()
for k, v := range d.Data {
l := DecayTime(v.Count, d.Z, d.T, d.Rate, now)
Z := 0
sumDecay := 0
for k, _ := range d.Data {
l := DecayTime(d.Z, d.T, d.Rate, now)
if l >= d.Data[k].Count {
if d.Prune {
l = d.Data[k].Count
} else {
l = d.Data[k].Count - 1
}
}
sumDecay += l
d.Data[k].Count -= l
d.Z -= l
Z += d.Data[k].Count
}
if d.isFull {
d.Z = Z
} else {
d.Z -= (sumDecay) / len(d.Data) * d.numEntries
if d.Z < 0 {
d.Z = 0
}
}

if !d.hasDecayed && startingZ != d.Z {
d.hasDecayed = true
}

d.T = int(time.Now().Unix())
d.T = int(now.Unix())

d.calcProbabilities()
}
20 changes: 10 additions & 10 deletions goforget/forget.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,16 @@ import (
)

var (
VERSION = "0.4.5"
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http", ":8080", "HTTP service address (e.g., ':8080')")
redisHost = flag.String("redis-host", "", "Redis host in the form host:port:db.")
redisUri = flag.String("redis-uri", "", "Redis URI in the form redis://:password@hostname:port/db_number")
defaultRate = flag.Float64("default-rate", 0.5, "Default rate to decay distributions with")
nWorkers = flag.Int("nworkers", 1, "Number of update workers that update the redis DB")
pruneDist = flag.Bool("prune", true, "Whether or not to decay distributional fields out")
expirSigma = flag.Float64("expire-sigma", 2, "Confidence level that a distribution will be empty when set to expire")
VERSION = "0.4.6"
showVersion = flag.Bool("version", false, "print version string")
httpAddress = flag.String("http", ":8080", "HTTP service address (e.g., ':8080')")
redisHost = flag.String("redis-host", "", "Redis host in the form host:port:db.")
redisUri = flag.String("redis-uri", "", "Redis URI in the form redis://:password@hostname:port/db_number")
defaultRate = flag.Float64("default-rate", 0.5, "Default rate to decay distributions with")
nWorkers = flag.Int("nworkers", 1, "Number of update workers that update the redis DB")
UpdateOutputTime = flag.Int("status-time", 60, "Time in seconds between redis update status output")
pruneDist = flag.Bool("prune", true, "Whether or not to decay distributional fields out")
expirSigma = flag.Float64("expire-sigma", 2, "Confidence level that a distribution will be empty when set to expire")
)

var updateChan chan *Distribution
Expand Down Expand Up @@ -232,7 +233,6 @@ func main() {
}

rand.Seed(time.Now().UnixNano())
redisServer = NewRedisServer(*redisHost, *nWorkers*2)
if *redisUri != "" {
// if a redis URI exists was specified, parse it
redisServer = NewRedisServerFromUri(*redisUri)
Expand Down
36 changes: 28 additions & 8 deletions goforget/redis_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (rs *RedisServer) Connect(maxIdle int) {
log.Fatal("Could not connect to Redis!")
}
conn.Close()

log.Println("Connected to redis")
}

func (rs *RedisServer) connectPool(maxIdle int) {
Expand Down Expand Up @@ -123,15 +123,28 @@ func (rs *RedisServer) connectPool(maxIdle int) {

func UpdateRedis(readChan chan *Distribution, id int) error {
var redisConn redis.Conn
var now int
lastStatusTime := int(time.Now().Unix())
updateCount := 0
for dist := range readChan {
log.Printf("[%d] Updating distribution: %s", id, dist.Name)

redisConn = redisServer.GetConnection()
err := UpdateDistribution(redisConn, dist)
if err != nil {
log.Printf("[%d] Failed to update: %s: %v: %s", id, dist.Name, redisConn.Err(), err.Error())
// Only do a update if we have all the data necissary or we expect
// there to be a decay event
now = int(time.Now().Unix())
if dist.Full() || float64(now-dist.LastSyncT)*dist.Rate > 0.75 {
redisConn = redisServer.GetConnection()
err := UpdateDistribution(redisConn, dist)
if err != nil {
log.Printf("[%d] Failed to update: %s: %v: %s", id, dist.Name, redisConn.Err(), err.Error())
}
updateCount += 1
if now-lastStatusTime > *UpdateOutputTime {
rate := float64(updateCount) / float64(now-lastStatusTime)
log.Printf("[%d] Performing redis updates at %e updates/second", id, rate)
lastStatusTime = now
updateCount = 0
}
redisConn.Close()
}
redisConn.Close()
}
return nil
}
Expand Down Expand Up @@ -197,11 +210,13 @@ func UpdateDistribution(rconn redis.Conn, dist *Distribution) error {

func GetField(distribution string, fields ...string) ([]interface{}, error) {
rdb := redisServer.GetConnection()
defer rdb.Close()

rdb.Send("MULTI")
for _, field := range fields {
rdb.Send("ZSCORE", distribution, field)
}
rdb.Send("ZCARD", distribution)
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_Z"))
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T"))
data, err := redis.MultiBulk(rdb.Do("EXEC"))
Expand All @@ -210,9 +225,11 @@ func GetField(distribution string, fields ...string) ([]interface{}, error) {

func GetNMostProbable(distribution string, N int) ([]interface{}, error) {
rdb := redisServer.GetConnection()
defer rdb.Close()

rdb.Send("MULTI")
rdb.Send("ZREVRANGEBYSCORE", distribution, "+INF", "-INF", "WITHSCORES", "LIMIT", 0, N)
rdb.Send("ZCARD", distribution)
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_Z"))
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T"))
data, err := redis.MultiBulk(rdb.Do("EXEC"))
Expand All @@ -221,6 +238,7 @@ func GetNMostProbable(distribution string, N int) ([]interface{}, error) {

func IncrField(distribution string, fields []string, N int) error {
rdb := redisServer.GetConnection()
defer rdb.Close()

rdb.Send("MULTI")
for _, field := range fields {
Expand All @@ -234,6 +252,7 @@ func IncrField(distribution string, fields []string, N int) error {

func GetDistribution(distribution string) ([]interface{}, error) {
rdb := redisServer.GetConnection()
defer rdb.Close()

rdb.Send("MULTI")
rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T"))
Expand All @@ -244,6 +263,7 @@ func GetDistribution(distribution string) ([]interface{}, error) {

func DBSize() (int, error) {
rdb := redisServer.GetConnection()
defer rdb.Close()

data, err := redis.Int(rdb.Do("DBSIZE"))
return data, err
Expand Down