Skip to content
This repository was archived by the owner on Feb 23, 2021. It is now read-only.

Commit 0c6b72d

Browse files
authored
Merge pull request google#1473 from jimmidyson/prom-collector-label-spaces
Switch to Prometheus decoders for Prometheus custom metric endpoint parsing
2 parents 22695f9 + 923dbc5 commit 0c6b72d

File tree

4 files changed

+191
-106
lines changed

4 files changed

+191
-106
lines changed

collector/prometheus_collector.go

Lines changed: 124 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@
1515
package collector
1616

1717
import (
18+
"bytes"
1819
"encoding/json"
1920
"fmt"
20-
"io/ioutil"
21-
"math"
21+
"io"
2222
"net/http"
23-
"strconv"
24-
"strings"
23+
"sort"
2524
"time"
2625

26+
rawmodel "github.com/prometheus/client_model/go"
27+
"github.com/prometheus/common/expfmt"
28+
"github.com/prometheus/common/model"
29+
2730
"github.com/google/cadvisor/container"
2831
"github.com/google/cadvisor/info/v1"
2932
)
@@ -100,62 +103,104 @@ func (collector *PrometheusCollector) Name() string {
100103
return collector.name
101104
}
102105

103-
func getMetricData(line string) string {
104-
fields := strings.Fields(line)
105-
data := fields[3]
106-
if len(fields) > 4 {
107-
for i := range fields {
108-
if i > 3 {
109-
data = data + "_" + fields[i]
110-
}
111-
}
112-
}
113-
return strings.TrimSpace(data)
114-
}
115-
116106
func (collector *PrometheusCollector) GetSpec() []v1.MetricSpec {
117-
specs := []v1.MetricSpec{}
118107

119108
response, err := collector.httpClient.Get(collector.configFile.Endpoint.URL)
120109
if err != nil {
121-
return specs
110+
return nil
122111
}
123112
defer response.Body.Close()
124113

125-
pageContent, err := ioutil.ReadAll(response.Body)
126-
if err != nil {
127-
return specs
114+
if response.StatusCode != http.StatusOK {
115+
return nil
128116
}
129117

130-
lines := strings.Split(string(pageContent), "\n")
131-
lineCount := len(lines)
132-
for i, line := range lines {
133-
if strings.HasPrefix(line, "# HELP") {
134-
if i+2 >= lineCount {
135-
break
136-
}
118+
dec := expfmt.NewDecoder(response.Body, expfmt.ResponseFormat(response.Header))
137119

138-
stopIndex := strings.IndexAny(lines[i+2], "{ ")
139-
if stopIndex == -1 {
140-
continue
141-
}
120+
var specs []v1.MetricSpec
142121

143-
name := strings.TrimSpace(lines[i+2][0:stopIndex])
144-
if _, ok := collector.metricsSet[name]; collector.metricsSet != nil && !ok {
145-
continue
146-
}
147-
spec := v1.MetricSpec{
148-
Name: name,
149-
Type: v1.MetricType(getMetricData(lines[i+1])),
150-
Format: "float",
151-
Units: getMetricData(lines[i]),
152-
}
153-
specs = append(specs, spec)
122+
for {
123+
d := rawmodel.MetricFamily{}
124+
if err = dec.Decode(&d); err != nil {
125+
break
154126
}
127+
name := d.GetName()
128+
if len(name) == 0 {
129+
continue
130+
}
131+
// If metrics to collect is specified, skip any metrics not in the list to collect.
132+
if _, ok := collector.metricsSet[name]; collector.metricsSet != nil && !ok {
133+
continue
134+
}
135+
136+
spec := v1.MetricSpec{
137+
Name: name,
138+
Type: metricType(d.GetType()),
139+
Format: v1.FloatType,
140+
}
141+
specs = append(specs, spec)
155142
}
143+
144+
if err != nil && err != io.EOF {
145+
return nil
146+
}
147+
156148
return specs
157149
}
158150

151+
// metricType converts Prometheus metric type to cadvisor metric type.
152+
// If there is no mapping then just return the name of the Prometheus metric type.
153+
func metricType(t rawmodel.MetricType) v1.MetricType {
154+
switch t {
155+
case rawmodel.MetricType_COUNTER:
156+
return v1.MetricCumulative
157+
case rawmodel.MetricType_GAUGE:
158+
return v1.MetricGauge
159+
default:
160+
return v1.MetricType(t.String())
161+
}
162+
}
163+
164+
type prometheusLabels []*rawmodel.LabelPair
165+
166+
func labelSetToLabelPairs(labels model.Metric) prometheusLabels {
167+
var promLabels prometheusLabels
168+
for k, v := range labels {
169+
name := string(k)
170+
value := string(v)
171+
promLabels = append(promLabels, &rawmodel.LabelPair{Name: &name, Value: &value})
172+
}
173+
return promLabels
174+
}
175+
176+
func (s prometheusLabels) Len() int { return len(s) }
177+
func (s prometheusLabels) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
178+
179+
// ByName implements sort.Interface by providing Less and using the Len and
180+
// Swap methods of the embedded PrometheusLabels value.
181+
type byName struct{ prometheusLabels }
182+
183+
func (s byName) Less(i, j int) bool {
184+
return s.prometheusLabels[i].GetName() < s.prometheusLabels[j].GetName()
185+
}
186+
187+
func prometheusLabelSetToCadvisorLabel(promLabels model.Metric) string {
188+
labels := labelSetToLabelPairs(promLabels)
189+
sort.Sort(byName{labels})
190+
var b bytes.Buffer
191+
192+
for i, l := range labels {
193+
if i > 0 {
194+
b.WriteString("\xff")
195+
}
196+
b.WriteString(l.GetName())
197+
b.WriteString("=")
198+
b.WriteString(l.GetValue())
199+
}
200+
201+
return string(b.Bytes())
202+
}
203+
159204
//Returns collected metrics and the next collection time of the collector
160205
func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal) (time.Time, map[string][]v1.MetricVal, error) {
161206
currentTime := time.Now()
@@ -168,59 +213,61 @@ func (collector *PrometheusCollector) Collect(metrics map[string][]v1.MetricVal)
168213
}
169214
defer response.Body.Close()
170215

171-
pageContent, err := ioutil.ReadAll(response.Body)
172-
if err != nil {
173-
return nextCollectionTime, nil, err
216+
if response.StatusCode != http.StatusOK {
217+
return nextCollectionTime, nil, fmt.Errorf("server returned HTTP status %s", response.Status)
174218
}
175219

176-
var errorSlice []error
177-
lines := strings.Split(string(pageContent), "\n")
178-
179-
newMetrics := make(map[string][]v1.MetricVal)
220+
sdec := expfmt.SampleDecoder{
221+
Dec: expfmt.NewDecoder(response.Body, expfmt.ResponseFormat(response.Header)),
222+
Opts: &expfmt.DecodeOptions{
223+
Timestamp: model.TimeFromUnixNano(currentTime.UnixNano()),
224+
},
225+
}
180226

181-
for _, line := range lines {
182-
if line == "" {
227+
var (
228+
// 50 is chosen as a reasonable guesstimate at a number of metrics we can
229+
// expect from virtually any endpoint to try to save allocations.
230+
decSamples = make(model.Vector, 0, 50)
231+
newMetrics = make(map[string][]v1.MetricVal)
232+
)
233+
for {
234+
if err = sdec.Decode(&decSamples); err != nil {
183235
break
184236
}
185-
if !strings.HasPrefix(line, "# HELP") && !strings.HasPrefix(line, "# TYPE") {
186-
var metLabel string
187-
startLabelIndex := strings.Index(line, "{")
188-
spaceIndex := strings.Index(line, " ")
189-
if startLabelIndex == -1 {
190-
startLabelIndex = spaceIndex
191-
}
192237

193-
metName := strings.TrimSpace(line[0:startLabelIndex])
194-
if _, ok := collector.metricsSet[metName]; collector.metricsSet != nil && !ok {
238+
for _, sample := range decSamples {
239+
metName := string(sample.Metric[model.MetricNameLabel])
240+
if len(metName) == 0 {
195241
continue
196242
}
197-
198-
if startLabelIndex+1 <= spaceIndex-1 {
199-
metLabel = strings.TrimSpace(line[(startLabelIndex + 1):(spaceIndex - 1)])
200-
}
201-
202-
metVal, err := strconv.ParseFloat(line[spaceIndex+1:], 64)
203-
if err != nil {
204-
errorSlice = append(errorSlice, err)
205-
}
206-
if math.IsNaN(metVal) {
207-
metVal = 0
243+
// If metrics to collect is specified, skip any metrics not in the list to collect.
244+
if _, ok := collector.metricsSet[metName]; collector.metricsSet != nil && !ok {
245+
continue
208246
}
247+
// TODO Handle multiple labels nicer. Prometheus metrics can have multiple
248+
// labels, cadvisor only accepts a single string for the metric label.
249+
label := prometheusLabelSetToCadvisorLabel(sample.Metric)
209250

210251
metric := v1.MetricVal{
211-
Label: metLabel,
212-
FloatValue: metVal,
213-
Timestamp: currentTime,
252+
FloatValue: float64(sample.Value),
253+
Timestamp: sample.Timestamp.Time(),
254+
Label: label,
214255
}
215256
newMetrics[metName] = append(newMetrics[metName], metric)
216257
if len(newMetrics) > collector.metricCountLimit {
217258
return nextCollectionTime, nil, fmt.Errorf("too many metrics to collect")
218259
}
219260
}
261+
decSamples = decSamples[:0]
262+
}
263+
264+
if err != nil && err != io.EOF {
265+
return nextCollectionTime, nil, err
220266
}
267+
221268
for key, val := range newMetrics {
222269
metrics[key] = append(metrics[key], val...)
223270
}
224271

225-
return nextCollectionTime, metrics, compileErrors(errorSlice)
272+
return nextCollectionTime, metrics, nil
226273
}

collector/prometheus_collector_test.go

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,21 +36,29 @@ func TestPrometheus(t *testing.T) {
3636
containerHandler := containertest.NewMockContainerHandler("mockContainer")
3737
collector, err := NewPrometheusCollector("Prometheus", configFile, 100, containerHandler, http.DefaultClient)
3838
assert.NoError(err)
39-
assert.Equal(collector.name, "Prometheus")
40-
assert.Equal(collector.configFile.Endpoint.URL, "http://localhost:8080/metrics")
39+
assert.Equal("Prometheus", collector.name)
40+
assert.Equal("http://localhost:8080/metrics", collector.configFile.Endpoint.URL)
4141

4242
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
4343

44-
text := "# HELP go_gc_duration_seconds A summary of the GC invocation durations.\n"
45-
text += "# TYPE go_gc_duration_seconds summary\n"
46-
text += "go_gc_duration_seconds{quantile=\"0\"} 5.8348000000000004e-05\n"
47-
text += "go_gc_duration_seconds{quantile=\"1\"} 0.000499764\n"
48-
text += "# HELP go_goroutines Number of goroutines that currently exist.\n"
49-
text += "# TYPE go_goroutines gauge\n"
50-
text += "go_goroutines 16\n"
51-
text += "# HELP empty_metric A metric without any values\n"
52-
text += "# TYPE empty_metric counter\n"
53-
text += "\n"
44+
text := `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
45+
# TYPE go_gc_duration_seconds summary
46+
go_gc_duration_seconds{quantile="0"} 5.8348000000000004e-05
47+
go_gc_duration_seconds{quantile="1"} 0.000499764
48+
go_gc_duration_seconds_sum 1.7560473e+07
49+
go_gc_duration_seconds_count 2693
50+
# HELP go_goroutines Number of goroutines that currently exist.
51+
# TYPE go_goroutines gauge
52+
go_goroutines 16
53+
# HELP empty_metric A metric without any values
54+
# TYPE empty_metric counter
55+
# HELP metric_with_spaces_in_label A metric with spaces in a label.
56+
# TYPE metric_with_spaces_in_label gauge
57+
metric_with_spaces_in_label{name="Network Agent"} 72
58+
# HELP metric_with_multiple_labels A metric with multiple labels.
59+
# TYPE metric_with_multiple_labels gauge
60+
metric_with_multiple_labels{label1="One", label2="Two", label3="Three"} 81
61+
`
5462
fmt.Fprintln(w, text)
5563
}))
5664

@@ -60,21 +68,47 @@ func TestPrometheus(t *testing.T) {
6068

6169
var spec []v1.MetricSpec
6270
require.NotPanics(t, func() { spec = collector.GetSpec() })
63-
assert.Len(spec, 2)
64-
assert.Equal(spec[0].Name, "go_gc_duration_seconds")
65-
assert.Equal(spec[1].Name, "go_goroutines")
71+
assert.Len(spec, 4)
72+
specNames := make(map[string]struct{}, 3)
73+
for _, s := range spec {
74+
specNames[s.Name] = struct{}{}
75+
}
76+
expectedSpecNames := map[string]struct{}{
77+
"go_gc_duration_seconds": {},
78+
"go_goroutines": {},
79+
"metric_with_spaces_in_label": {},
80+
"metric_with_multiple_labels": {},
81+
}
82+
assert.Equal(expectedSpecNames, specNames)
6683

6784
metrics := map[string][]v1.MetricVal{}
6885
_, metrics, errMetric := collector.Collect(metrics)
6986

7087
assert.NoError(errMetric)
7188

7289
go_gc_duration := metrics["go_gc_duration_seconds"]
73-
assert.Equal(go_gc_duration[0].FloatValue, 5.8348000000000004e-05)
74-
assert.Equal(go_gc_duration[1].FloatValue, 0.000499764)
90+
assert.Equal(5.8348000000000004e-05, go_gc_duration[0].FloatValue)
91+
assert.Equal("__name__=go_gc_duration_seconds\xffquantile=0", go_gc_duration[0].Label)
92+
assert.Equal(0.000499764, go_gc_duration[1].FloatValue)
93+
assert.Equal("__name__=go_gc_duration_seconds\xffquantile=1", go_gc_duration[1].Label)
94+
go_gc_duration_sum := metrics["go_gc_duration_seconds_sum"]
95+
assert.Equal(1.7560473e+07, go_gc_duration_sum[0].FloatValue)
96+
assert.Equal("__name__=go_gc_duration_seconds_sum", go_gc_duration_sum[0].Label)
97+
go_gc_duration_count := metrics["go_gc_duration_seconds_count"]
98+
assert.Equal(2693, go_gc_duration_count[0].FloatValue)
99+
assert.Equal("__name__=go_gc_duration_seconds_count", go_gc_duration_count[0].Label)
75100

76101
goRoutines := metrics["go_goroutines"]
77-
assert.Equal(goRoutines[0].FloatValue, 16)
102+
assert.Equal(16, goRoutines[0].FloatValue)
103+
assert.Equal("__name__=go_goroutines", goRoutines[0].Label)
104+
105+
metricWithSpaces := metrics["metric_with_spaces_in_label"]
106+
assert.Equal(72, metricWithSpaces[0].FloatValue)
107+
assert.Equal("__name__=metric_with_spaces_in_label\xffname=Network Agent", metricWithSpaces[0].Label)
108+
109+
metricWithMultipleLabels := metrics["metric_with_multiple_labels"]
110+
assert.Equal(81, metricWithMultipleLabels[0].FloatValue)
111+
assert.Equal("__name__=metric_with_multiple_labels\xfflabel1=One\xfflabel2=Two\xfflabel3=Three", metricWithMultipleLabels[0].Label)
78112
}
79113

80114
func TestPrometheusEndpointConfig(t *testing.T) {
@@ -158,13 +192,16 @@ func TestPrometheusFiltersMetrics(t *testing.T) {
158192

159193
tempServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
160194

161-
text := "# HELP go_gc_duration_seconds A summary of the GC invocation durations.\n"
162-
text += "# TYPE go_gc_duration_seconds summary\n"
163-
text += "go_gc_duration_seconds{quantile=\"0\"} 5.8348000000000004e-05\n"
164-
text += "go_gc_duration_seconds{quantile=\"1\"} 0.000499764\n"
165-
text += "# HELP go_goroutines Number of goroutines that currently exist.\n"
166-
text += "# TYPE go_goroutines gauge\n"
167-
text += "go_goroutines 16"
195+
text := `# HELP go_gc_duration_seconds A summary of the GC invocation durations.
196+
# TYPE go_gc_duration_seconds summary
197+
go_gc_duration_seconds{quantile="0"} 5.8348000000000004e-05
198+
go_gc_duration_seconds{quantile="1"} 0.000499764
199+
go_gc_duration_seconds_sum 1.7560473e+07
200+
go_gc_duration_seconds_count 2693
201+
# HELP go_goroutines Number of goroutines that currently exist.
202+
# TYPE go_goroutines gauge
203+
go_goroutines 16
204+
`
168205
fmt.Fprintln(w, text)
169206
}))
170207

0 commit comments

Comments
 (0)