Skip to content

Commit 286e281

Browse files
authored
Unify metrics format (alibaba#1060)
* temp * polish metrics log in sls & add pushgateway decoder * polish metrics log in sls & add pushgateway decoder * polish codes * polish codes * polish codes * polish codes * polish codes * polish codes * polish codes * polish codes * polish codes * fix lint * polish codes * polish codes * polish codes * polish codes * polish codes * fix unittest
1 parent 2c403c7 commit 286e281

38 files changed

+1483
-1719
lines changed

Diff for: core/common/LogtailCommonFlags.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -94,4 +94,5 @@ DEFINE_FLAG_INT32(fuse_file_max_count, "max file total count from fuse root dir"
9494
DEFINE_FLAG_BOOL(enable_root_path_collection, "", false);
9595
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
9696
"if enable containerd upper dir detect when locating rootfs",
97-
false);
97+
false);
98+
DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);

Diff for: core/common/LogtailCommonFlags.h

+1
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,5 @@ DECLARE_FLAG_STRING(fuse_root_dir);
7070
DECLARE_FLAG_BOOL(enable_root_path_collection);
7171
DECLARE_FLAG_INT32(logtail_alarm_interval);
7272
DECLARE_FLAG_BOOL(enable_containerd_upper_dir_detect);
73+
DECLARE_FLAG_BOOL(enable_sls_metrics_format);
7374
DECLARE_FLAG_BOOL(enable_new_pipeline);

Diff for: core/logtail.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ static void overwrite_community_edition_flags() {
111111
INT32_FLAG(data_server_port) = 443;
112112
BOOL_FLAG(enable_env_ref_in_config) = true;
113113
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
114+
BOOL_FLAG(enable_sls_metrics_format) = false;
114115
}
115116

116117
// Main routine of worker process.

Diff for: core/logtail_windows.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ static void overwrite_community_edition_flags() {
6464
INT32_FLAG(data_server_port) = 443;
6565
BOOL_FLAG(enable_env_ref_in_config) = true;
6666
BOOL_FLAG(enable_containerd_upper_dir_detect) = true;
67+
BOOL_FLAG(enable_sls_metrics_format) = false;
6768
}
6869

