Skip to content

Commit ba3b713

Browse files
committed
Ainda repensando comportamento, porém as métricas parecem ok
1 parent 2150929 commit ba3b713

File tree

1 file changed

+103
-82
lines changed

1 file changed

+103
-82
lines changed

clustergo/main.go

+103-82
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"time"
1212

1313
"github.com/agoussia/godes"
14+
"github.com/gcinterceptor/gci-simulator/clustergo/interval"
1415
)
1516

1617
var (
@@ -55,48 +56,60 @@ func main() {
5556
s.terminate()
5657
}
5758
godes.WaitUntilDone()
59+
finishTime := godes.GetSystemTime()
5860
fmt.Printf("NSERVERS: %d\n", lb.nServers)
59-
fmt.Printf("Proc: %f\n", lb.nRequests)
60-
fmt.Printf("EvI: %f\n", lb.nReqEvictedByInstance)
61-
fmt.Printf("EvS: %f\n", lb.nReqEvictedByService)
62-
fmt.Printf("Succ: %f\n", lb.nSucc)
63-
fmt.Printf("Succ Ratio: %.4f\n", lb.nSucc/lb.nRequests)
64-
fmt.Printf("PVN: %.4f\n", lb.nReqEvictedByService/lb.nRequests)
65-
fmt.Printf("PCP: %.4f\n", lb.nReqEvictedByInstance/lb.nRequests)
61+
fmt.Printf("FINISH TIME: %f\n", finishTime)
62+
63+
var nProc int64
64+
var unav []interval.LimitSet
65+
for _, s := range servers {
66+
unav = append(unav, s.unavIntervals)
67+
nProc += s.nProc
68+
fmt.Printf("SERVER: %d UPTIME:%f NPROC:%d PROCTIME:%f UNAVTIME:%f\n", s.id, s.uptime, s.nProc, s.procTime, s.unavTime)
69+
}
70+
fmt.Printf("NPROC: %d\n", nProc)
71+
72+
unavTime := float64(0)
73+
union := interval.Unite(unav...)
74+
for _, i := range union.Limits {
75+
unavTime += i.End - i.Start
76+
}
77+
fmt.Printf("UNITED UNAVAILABILITY:%f PROB:%f\n", unavTime, unavTime/finishTime)
78+
79+
var msUnav float64
80+
intersect := interval.Intersect(unav...)
81+
for _, i := range intersect {
82+
if len(i.Participants) == len(servers) {
83+
for _, l := range i.Limits {
84+
msUnav += l.End - l.Start
85+
}
86+
}
87+
}
88+
fmt.Printf("MICROSERVICE UNAVAILABILITY:%f %f\n", msUnav, msUnav/finishTime)
6689
}
6790

6891
type loadBalancer struct {
6992
*godes.Runner
70-
replicaQueue *godes.FIFOQueue
71-
next int
72-
nServers int
73-
isTerminated bool
74-
nRequests float64
75-
nSucc float64
76-
nReqEvictedByInstance float64
77-
nReqEvictedByService float64
93+
servers []*server
94+
next int
95+
nServers int
96+
isTerminated bool
7897
}
7998

8099
func (lb *loadBalancer) schedule(r *request) {
81-
// Ignore requests when there is no available replica
82-
if lb.replicaQueue.Len() == 0 {
83-
return
84-
}
85-
lb.replicaQueue.Get().(*server).newRequest(r)
100+
lb.next = (lb.next + 1) % len(lb.servers)
101+
lb.servers[lb.next].newRequest(r)
86102
}
87103

