Skip to content

Commit f366745

Browse files
belimawrmergify[bot]
authored andcommitted
Add conditions to copy_fields processor (#6730)
This commit adds conditions to the `copy_fields` processor from the monitoring Filebeat to prevent it from failing and spamming the event logger at debug level with: `target field xxx already exists, drop or rename this field first` --------- Co-authored-by: Pierre HILBERT <[email protected]> (cherry picked from commit acf1098) # Conflicts: # testing/integration/logs_ingestion_test.go
1 parent 4e62471 commit f366745

File tree

5 files changed

+104
-11
lines changed

5 files changed

+104
-11
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Kind can be one of:
2+
# - breaking-change: a change to previously-documented behavior
3+
# - deprecation: functionality that is being removed in a later release
4+
# - bug-fix: fixes a problem in a previous version
5+
# - enhancement: extends functionality but does not break or fix existing behavior
6+
# - feature: new functionality
7+
# - known-issue: problems that we are aware of in a given version
8+
# - security: impacts on the security of a product or a user’s deployment.
9+
# - upgrade: important information for someone upgrading from a prior version
10+
# - other: does not fit into any of the other categories
11+
kind: bug-fix
12+
13+
# Change summary; a 80ish characters long description of the change.
14+
summary: >
15+
Add conditions to copy_fields processors to prevent spamming the debug logs
16+
17+
# Long description; in case the summary is not enough to describe the change
18+
# this field accommodate a description without length limits.
19+
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
20+
description:
21+
22+
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
23+
component: elastic-agent
24+
25+
# PR URL; optional; the PR number that added the changeset.
26+
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
27+
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
28+
# Please provide it if you are adding a fragment for a different PR.
29+
pr: https://github.com/elastic/elastic-agent/pull/6730
30+
31+
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
32+
# If not present is automatically filled by the tooling with the issue linked to the PR number.
33+
issue: https://github.com/elastic/elastic-agent/issues/5299

internal/pkg/agent/application/monitoring/v1_monitor.go

+7
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,13 @@ func (b *BeatsMonitor) injectLogsInput(cfg map[string]interface{}, components []
450450
// possible it's a log message from agent itself (doesn't have component.dataset)
451451
map[string]interface{}{
452452
"copy_fields": map[string]interface{}{
453+
"when": map[string]any{
454+
"not": map[string]any{
455+
"has_fields": []any{
456+
"data_stream.dataset",
457+
},
458+
},
459+
},
453460
"fields": []interface{}{
454461
map[string]interface{}{
455462
"from": "data_stream.dataset_original",

testing/integration/container_cmd_test.go

+8-3
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,7 @@ func TestContainerCMDEventToStderr(t *testing.T) {
330330
// We call agentFixture.Prepare to set the workdir
331331
require.NoError(t, agentFixture.Prepare(ctx), "failed preparing agent fixture")
332332

333-
_, outputID := createMockESOutput(t, info)
333+
_, outputID := createMockESOutput(t, info, 0, 0, 100, 0)
334334
policyID, enrollmentAPIKey := createPolicy(
335335
t,
336336
ctx,
@@ -390,8 +390,13 @@ func TestContainerCMDEventToStderr(t *testing.T) {
390390
}, 3*time.Minute, 10*time.Second, "cannot find events on stderr")
391391
}
392392

393-
func createMockESOutput(t *testing.T, info *define.Info) (string, string) {
394-
mockesURL := startMockES(t)
393+
// createMockESOutput creates an output configuration pointing to a mockES
394+
// started in a random port and a cleanup function is registered to close
395+
// the server at the end of the test.
396+
// The server will respond with the passed error probabilities. If they add
397+
// up to zero, all requests are a success.
398+
func createMockESOutput(t *testing.T, info *define.Info, percentDuplicate, percentTooMany, percentNonIndex, percentTooLarge uint) (string, string) {
399+
mockesURL := startMockES(t, percentDuplicate, percentTooMany, percentNonIndex, percentTooLarge)
395400
createOutputBody := `
396401
{
397402
"id": "mock-es-%[1]s",

testing/integration/event_logging_test.go

+41-8
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"bufio"
1111
"bytes"
1212
"context"
13+
"encoding/json"
1314
"fmt"
1415
"net/http"
1516
"net/http/httputil"
@@ -37,11 +38,11 @@ outputs:
3738
hosts:
3839
- %s
3940
protocol: http
40-
preset: balanced
41-
41+
preset: latency
4242
inputs:
4343
- type: filestream
4444
id: your-input-id
45+
log_level: debug
4546
streams:
4647
- id: your-filestream-stream-id
4748
data_stream:
@@ -84,10 +85,10 @@ func TestEventLogFile(t *testing.T) {
8485
agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
8586
require.NoError(t, err)
8687

87-
esURL := startMockES(t)
88+
esURL := startMockES(t, 0, 0, 0, 0)
8889

8990
logFilepath := path.Join(t.TempDir(), t.Name())
90-
generateLogFile(t, logFilepath, time.Millisecond*100, 1)
91+
generateLogFile(t, logFilepath, time.Millisecond*100, 20)
9192

9293
cfg := fmt.Sprintf(eventLogConfig, esURL, logFilepath)
9394

@@ -126,6 +127,7 @@ func TestEventLogFile(t *testing.T) {
126127

127128
// Now the Elastic-Agent is running, so validate the Event log file.
128129
requireEventLogFileExistsWithData(t, agentFixture)
130+
requireNoCopyProcessorError(t, agentFixture)
129131

130132
// The diagnostics command is already tested by another test,
131133
// here we just want to validate the events log behaviour
@@ -169,7 +171,7 @@ func TestEventLogOutputConfiguredViaFleet(t *testing.T) {
169171
agentFixture, err := define.NewFixtureFromLocalBuild(t, define.Version())
170172
require.NoError(t, err)
171173

172-
_, outputID := createMockESOutput(t, info)
174+
_, outputID := createMockESOutput(t, info, 0, 0, 100, 0)
173175
policyName := fmt.Sprintf("%s-%s", t.Name(), uuid.Must(uuid.NewV4()).String())
174176
policyID, enrollmentAPIKey := createPolicy(
175177
t,
@@ -307,7 +309,7 @@ func addOverwriteToPolicy(t *testing.T, info *define.Info, policyName, policyID
307309
}
308310
}
309311

310-
func requireEventLogFileExistsWithData(t *testing.T, agentFixture *atesting.Fixture) {
312+
func readEventLogFile(t *testing.T, agentFixture *atesting.Fixture) string {
311313
// Now the Elastic-Agent is running, so validate the Event log file.
312314
// Because the path changes based on the Elastic-Agent version, we
313315
// use glob to find the file
@@ -338,8 +340,39 @@ func requireEventLogFileExistsWithData(t *testing.T, agentFixture *atesting.Fixt
338340
t.Fatalf("cannot read file '%s': %s", logFileName, err)
339341
}
340342

341-
logEntry := string(logEntryBytes)
342-
expectedStr := "Cannot index event"
343+
return string(logEntryBytes)
344+
}
345+
346+
func requireNoCopyProcessorError(t *testing.T, agentFixture *atesting.Fixture) {
347+
data := readEventLogFile(t, agentFixture)
348+
for _, line := range strings.Split(data, "\n") {
349+
logEntry := struct {
350+
LogLogger string `json:"log.logger"`
351+
Message string `json:"message"`
352+
}{}
353+
354+
if len(line) == 0 {
355+
continue
356+
}
357+
if err := json.Unmarshal([]byte(line), &logEntry); err != nil {
358+
t.Fatalf("could not parse log entry: %q", line)
359+
}
360+
361+
if logEntry.LogLogger == "copy_fields" {
362+
if strings.Contains(logEntry.Message, "Failed to copy fields") {
363+
if strings.Contains(logEntry.Message, "already exists, drop or rename this field first") {
364+
t.Fatal("copy_fields processor must not fail")
365+
}
366+
}
367+
}
368+
}
369+
}
370+
371+
func requireEventLogFileExistsWithData(t *testing.T, agentFixture *atesting.Fixture) {
372+
logEntry := readEventLogFile(t, agentFixture)
373+
// That's part of the generated event that is logged by the 'processor'
374+
// logger at level debug
375+
expectedStr := "TestEventLogFile"
343376
if !strings.Contains(logEntry, expectedStr) {
344377
t.Errorf(
345378
"did not find the expected log entry ('%s') in the events log file",

testing/integration/logs_ingestion_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -120,17 +120,32 @@ func TestLogIngestionFleetManaged(t *testing.T) {
120120
})
121121
}
122122

123+
<<<<<<< HEAD
123124
func startMockES(t *testing.T) string {
124125
registry := metrics.NewRegistry()
126+
=======
127+
// startMockES starts a MockES on a random port using httptest.NewServer.
128+
// It registers a cleanup function to close the server when the test finishes.
129+
// The server will respond with the passed error probabilities. If they add
130+
// up to zero, all requests are a success.
131+
func startMockES(t *testing.T, percentDuplicate, percentTooMany, percentNonIndex, percentTooLarge uint) string {
132+
>>>>>>> acf109882 (Add conditions to copy_fields processor (#6730))
125133
uid := uuid.Must(uuid.NewV4())
126134
clusterUUID := uuid.Must(uuid.NewV4()).String()
127135

128136
mux := http.NewServeMux()
129137
mux.Handle("/", mockes.NewAPIHandler(
130138
uid,
131139
clusterUUID,
140+
<<<<<<< HEAD
132141
registry,
133142
time.Now().Add(time.Hour), 0, 0, 0, 100, 0))
143+
=======
144+
nil,
145+
time.Now().Add(time.Hour),
146+
0,
147+
percentDuplicate, percentTooMany, percentNonIndex, percentTooLarge))
148+
>>>>>>> acf109882 (Add conditions to copy_fields processor (#6730))
134149

135150
s := httptest.NewServer(mux)
136151
t.Cleanup(s.Close)

0 commit comments

Comments
 (0)