Skip to content

Commit c564e7d

Browse files
bacherflatoulme
andauthored
[opampsupervisor] Report mismatched instance UID (#37541)
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description This PR makes use of the recently introduced telemetry settings to emit appropriate logs and spans in case of when an unexpected instance UID is received via the AgentDescription message. <!-- Issue number (e.g. #1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes #29864 <!--Describe what testing was performed and which tests were added.--> #### Testing Added e2e test to verify span emission --------- Signed-off-by: Florian Bacher <[email protected]> Co-authored-by: Antoine Toulme <[email protected]>
1 parent 7893679 commit c564e7d

File tree

7 files changed

+247
-15
lines changed

7 files changed

+247
-15
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
7+
component: opampsupervisor
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Report the reception of an unexpected UID during bootstrapping
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [29864]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

cmd/opampsupervisor/e2e_test.go

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ import (
2929
"text/template"
3030
"time"
3131

32+
"go.opentelemetry.io/collector/pdata/ptrace"
33+
34+
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
35+
3236
"github.com/google/uuid"
3337
"github.com/knadh/koanf/parsers/yaml"
3438
"github.com/knadh/koanf/providers/file"
@@ -1868,3 +1872,106 @@ func findRandomPort() (int, error) {
18681872

18691873
return port, nil
18701874
}
1875+
1876+
func TestSupervisorEmitBootstrapTelemetry(t *testing.T) {
1877+
agentDescription := atomic.Value{}
1878+
1879+
// Load the Supervisor config so we can get the location of
1880+
// the Collector that will be run.
1881+
var cfg config.Supervisor
1882+
cfgFile := getSupervisorConfig(t, "nocap", map[string]string{})
1883+
k := koanf.New("::")
1884+
err := k.Load(file.Provider(cfgFile.Name()), yaml.Parser())
1885+
require.NoError(t, err)
1886+
err = k.UnmarshalWithConf("", &cfg, koanf.UnmarshalConf{
1887+
Tag: "mapstructure",
1888+
})
1889+
require.NoError(t, err)
1890+
1891+
// Get the binary name and version from the Collector binary
1892+
// using the `components` command that prints a YAML-encoded
1893+
// map of information about the Collector build. Some of this
1894+
// information will be used as defaults for the telemetry
1895+
// attributes.
1896+
agentPath := cfg.Agent.Executable
1897+
componentsInfo, err := exec.Command(agentPath, "components").Output()
1898+
require.NoError(t, err)
1899+
k = koanf.New("::")
1900+
err = k.Load(rawbytes.Provider(componentsInfo), yaml.Parser())
1901+
require.NoError(t, err)
1902+
buildinfo := k.StringMap("buildinfo")
1903+
command := buildinfo["command"]
1904+
version := buildinfo["version"]
1905+
1906+
server := newOpAMPServer(
1907+
t,
1908+
defaultConnectingHandler,
1909+
types.ConnectionCallbacks{
1910+
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
1911+
if message.AgentDescription != nil {
1912+
agentDescription.Store(message.AgentDescription)
1913+
}
1914+
1915+
return &protobufs.ServerToAgent{}
1916+
},
1917+
})
1918+
1919+
outputPath := filepath.Join(t.TempDir(), "output.txt")
1920+
_, err = findRandomPort()
1921+
require.Nil(t, err)
1922+
backend := testbed.NewOTLPHTTPDataReceiver(4318)
1923+
mockBackend := testbed.NewMockBackend(outputPath, backend)
1924+
mockBackend.EnableRecording()
1925+
defer mockBackend.Stop()
1926+
require.NoError(t, mockBackend.Start())
1927+
1928+
s := newSupervisor(t,
1929+
"emit_telemetry",
1930+
map[string]string{
1931+
"url": server.addr,
1932+
"telemetryUrl": fmt.Sprintf("localhost:%d", 4318),
1933+
},
1934+
)
1935+
1936+
require.Nil(t, s.Start())
1937+
defer s.Shutdown()
1938+
1939+
waitForSupervisorConnection(server.supervisorConnected, true)
1940+
1941+
require.Eventually(t, func() bool {
1942+
ad, ok := agentDescription.Load().(*protobufs.AgentDescription)
1943+
if !ok {
1944+
return false
1945+
}
1946+
1947+
var agentName, agentVersion string
1948+
identAttr := ad.IdentifyingAttributes
1949+
for _, attr := range identAttr {
1950+
switch attr.Key {
1951+
case semconv.AttributeServiceName:
1952+
agentName = attr.Value.GetStringValue()
1953+
case semconv.AttributeServiceVersion:
1954+
agentVersion = attr.Value.GetStringValue()
1955+
}
1956+
}
1957+
1958+
// By default, the Collector should report its name and version
1959+
// from the component.BuildInfo struct built into the Collector
1960+
// binary.
1961+
return agentName == command && agentVersion == version
1962+
}, 5*time.Second, 250*time.Millisecond)
1963+
1964+
require.EventuallyWithT(t, func(collect *assert.CollectT) {
1965+
require.Len(collect, mockBackend.ReceivedTraces, 1)
1966+
}, 10*time.Second, 250*time.Millisecond)
1967+
1968+
require.Equal(t, 1, mockBackend.ReceivedTraces[0].ResourceSpans().Len())
1969+
gotServiceName, ok := mockBackend.ReceivedTraces[0].ResourceSpans().At(0).Resource().Attributes().Get(semconv.AttributeServiceName)
1970+
require.True(t, ok)
1971+
require.Equal(t, "opamp-supervisor", gotServiceName.Str())
1972+
1973+
require.Equal(t, 1, mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().Len())
1974+
require.Equal(t, 1, mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().Len())
1975+
require.Equal(t, "GetBootstrapInfo", mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Name())
1976+
require.Equal(t, ptrace.StatusCodeOk, mockBackend.ReceivedTraces[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Status().Code())
1977+
}

cmd/opampsupervisor/examples/supervisor_darwin.yaml

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
server:
2-
endpoint: wss://127.0.0.1:4320/v1/opamp
2+
endpoint: ws://127.0.0.1:4320/v1/opamp
33
tls:
44
# Disable verification to test locally.
55
# Don't do this in production.
66
insecure_skip_verify: true
77
# For more TLS settings see config/configtls.ClientConfig
8+
insecure: true
89

910
capabilities:
1011
reports_effective_config: true
@@ -20,3 +21,20 @@ agent:
2021

2122
storage:
2223
directory: .
24+
25+
telemetry:
26+
traces:
27+
processors:
28+
- simple:
29+
exporter:
30+
otlp:
31+
protocol: http/protobuf
32+
endpoint: http://localhost:4318
33+
logs:
34+
level: debug
35+
processors:
36+
- simple:
37+
exporter:
38+
otlp:
39+
protocol: http/protobuf
40+
endpoint: http://localhost:4318

cmd/opampsupervisor/go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ require (
2525
go.opentelemetry.io/collector/service v0.122.0
2626
go.opentelemetry.io/contrib/bridges/otelzap v0.10.0
2727
go.opentelemetry.io/contrib/otelconf v0.15.0
28+
go.opentelemetry.io/otel v1.35.0
2829
go.opentelemetry.io/otel/log v0.11.0
30+
go.opentelemetry.io/otel/trace v1.35.0
2931
go.uber.org/goleak v1.3.0
3032
go.uber.org/multierr v1.11.0
3133
go.uber.org/zap v1.27.0
@@ -185,7 +187,6 @@ require (
185187
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.60.0 // indirect
186188
go.opentelemetry.io/contrib/propagators/b3 v1.35.0 // indirect
187189
go.opentelemetry.io/contrib/zpages v0.60.0 // indirect
188-
go.opentelemetry.io/otel v1.35.0 // indirect
189190
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc v0.11.0 // indirect
190191
go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp v0.11.0 // indirect
191192
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.35.0 // indirect
@@ -201,14 +202,13 @@ require (
201202
go.opentelemetry.io/otel/sdk v1.35.0 // indirect
202203
go.opentelemetry.io/otel/sdk/log v0.11.0 // indirect
203204
go.opentelemetry.io/otel/sdk/metric v1.35.0 // indirect
204-
go.opentelemetry.io/otel/trace v1.35.0 // indirect
205205
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
206206
golang.org/x/exp v0.0.0-20250210185358-939b2ce775ac // indirect
207207
golang.org/x/net v0.37.0 // indirect
208208
golang.org/x/text v0.23.0 // indirect
209209
gonum.org/v1/gonum v0.15.1 // indirect
210-
google.golang.org/genproto/googleapis/api v0.0.0-20250303144028-a0af3efb3deb // indirect
211-
google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect
210+
google.golang.org/genproto/googleapis/api v0.0.0-20250313205543-e70fdf4c4cb4 // indirect
211+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250313205543-e70fdf4c4cb4 // indirect
212212
google.golang.org/grpc v1.71.0 // indirect
213213
gopkg.in/yaml.v2 v2.4.0 // indirect
214214
)

cmd/opampsupervisor/go.sum

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/opampsupervisor/supervisor/supervisor.go

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ import (
4040
semconv "go.opentelemetry.io/collector/semconv/v1.21.0"
4141
"go.opentelemetry.io/contrib/bridges/otelzap"
4242
telemetryconfig "go.opentelemetry.io/contrib/otelconf/v0.3.0"
43+
"go.opentelemetry.io/otel/codes"
4344
"go.opentelemetry.io/otel/log"
45+
"go.opentelemetry.io/otel/trace"
4446
"go.uber.org/multierr"
4547
"go.uber.org/zap"
4648
"go.uber.org/zap/zapcore"
@@ -66,6 +68,8 @@ var (
6668

6769
lastRecvRemoteConfigFile = "last_recv_remote_config.dat"
6870
lastRecvOwnTelemetryConfigFile = "last_recv_own_telemetry_config.dat"
71+
72+
errNonMatchingInstanceUID = errors.New("received collector instance UID does not match expected UID set by the supervisor")
6973
)
7074

7175
const (
@@ -386,18 +390,23 @@ func (s *Supervisor) createTemplates() error {
386390
// shuts down the Collector. This only needs to happen
387391
// once per Collector binary.
388392
func (s *Supervisor) getBootstrapInfo() (err error) {
393+
_, span := s.getTracer().Start(context.Background(), "GetBootstrapInfo")
394+
defer span.End()
389395
s.opampServerPort, err = s.getSupervisorOpAMPServerPort()
390396
if err != nil {
397+
span.SetStatus(codes.Error, fmt.Sprintf("Could not get supervisor opamp service port: %v", err))
391398
return err
392399
}
393400

394401
bootstrapConfig, err := s.composeNoopConfig()
395402
if err != nil {
403+
span.SetStatus(codes.Error, fmt.Sprintf("Could not compose noop config config: %v", err))
396404
return err
397405
}
398406

399407
err = os.WriteFile(s.agentConfigFilePath(), bootstrapConfig, 0o600)
400408
if err != nil {
409+
span.SetStatus(codes.Error, fmt.Sprintf("Failed to write agent config: %v", err))
401410
return fmt.Errorf("failed to write agent config: %w", err)
402411
}
403412

@@ -428,13 +437,13 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
428437

429438
for _, attr := range identAttr {
430439
if attr.Key == semconv.AttributeServiceInstanceID {
431-
// TODO: Consider whether to attempt restarting the Collector.
432-
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/29864
433440
if attr.Value.GetStringValue() != s.persistentState.InstanceID.String() {
434441
done <- fmt.Errorf(
435-
"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s)",
442+
"the Collector's instance ID (%s) does not match with the instance ID set by the Supervisor (%s): %w",
436443
attr.Value.GetStringValue(),
437-
s.persistentState.InstanceID.String())
444+
s.persistentState.InstanceID.String(),
445+
errNonMatchingInstanceUID,
446+
)
438447
return response
439448
}
440449
instanceIDSeen = true
@@ -479,6 +488,7 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
479488
},
480489
}.toServerSettings())
481490
if err != nil {
491+
span.SetStatus(codes.Error, fmt.Sprintf("Could not start OpAMP server: %v", err))
482492
return err
483493
}
484494

@@ -495,10 +505,12 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
495505
"--config", s.agentConfigFilePath(),
496506
)
497507
if err != nil {
508+
span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
498509
return err
499510
}
500511

501512
if err = cmd.Start(context.Background()); err != nil {
513+
span.SetStatus(codes.Error, fmt.Sprintf("Could not start Agent: %v", err))
502514
return err
503515
}
504516

@@ -511,11 +523,39 @@ func (s *Supervisor) getBootstrapInfo() (err error) {
511523
select {
512524
case <-time.After(s.config.Agent.BootstrapTimeout):
513525
if connected.Load() {
514-
return errors.New("collector connected but never responded with an AgentDescription message")
526+
msg := "collector connected but never responded with an AgentDescription message"
527+
span.SetStatus(codes.Error, msg)
528+
return errors.New(msg)
515529
} else {
516-
return errors.New("collector's OpAMP client never connected to the Supervisor")
530+
msg := "collector's OpAMP client never connected to the Supervisor"
531+
span.SetStatus(codes.Error, msg)
532+
return errors.New(msg)
517533
}
518534
case err = <-done:
535+
if errors.Is(err, errNonMatchingInstanceUID) {
536+
// try to report the issue to the OpAMP server
537+
if startOpAMPErr := s.startOpAMPClient(); startOpAMPErr == nil {
538+
defer func(s *Supervisor) {
539+
if stopErr := s.stopOpAMPClient(); stopErr != nil {
540+
s.telemetrySettings.Logger.Error("Could not stop OpAmp client", zap.Error(stopErr))
541+
}
542+
}(s)
543+
if healthErr := s.opampClient.SetHealth(&protobufs.ComponentHealth{
544+
Healthy: false,
545+
LastError: err.Error(),
546+
}); healthErr != nil {
547+
s.telemetrySettings.Logger.Error("Could not report health to OpAMP server", zap.Error(healthErr))
548+
}
549+
} else {
550+
s.telemetrySettings.Logger.Error("Could not start OpAMP client to report health to server", zap.Error(startOpAMPErr))
551+
}
552+
}
553+
if err != nil {
554+
s.telemetrySettings.Logger.Error("Could not complete bootstrap", zap.Error(err))
555+
span.SetStatus(codes.Error, err.Error())
556+
} else {
557+
span.SetStatus(codes.Ok, "")
558+
}
519559
return err
520560
}
521561
}
@@ -1640,6 +1680,11 @@ func (s *Supervisor) findRandomPort() (int, error) {
16401680
return port, nil
16411681
}
16421682

1683+
func (s *Supervisor) getTracer() trace.Tracer {
1684+
tracer := s.telemetrySettings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/cmd/opampsupervisor")
1685+
return tracer
1686+
}
1687+
16431688
// The default koanf behavior is to override lists in the config.
16441689
// Instead, we provide this function, which merges the source and destination config's
16451690
// extension lists by concatenating the two.

0 commit comments

Comments
 (0)