88104
func (lb *loadBalancer) reqFinished(s *server, r *request) {
89105
// Sending server back to the availability queue.
90106
switch {
91107
case r.status == 200:
92108
fmt.Printf("%d,%d,%.1f,%.4f,%d,%v\n", r.id, r.status, r.latency, r.ts, len(r.hops), r.hops)
93-
lb.nSucc++
94109
case r.status == 503:
95-
lb.nReqEvictedByInstance++
96110
if len(r.hops) < lb.nServers {
97111
lb.schedule(r)
98112
} else {
99-
lb.nReqEvictedByService++
100113
fmt.Printf("%d,%d,%.1f,%.4f,%d,%v\n", r.id, r.status, r.latency, r.ts, len(r.hops), r.hops)
101114
}
102115
default:
@@ -115,7 +128,6 @@ func (lb *loadBalancer) Run() {
115128
for {
116129
arrivalCond.Wait(true)
117130
if arrivalQueue.Len() > 0 {
118-
lb.nRequests++
119131
lb.schedule(arrivalQueue.Get().(*request))
120132
}
121133
if lb.isTerminated && arrivalQueue.Len() == 0 {
@@ -128,66 +140,73 @@ func (lb *loadBalancer) Run() {
128140
}
129141

130142
func newLoadBalancer(servers []*server) *loadBalancer {
131-
lb := &loadBalancer{
132-
Runner: &godes.Runner{},
133-
next: 0,
134-
isTerminated: false,
135-
nRequests: 0,
136-
nReqEvictedByService: 0,
137-
nSucc: 0,
138-
nReqEvictedByInstance: 0}
139-
q := godes.NewFIFOQueue("serverQueue")
140-
for _, s := range servers {
141-
s.lb = lb
142-
q.Place(s)
143-
}
144-
lb.replicaQueue = q
145-
lb.nServers = q.Len()
146-
return lb
143+
return &loadBalancer{
144+
Runner: &godes.Runner{},
145+
next: 0,
146+
isTerminated: false,
147+
servers: servers}
147148
}
148149

149150
type server struct {
150151
*godes.Runner
151-
id int
152-
entries []inputEntry
153-
index int
154-
cond *godes.BooleanControl
155-
queue *godes.FIFOQueue
156-
lb *loadBalancer
157-
isTerminated bool
158-
unavailable *godes.BooleanControl
152+
id int
153+
entries []inputEntry
154+
index int
155+
cond *godes.BooleanControl
156+
queue *godes.FIFOQueue
157+
lb *loadBalancer
158+
isTerminated bool
159+
unavailable *godes.BooleanControl
160+
unavIntervals interval.LimitSet
161+
uptime float64
162+
unavTime float64
163+
procTime float64
164+
nProc int64
159165
}
160166

161167
func (s *server) Run() {
162168
for {
163169
s.cond.Wait(true)
170+
if s.isTerminated {
171+
break
172+
}
164173
if s.queue.Len() > 0 {
165-
// Processing request.
166-
167-
duration, status := s.next()
168-
r := s.queue.Get().(*request)
169-
r.latency += duration
170-
r.status = status
171-
r.ts = godes.GetSystemTime()
172-
r.hops = append(r.hops, hop{serverID: s.id, duration: duration, status: status})
173-
174-
// Advancing simulation time.
175-
godes.Advance(duration)
176-
177-
if r.status == 200 {
178-
s.lb.replicaQueue.Place(s)
179-
s.lb.reqFinished(s, r) // Sending the request back to the loadbalancer.
180-
} else {
181-
s.lb.reqFinished(s, r) // Sending the request back to the loadbalancer.
182-
fmt.Println("starting unavailability", godes.GetSystemTime())
183-
s.unavailable.WaitAndTimeout(true, r.latency) // Only comes back to the queue after the duration period.
184-
s.lb.replicaQueue.Place(s)
185-
fmt.Println("backing to the queue", godes.GetSystemTime())
186-
}
174+
func() {
175+
r := s.queue.Get().(*request)
187176

188-
}
189-
if s.isTerminated && arrivalQueue.Len() == 0 {
190-
break
177+
if s.unavailable.GetState() {
178+
r.hops = append(r.hops, hop{serverID: s.id, duration: 0, status: 503})
179+
s.lb.reqFinished(s, r)
180+
return
181+
}
182+
183+
duration, status := s.next()
184+
185+
// Unavailability mark found.
186+
if status == 503 {
187+
s.unavailable.Set(true)
188+
st := godes.GetSystemTime()
189+
s.unavailable.WaitAndTimeout(false, duration) // Only comes back to the queue after the duration period.
190+
s.unavailable.Set(false)
191+
192+
// metrics
193+
s.unavIntervals.Limits = append(s.unavIntervals.Limits, interval.Limit{Start: st, End: st + r.latency})
194+
s.unavTime += r.latency
195+
return
196+
}
197+
198+
// All good, process request
199+
r.latency += duration
200+
r.status = status
201+
r.ts = godes.GetSystemTime()
202+
r.hops = append(r.hops, hop{serverID: s.id, duration: duration, status: 200})
203+
godes.Advance(duration)
204+
s.lb.reqFinished(s, r)
205+
206+
// metrics
207+
s.nProc++
208+
s.procTime += r.latency
209+
}()
191210
}
192211
if s.queue.Len() == 0 {
193212
s.cond.Set(false)
@@ -207,8 +226,9 @@ func (s *server) next() (float64, int) {
207226
}
208227

209228
func (s *server) terminate() {
210-
s.cond.Set(true)
229+
s.uptime = godes.GetSystemTime()
211230
s.isTerminated = true
231+
s.cond.Set(true)
212232
}
213233

214234
func newServer(p string, id int) (*server, error) {
@@ -241,14 +261,15 @@ func newServer(p string, id int) (*server, error) {
241261
}
242262
}
243263
return &server{
244-
Runner: &godes.Runner{},
245-
id: id,
246-
entries: entries,
247-
index: 0,
248-
cond: godes.NewBooleanControl(),
249-
queue: godes.NewFIFOQueue(fmt.Sprintf("server%d", id)),
250-
unavailable: godes.NewBooleanControl(),
251-
isTerminated: false}, nil
264+
Runner: &godes.Runner{},
265+
id: id,
266+
entries: entries,
267+
index: 0,
268+
cond: godes.NewBooleanControl(),
269+
queue: godes.NewFIFOQueue(fmt.Sprintf("server%d", id)),
270+
unavailable: godes.NewBooleanControl(),
271+
isTerminated: false,
272+
unavIntervals: interval.LimitSet{ID: id}}, nil
252273
}
253274

254275
func toEntry(row []string) (float64, inputEntry, error) {

0 commit comments

Comments
 (0)