Skip to content

Commit bc08169

Browse files
committed
using Vector otlp sink
Signed-off-by: Vitalii Parfonov <[email protected]>
1 parent b12e562 commit bc08169

File tree

6 files changed

+28
-20
lines changed

6 files changed

+28
-20
lines changed

api/observability/v1/output_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1314,7 +1314,7 @@ type OTLPTuningSpec struct {
13141314
// Compression causes data to be compressed before sending over the network.
13151315
// It is an error if the compression type is not supported by the output.
13161316
//
1317-
// +kubebuilder:validation:Enum:=gzip;none
1317+
// +kubebuilder:validation:Enum:=gzip;snappy;zlib;zstd;none
13181318
// +operator-sdk:csv:customresourcedefinitions:type=spec,displayName="Compression",xDescriptors={"urn:alm:descriptor:com.tectonic.ui:text"}
13191319
Compression string `json:"compression,omitempty"`
13201320
}

bundle/manifests/observability.openshift.io_clusterlogforwarders.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3100,6 +3100,9 @@ spec:
31003100
It is an error if the compression type is not supported by the output.
31013101
enum:
31023102
- gzip
3103+
- snappy
3104+
- zlib
3105+
- zstd
31033106
- none
31043107
type: string
31053108
deliveryMode:

config/crd/bases/observability.openshift.io_clusterlogforwarders.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3100,6 +3100,9 @@ spec:
31003100
It is an error if the compression type is not supported by the output.
31013101
enum:
31023102
- gzip
3103+
- snappy
3104+
- zlib
3105+
- zstd
31033106
- none
31043107
type: string
31053108
deliveryMode:

internal/cmd/functional-benchmarker/config/options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
const (
1919
LogStressorImage = "quay.io/openshift-logging/cluster-logging-load-client:0.2"
20-
imageVector = "quay.io/openshift-logging/vector:6.0"
20+
imageVector = "quay.io/vparfono/vector:v0.46.1-rh"
2121
)
2222

2323
type Options struct {

internal/generator/vector/output/otlp/otlp.go

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type Otlp struct {
2828
ComponentID string
2929
Inputs string
3030
URI string
31-
common.RootMixin
31+
Compression genhelper.OptionalPair
3232
}
3333

3434
func (p Otlp) Name() string {
@@ -38,21 +38,20 @@ func (p Otlp) Name() string {
3838
func (p Otlp) Template() string {
3939
return `{{define "` + p.Name() + `" -}}
4040
[sinks.{{.ComponentID}}]
41-
type = "http"
41+
type = "opentelemetry"
4242
inputs = {{.Inputs}}
43-
uri = "{{.URI}}"
44-
method = "post"
45-
payload_prefix = "{\"resourceLogs\":"
46-
payload_suffix = "}"
43+
protocol.uri = "{{.URI}}"
44+
protocol.type = "http"
45+
protocol.method = "post"
46+
protocol.encoding.codec = "json"
47+
protocol.encoding.except_fields = ["_internal"]
48+
protocol.payload_prefix = "{\"resourceLogs\":"
49+
protocol.payload_suffix = "}"
4750
{{.Compression}}
4851
{{end}}
4952
`
5053
}
5154

52-
func (p *Otlp) SetCompression(algo string) {
53-
p.Compression.Value = algo
54-
}
55-
5655
const (
5756
logSourceContainer = string(obs.ApplicationSourceContainer)
5857
logSourceNode = string(obs.InfrastructureSourceNode)
@@ -145,20 +144,18 @@ func New(id string, o obs.OutputSpec, inputs []string, secrets observability.Sec
145144
els = append(els, FormatResourceLog(formatResourceLogsID, reduceInputs))
146145
// Create sink and wrap in `resourceLogs`
147146
sink := Output(id, o, []string{formatResourceLogsID}, secrets, op)
148-
if strategy != nil {
149-
strategy.VisitSink(sink)
150-
}
147+
148+
protocolId := id + ".protocol"
151149
return MergeElements(
152150
els,
153151
[]Element{
154152
sink,
155-
common.NewEncoding(id, common.CodecJSON),
156153
common.NewAcknowledgments(id, strategy),
157-
common.NewBatch(id, strategy),
154+
common.NewBatch(protocolId, strategy),
158155
common.NewBuffer(id, strategy),
159-
common.NewRequest(id, strategy),
156+
common.NewRequest(protocolId, strategy),
160157
tls.New(id, o.TLS, secrets, op),
161-
auth.HTTPAuth(id, o.OTLP.Authentication, secrets, op),
158+
auth.HTTPAuth(protocolId, o.OTLP.Authentication, secrets, op),
162159
},
163160
)
164161
}
@@ -180,10 +177,14 @@ func RouteBySource(id string, inputs []string, logSources []string) Element {
180177
}
181178

182179
func Output(id string, o obs.OutputSpec, inputs []string, secrets observability.Secrets, op Options) *Otlp {
180+
compression := genhelper.NewOptionalPair("protocol.compression", nil)
181+
if o.OTLP.Tuning != nil && o.OTLP.Tuning.Compression != "" {
182+
compression = genhelper.NewOptionalPair("protocol.compression", o.OTLP.Tuning.Compression)
183+
}
183184
return &Otlp{
184185
ComponentID: id,
185186
Inputs: vectorhelpers.MakeInputs(inputs...),
186187
URI: o.OTLP.URL,
187-
RootMixin: common.NewRootMixin(nil),
188+
Compression: compression,
188189
}
189190
}

test/functional/outputs/otlp/container_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ var _ = Describe("[Functional][Outputs][OTLP] Functional tests", func() {
125125
},
126126
Entry("should pass with gzip", "gzip"),
127127
Entry("should pass with zlib", "zlib"),
128+
Entry("should pass with zstd", "zstd"),
128129
Entry("should pass with no compression", "none"))
129130
})
130131
})

0 commit comments

Comments
 (0)