Skip to content

Commit 7e1c08e

Browse files
authored
feat: refactor consumer, add runtime metrics (#302)
1 parent 22f712a commit 7e1c08e

File tree

7 files changed

+367
-226
lines changed

7 files changed

+367
-226
lines changed

consumer/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type LogHubConfig struct {
5151
//:param AutoCommitIntervalInSec: default auto commit interval, default is 30
5252
//:param AuthVersion: signature algorithm version, default is sls.AuthV1
5353
//:param Region: region of sls endpoint, eg. cn-hangzhou, region must be set if AuthVersion is sls.AuthV4
54+
//:param DisableRuntimeMetrics: disable runtime metrics, runtime metrics prints to local log.
5455
Endpoint string
5556
AccessKeyID string
5657
AccessKeySecret string
@@ -81,6 +82,7 @@ type LogHubConfig struct {
8182
AutoCommitIntervalInMS int64
8283
AuthVersion sls.AuthVersionType
8384
Region string
85+
DisableRuntimeMetrics bool
8486
}
8587

8688
const (

consumer/shard_monitor.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package consumerLibrary
2+
3+
import (
4+
"fmt"
5+
"math"
6+
"time"
7+
8+
"go.uber.org/atomic"
9+
10+
sls "github.com/aliyun/aliyun-log-go-sdk"
11+
"github.com/go-kit/kit/log"
12+
)
13+
14+
type MonitorMetrics struct {
15+
fetchReqFailedCount atomic.Int64
16+
logRawSize atomic.Int64
17+
fetchLogHistogram TimeHistogram // in us
18+
19+
processFailedCount atomic.Int64
20+
processHistogram TimeHistogram // in us
21+
}
22+
23+
type ShardMonitor struct {
24+
shard int
25+
reportInterval time.Duration
26+
lastReportTime time.Time
27+
metrics atomic.Value // *MonitorMetrics
28+
}
29+
30+
func newShardMonitor(shard int, reportInterval time.Duration) *ShardMonitor {
31+
monitor := &ShardMonitor{
32+
shard: shard,
33+
reportInterval: reportInterval,
34+
lastReportTime: time.Now(),
35+
}
36+
monitor.metrics.Store(&MonitorMetrics{})
37+
return monitor
38+
}
39+
40+
func (m *ShardMonitor) RecordFetchRequest(plm *sls.PullLogMeta, err error, start time.Time) {
41+
metrics := m.metrics.Load().(*MonitorMetrics)
42+
if err != nil {
43+
metrics.fetchReqFailedCount.Inc()
44+
} else {
45+
metrics.logRawSize.Add(int64(plm.RawSize))
46+
}
47+
metrics.fetchLogHistogram.AddSample(float64(time.Since(start).Microseconds()))
48+
}
49+
50+
func (m *ShardMonitor) RecordProcess(err error, start time.Time) {
51+
metrics := m.metrics.Load().(*MonitorMetrics)
52+
if err != nil {
53+
metrics.processFailedCount.Inc()
54+
}
55+
metrics.processHistogram.AddSample(float64(time.Since(start).Microseconds()))
56+
}
57+
58+
func (m *ShardMonitor) getAndResetMetrics() *MonitorMetrics {
59+
// we dont need cmp and swap, only one thread would call m.metrics.Store
60+
old := m.metrics.Load().(*MonitorMetrics)
61+
m.metrics.Store(&MonitorMetrics{})
62+
return old
63+
}
64+
65+
func (m *ShardMonitor) shouldReport() bool {
66+
return time.Since(m.lastReportTime) >= m.reportInterval
67+
}
68+
69+
func (m *ShardMonitor) reportByLogger(logger log.Logger) {
70+
m.lastReportTime = time.Now()
71+
metrics := m.getAndResetMetrics()
72+
logger.Log("msg", "report status",
73+
"fetchFailed", metrics.fetchReqFailedCount.Load(),
74+
"logRawSize", metrics.logRawSize.Load(),
75+
"processFailed", metrics.processFailedCount.Load(),
76+
"fetch", metrics.fetchLogHistogram.String(),
77+
"process", metrics.processHistogram.String(),
78+
)
79+
}
80+
81+
type TimeHistogram struct {
82+
Count atomic.Int64
83+
Sum atomic.Float64
84+
SumSquare atomic.Float64
85+
}
86+
87+
func (h *TimeHistogram) AddSample(v float64) {
88+
h.Count.Inc()
89+
h.Sum.Add(v)
90+
h.SumSquare.Add(v * v)
91+
}
92+
93+
func (h *TimeHistogram) String() string {
94+
avg := h.Avg()
95+
stdDev := h.StdDev()
96+
count := h.Count.Load()
97+
return fmt.Sprintf("{avg: %.1fus, stdDev: %.1fus, count: %d}", avg, stdDev, count)
98+
}
99+
100+
func (h *TimeHistogram) Avg() float64 {
101+
count := h.Count.Load()
102+
if count == 0 {
103+
return 0
104+
}
105+
return h.Sum.Load() / float64(count)
106+
}
107+
108+
func (h *TimeHistogram) StdDev() float64 {
109+
count := h.Count.Load()
110+
if count < 2 {
111+
return 0
112+
}
113+
div := float64(count * (count - 1))
114+
num := (float64(count) * h.SumSquare.Load()) - math.Pow(h.Sum.Load(), 2)
115+
return math.Sqrt(num / div)
116+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package consumerLibrary
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
sls "github.com/aliyun/aliyun-log-go-sdk"
8+
)
9+
10+
// BenchmarkRecordFetchRequest
11+
// BenchmarkRecordFetchRequest-12 29816072 40.05 ns/op 0 B/op 0 allocs/op
12+
func BenchmarkRecordFetchRequest(b *testing.B) {
13+
shardMonitor := newShardMonitor(1, time.Second)
14+
start := time.Now()
15+
plm := &sls.PullLogMeta{RawSize: 1}
16+
b.ResetTimer()
17+
18+
for i := 0; i < b.N; i++ {
19+
shardMonitor.RecordFetchRequest(plm, nil, start)
20+
}
21+
}
22+
23+
// BenchmarkRecordProcess
24+
// BenchmarkRecordProcess-12 33092797 35.15 ns/op 0 B/op 0 allocs/op
25+
func BenchmarkRecordProcess(b *testing.B) {
26+
shardMonitor := newShardMonitor(1, time.Second)
27+
start := time.Now()
28+
b.ResetTimer()
29+
30+
for i := 0; i < b.N; i++ {
31+
shardMonitor.RecordProcess(nil, start)
32+
}
33+
}

0 commit comments

Comments
 (0)