6970
void do_worker_process() {

Diff for: core/plugin/LogtailPlugin.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ LogtailPlugin::LogtailPlugin() {
5252
mPluginCfg["HostIP"] = LogFileProfiler::mIpAddr;
5353
mPluginCfg["Hostname"] = LogFileProfiler::mHostname;
5454
mPluginCfg["EnableContainerdUpperDirDetect"] = BOOL_FLAG(enable_containerd_upper_dir_detect);
55+
mPluginCfg["EnableSlsMetricsFormat"] = BOOL_FLAG(enable_sls_metrics_format);
5556
}
5657

5758
LogtailPlugin::~LogtailPlugin() {

Diff for: pkg/config/global_config.go

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type GlobalConfig struct {
3737

3838
EnableTimestampNanosecond bool
3939
EnableContainerdUpperDirDetect bool
40+
EnableSlsMetricsFormat bool
4041
}
4142

4243
// LogtailGlobalConfig is the singleton instance of GlobalConfig.

Diff for: pkg/helper/log_helper.go

+223-37
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,25 @@ package helper
1616

1717
import (
1818
"fmt"
19+
"math"
1920
"sort"
2021
"strconv"
2122
"strings"
2223
"time"
2324

25+
"github.com/alibaba/ilogtail/pkg/config"
2426
"github.com/alibaba/ilogtail/pkg/protocol"
27+
"github.com/alibaba/ilogtail/pkg/util"
28+
)
29+
30+
const (
31+
// StaleNaN is a signaling NaN, due to the MSB of the mantissa being 0.
32+
// This value is chosen with many leading 0s, so we have scope to store more
33+
// complicated values in the future. It is 2 rather than 1 to make
34+
// it easier to distinguish from the NormalNaN by a human when debugging.
35+
StaleNaN uint64 = 0x7ff0000000000002
36+
StaleNan = "__STALE_NAN__"
37+
SlsMetricstoreInvalidReplaceCharacter = '_'
2538
)
2639

2740
func CreateLog(t time.Time, configTag map[string]string, logTags map[string]string, fields map[string]string) (*protocol.Log, error) {
@@ -96,25 +109,112 @@ type MetricLabel struct {
96109
}
97110

98111
// Labels for metric labels
99-
type MetricLabels []MetricLabel
112+
type MetricLabels struct {
113+
keyValues []*MetricLabel
114+
sorted bool
115+
formatStr string
116+
}
117+
118+
func (kv *MetricLabels) clearCache() {
119+
kv.sorted = false
120+
kv.formatStr = ""
121+
}
122+
123+
func (kv *MetricLabels) Len() int {
124+
return len(kv.keyValues)
125+
}
100126

101-
func (l MetricLabels) Len() int {
102-
return len(l)
127+
func (kv *MetricLabels) Swap(i int, j int) {
128+
kv.keyValues[i], kv.keyValues[j] = kv.keyValues[j], kv.keyValues[i]
103129
}
104130

105-
func (l MetricLabels) Swap(i int, j int) {
106-
l[i], l[j] = l[j], l[i]
131+
func (kv *MetricLabels) Less(i int, j int) bool {
132+
return kv.keyValues[i].Name < kv.keyValues[j].Name
107133
}
108134

109-
func (l MetricLabels) Less(i int, j int) bool {
110-
return l[i].Name < l[j].Name
135+
func (kv *MetricLabels) Replace(key, value string) {
136+
findIndex := sort.Search(len(kv.keyValues), func(index int) bool {
137+
return kv.keyValues[index].Name >= key
138+
})
139+
if findIndex < len(kv.keyValues) && kv.keyValues[findIndex].Name == key {
140+
kv.keyValues[findIndex].Value = value
141+
} else {
142+
kv.Append(key, value)
143+
}
144+
kv.clearCache()
145+
}
146+
147+
func (kv *MetricLabels) Clone() *MetricLabels {
148+
if kv == nil {
149+
return &MetricLabels{}
150+
}
151+
var newKeyValues MetricLabels
152+
kv.CloneInto(&newKeyValues)
153+
return &newKeyValues
111154
}
112155

113-
func MinInt(a, b int) int {
114-
if a < b {
115-
return a
156+
func (kv *MetricLabels) CloneInto(dst *MetricLabels) *MetricLabels {
157+
if kv == nil {
158+
return &MetricLabels{}
159+
}
160+
if dst == nil {
161+
return kv.Clone()
162+
}
163+
if len(kv.keyValues) < cap(dst.keyValues) {
164+
dst.keyValues = dst.keyValues[:len(kv.keyValues)]
165+
} else {
166+
dst.keyValues = make([]*MetricLabel, len(kv.keyValues))
167+
}
168+
dst.sorted = kv.sorted
169+
dst.formatStr = kv.formatStr
170+
for i, value := range kv.keyValues {
171+
cp := *value
172+
dst.keyValues[i] = &cp
116173
}
117-
return b
174+
return dst
175+
}
176+
177+
// AppendMap ...
178+
func (kv *MetricLabels) AppendMap(mapVal map[string]string) {
179+
for key, value := range mapVal {
180+
kv.Append(key, value)
181+
}
182+
kv.clearCache()
183+
}
184+
185+
// Append ...
186+
func (kv *MetricLabels) Append(key, value string) {
187+
kv.keyValues = append(kv.keyValues, &MetricLabel{
188+
formatLabelKey(key),
189+
formatLabelValue(value),
190+
})
191+
kv.clearCache()
192+
}
193+
194+
func (kv *MetricLabels) SubSlice(begin, end int) {
195+
kv.keyValues = kv.keyValues[begin:end]
196+
kv.clearCache()
197+
}
198+
199+
func (kv *MetricLabels) String() string {
200+
if kv == nil {
201+
return ""
202+
}
203+
if !kv.sorted || kv.formatStr == "" {
204+
sort.Sort(kv)
205+
var builder strings.Builder
206+
for index, label := range kv.keyValues {
207+
builder.WriteString(label.Name)
208+
builder.WriteString("#$#")
209+
builder.WriteString(label.Value)
210+
if index != len(kv.keyValues)-1 {
211+
builder.WriteByte('|')
212+
}
213+
}
214+
kv.formatStr = builder.String()
215+
kv.sorted = true
216+
}
217+
return kv.formatStr
118218
}
119219

120220
// DefBucket ...
@@ -131,42 +231,128 @@ type HistogramData struct {
131231
}
132232

133233
// ToMetricLogs ..
134-
func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels MetricLabels) []*protocol.Log {
234+
func (hd *HistogramData) ToMetricLogs(name string, timeMs int64, labels *MetricLabels) []*protocol.Log {
135235
logs := make([]*protocol.Log, 0, len(hd.Buckets)+2)
136-
sort.Sort(labels)
236+
logs = append(logs, NewMetricLog(name+"_count", timeMs, float64(hd.Count), labels))
237+
logs = append(logs, NewMetricLog(name+"_sum", timeMs, hd.Sum, labels))
137238
for _, v := range hd.Buckets {
138-
newLabels := make(MetricLabels, len(labels), len(labels)+1)
139-
copy(newLabels, labels)
140-
newLabels = append(newLabels, MetricLabel{Name: "le", Value: strconv.FormatFloat(v.Le, 'g', -1, 64)})
141-
sort.Sort(newLabels)
142-
logs = append(logs, NewMetricLog(name+"_bucket", timeMs, strconv.FormatInt(v.Count, 10), newLabels))
143-
}
144-
logs = append(logs, NewMetricLog(name+"_count", timeMs, strconv.FormatInt(hd.Count, 10), labels))
145-
logs = append(logs, NewMetricLog(name+"_sum", timeMs, strconv.FormatFloat(hd.Sum, 'g', -1, 64), labels))
239+
labels.Replace("le", strconv.FormatFloat(v.Le, 'g', -1, 64))
240+
logs = append(logs, NewMetricLog(name+"_bucket", timeMs, float64(v.Count), labels))
241+
}
242+
146243
return logs
147244
}
148245

149-
// NewMetricLog caller must sort labels
150-
func NewMetricLog(name string, timeMs int64, value string, labels []MetricLabel) *protocol.Log {
151-
strTime := strconv.FormatInt(timeMs, 10)
246+
// NewMetricLog create a metric log, time support unix milliseconds and unix nanoseconds.
247+
func NewMetricLog(name string, t int64, value float64, labels *MetricLabels) *protocol.Log {
248+
var valStr string
249+
if math.Float64bits(value) == StaleNaN {
250+
valStr = StaleNan
251+
} else {
252+
valStr = strconv.FormatFloat(value, 'g', -1, 64)
253+
}
254+
return NewMetricLogStringVal(name, t, valStr, labels)
255+
}
256+
257+
// NewMetricLogStringVal create a metric log with val string, time support unix milliseconds and unix nanoseconds.
258+
func NewMetricLogStringVal(name string, t int64, value string, labels *MetricLabels) *protocol.Log {
259+
strTime := strconv.FormatInt(t, 10)
152260
metric := &protocol.Log{}
153-
protocol.SetLogTimeWithNano(metric, uint32(timeMs/1000), uint32((timeMs*1e6)%1e9))
154-
metric.Contents = []*protocol.Log_Content{}
155-
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__name__", Value: name})
261+
switch len(strTime) {
262+
case 13:
263+
protocol.SetLogTimeWithNano(metric, uint32(t/1000), uint32((t*1e6)%1e9))
264+
strTime += "000000"
265+
case 19:
266+
protocol.SetLogTimeWithNano(metric, uint32(t/1e9), uint32(t%1e9))
267+
default:
268+
t = int64(float64(t) * math.Pow10(19-len(strTime)))
269+
strTime = strconv.FormatInt(t, 10)
270+
protocol.SetLogTimeWithNano(metric, uint32(t/1e9), uint32(t%1e9))
271+
}
272+
metric.Contents = make([]*protocol.Log_Content, 0, 4)
273+
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__name__", Value: formatNewMetricName(name)})
156274
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__time_nano__", Value: strTime})
275+
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__labels__", Value: labels.String()})
276+
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__value__", Value: value})
277+
return metric
278+
}
279+
280+
func formatLabelKey(key string) string {
281+
if !config.LogtailGlobalConfig.EnableSlsMetricsFormat {
282+
return key
283+
}
284+
var newKey []byte
285+
for i := 0; i < len(key); i++ {
286+
b := key[i]
287+
if (b >= 'a' && b <= 'z') ||
288+
(b >= 'A' && b <= 'Z') ||
289+
(b >= '0' && b <= '9') ||
290+
b == '_' {
291+
continue
292+
} else {
293+
if newKey == nil {
294+
newKey = []byte(key)
295+
}
296+
newKey[i] = SlsMetricstoreInvalidReplaceCharacter
297+
}
298+
}
299+
if newKey == nil {
300+
return key
301+
}
302+
return util.ZeroCopyBytesToString(newKey)
303+
}
157304

158-
builder := strings.Builder{}
159-
for index, l := range labels {
160-
if index != 0 {
161-
builder.WriteString("|")
305+
func formatLabelValue(value string) string {
306+
if !config.LogtailGlobalConfig.EnableSlsMetricsFormat {
307+
return value
308+
}
309+
var newValue []byte
310+
for i := 0; i < len(value); i++ {
311+
b := value[i]
312+
if b != '|' {
313+
continue
314+
} else {
315+
if newValue == nil {
316+
newValue = []byte(value)
317+
}
318+
newValue[i] = SlsMetricstoreInvalidReplaceCharacter
162319
}
163-
builder.WriteString(l.Name)
164-
builder.WriteString("#$#")
165-
builder.WriteString(l.Value)
320+
}
321+
if newValue == nil {
322+
return value
323+
}
324+
return util.ZeroCopyBytesToString(newValue)
325+
}
166326

327+
func formatNewMetricName(name string) string {
328+
if !config.LogtailGlobalConfig.EnableSlsMetricsFormat {
329+
return name
330+
}
331+
newName := []byte(name)
332+
for i, b := range newName {
333+
if (b >= 'a' && b <= 'z') ||
334+
(b >= 'A' && b <= 'Z') ||
335+
(b >= '0' && b <= '9') ||
336+
b == '_' ||
337+
b == ':' {
338+
continue
339+
} else {
340+
newName[i] = SlsMetricstoreInvalidReplaceCharacter
341+
}
167342
}
168-
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__labels__", Value: builder.String()})
343+
return util.ZeroCopyBytesToString(newName)
344+
}
169345

170-
metric.Contents = append(metric.Contents, &protocol.Log_Content{Key: "__value__", Value: value})
171-
return metric
346+
// ReplaceInvalidChars analog of invalidChars = regexp.MustCompile("[^a-zA-Z0-9_]")
347+
func ReplaceInvalidChars(in *string) {
348+
for charIndex, char := range *in {
349+
charInt := int(char)
350+
if !((charInt >= 97 && charInt <= 122) || // a-z
351+
(charInt >= 65 && charInt <= 90) || // A-Z
352+
(charInt >= 48 && charInt <= 57) || // 0-9
353+
charInt == 95 || charInt == ':') { // _
354+
355+
*in = (*in)[:charIndex] + "_" + (*in)[charIndex+1:]
356+
}
357+
}
172358
}

Diff for: pkg/helper/log_helper_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package helper
2+
3+
import (
4+
"github.com/stretchr/testify/require"
5+
6+
"testing"
7+
)
8+
9+
func TestMetricLabels_Append(t *testing.T) {
10+
var ml MetricLabels
11+
ml.Append("key2", "val")
12+
ml.Append("key", "val")
13+
log := NewMetricLog("name", 1691646109945, 1, &ml)
14+
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val|key2#$#val" > Contents:<Key:"__value__" Value:"1" > `, log.String())
15+
16+
ml.Replace("key", "val2")
17+
18+
log = NewMetricLog("name", 1691646109945, 1, &ml)
19+
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val" > Contents:<Key:"__value__" Value:"1" > `, log.String())
20+
21+
ml.Replace("key3", "val3")
22+
log = NewMetricLog("name", 1691646109945, 1, &ml)
23+
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val|key3#$#val3" > Contents:<Key:"__value__" Value:"1" > `, log.String())
24+
25+
cloneLabel := ml.Clone()
26+
cloneLabel.Replace("key3", "val4")
27+
log = NewMetricLog("name", 1691646109945, 1, cloneLabel)
28+
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val|key3#$#val4" > Contents:<Key:"__value__" Value:"1" > `, log.String())
29+
30+
log = NewMetricLog("name", 1691646109945, 1, &ml)
31+
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"key#$#val2|key2#$#val|key3#$#val3" > Contents:<Key:"__value__" Value:"1" > `, log.String())
32+
33+
log = NewMetricLog("name", 1691646109945, 1, nil)
34+
require.Equal(t, `Time:1691646109 Contents:<Key:"__name__" Value:"name" > Contents:<Key:"__time_nano__" Value:"1691646109945000000" > Contents:<Key:"__labels__" Value:"" > Contents:<Key:"__value__" Value:"1" > `, log.String())
35+
36+
}

0 commit comments

Comments
 (0)