Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.16](backport #42442) x-pack/filebeat/input/httpjson: add metrics for number of events and pages published #42588

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,61 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Update CEL mito extensions to v1.12.2. {pull}39755[39755]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Relax constraint on Base DN in entity analytics Active Directory provider. {pull}40054[40054]
- Implement Elastic Agent status and health reporting for Netflow Filebeat input. {pull}40080[40080]
- Enhance input state reporting for CEL evaluations that return a single error object in events. {pull}40083[40083]
- Allow absent credentials when using GCS with Application Default Credentials. {issue}39977[39977] {pull}40072[40072]
- Add SSL and username support for Redis input, now the input includes support for Redis 6.0+. {pull}40111[40111]
- Add scaling up support for Netflow input. {issue}37761[37761] {pull}40122[40122]
- Update CEL mito extensions to v1.15.0. {pull}40294[40294]
- Allow cross-region bucket configuration in s3 input. {issue}22161[22161] {pull}40309[40309]
- Improve logging in Okta Entity Analytics provider. {issue}40106[40106] {pull}40347[40347]
- Document `winlog` input. {issue}40074[40074] {pull}40462[40462]
- Added retry logic to websocket connections in the streaming input. {issue}40271[40271] {pull}40601[40601]
- Disable event normalization for netflow input {pull}40635[40635]
- Allow attribute selection in the Active Directory entity analytics provider. {issue}40482[40482] {pull}40662[40662]
- Improve error quality when CEL program does not correctly return an events array. {pull}40580[40580]
- Added support for Microsoft Entra ID RBAC authentication. {issue}40434[40434] {pull}40879[40879]
- Add `use_kubeadm` config option for filebeat (both filbeat.input and autodiscovery) in order to toggle kubeadm-config api requests {pull}40301[40301]
- Make HTTP library function inclusion non-conditional in CEL input. {pull}40912[40912]
- Add support for Crowdstrike streaming API to the streaming input. {issue}40264[40264] {pull}40838[40838]
- Add support to CEL for reading host environment variables. {issue}40762[40762] {pull}40779[40779]
- Add CSV decoder to awss3 input. {pull}40896[40896]
- Change request trace logging to include headers instead of complete request. {pull}41072[41072]
- Improved GCS input documentation. {pull}41143[41143]
- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978]
- Add CSV decoding capacity to gcs input {pull}40979[40979]
- Add support to source AWS cloudwatch logs from linked accounts. {pull}41188[41188]
- Jounrald input now supports filtering by facilities {pull}41061[41061]
- Add support to include AWS cloudwatch linked accounts when using log_group_name_prefix to define log group names. {pull}41206[41206]
- Improved Azure Blob Storage input documentation. {pull}41252[41252]
- Make ETW input GA. {pull}41389[41389]
- Added input metrics to GCS input. {issue}36640[36640] {pull}41505[41505]
- Add support for Okta entity analytics provider to collect role and factor data for users. {pull}41460[41460]
- Add support for Journald in the System module. {pull}41555[41555]
- Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005]
- Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Update CEL mito extensions to v1.16.0. {pull}41727[41727]
- Filebeat's registry is now added to the Elastic-Agent diagnostics bundle {issue}33238[33238] {pull}41795[41795]
- Add `unifiedlogs` input for MacOS. {pull}41791[41791]
- Add evaluation state dump debugging option to CEL input. {pull}41335[41335]
- Added support for retry configuration in GCS input. {issue}11580[11580] {pull}41862[41862]
- Improve S3 polling mode states registry when using list prefix option. {pull}41869[41869]
- Add support for SSL and Proxy configurations for websoket type in streaming input. {pull}41934[41934]
- AWS S3 input registry cleanup for untracked s3 objects. {pull}41694[41694]
- The environment variable `BEATS_AZURE_EVENTHUB_INPUT_TRACING_ENABLED: true` enables internal logs tracer for the azure-eventhub input. {issue}41931[41931] {pull}41932[41932]
- The Filestream input now uses the `fingerprint` file identity by default. The state from files are automatically migrated if the previous file identity was `native` (the default) or `path`. If the `file_identity` is explicitly set, there is no change in behaviour. {issue}40197[40197] {pull}41762[41762]
- Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012]
- Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]
- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804]
- Journald input now can report its status to Elastic-Agent {issue}39791[39791] {pull}42462[42462]
- Publish events progressively in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42567[42567]
- The journald input is now generally available. {pull}42107[42107]
- Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442]

*Auditbeat*

Expand Down
2 changes: 2 additions & 0 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -1622,6 +1622,8 @@ observe the activity of the input.
[options="header"]
|=======
| Metric | Description
| `events_published_total` | Total number of events published.
| `pages_published_total` | Total number of pages of event published.
| `http_request_total` | Total number of processed requests.
| `http_request_errors_total` | Total number of request errors.
| `http_request_delete_total` | Total number of `DELETE` requests.
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func run(ctx v2.Context, cfg config, pub inputcursor.Publisher, crsr *inputcurso
}
pagination := newPagination(cfg, client, log)
responseProcessor := newResponseProcessor(cfg, pagination, xmlDetails, metrics, log)
requester := newRequester(client, requestFactory, responseProcessor, log)
requester := newRequester(client, requestFactory, responseProcessor, metrics, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(cfg.Cursor, log)
Expand Down
12 changes: 12 additions & 0 deletions x-pack/filebeat/input/httpjson/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ type inputMetrics struct {
intervalPages metrics.Sample // histogram of pages per interval
intervals *monitoring.Uint // total number of intervals executed
intervalErrs *monitoring.Uint // total number of interval errors
eventsPublished *monitoring.Uint // number of events published
pagesPublished *monitoring.Uint // number of pages of event published
}

func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
Expand All @@ -29,6 +31,8 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
out := &inputMetrics{
intervals: monitoring.NewUint(reg, "httpjson_interval_total"),
intervalErrs: monitoring.NewUint(reg, "httpjson_interval_errors_total"),
eventsPublished: monitoring.NewUint(reg, "events_published_total"),
pagesPublished: monitoring.NewUint(reg, "pages_published_total"),
intervalExecutionTime: metrics.NewUniformSample(1024),
intervalPageExecutionTime: metrics.NewUniformSample(1024),
intervalPages: metrics.NewUniformSample(1024),
Expand All @@ -44,6 +48,13 @@ func newInputMetrics(reg *monitoring.Registry) *inputMetrics {
return out
}

func (m *inputMetrics) addEventsPublished(n uint64) {
if m == nil {
return
}
m.eventsPublished.Add(n)
}

func (m *inputMetrics) updateIntervalMetrics(err error, t time.Time) {
if m == nil {
return
Expand All @@ -59,6 +70,7 @@ func (m *inputMetrics) updatePageExecutionTime(t time.Time) {
if m == nil {
return
}
m.pagesPublished.Add(1)
m.intervalPageExecutionTime.Update(time.Since(t).Nanoseconds())
}

Expand Down
24 changes: 14 additions & 10 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
p := newPublisher(trCtx, publisher, true, r.log)
p := newPublisher(trCtx, publisher, true, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
Expand Down Expand Up @@ -119,7 +119,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
return err
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
p := newPublisher(trCtx, publisher, false, r.log)
p := newPublisher(trCtx, publisher, false, r.metrics, r.log)
r.responseProcessors[i].startProcessing(ctx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
Expand Down Expand Up @@ -189,7 +189,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
resps = intermediateResps
}

p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
if rf.isChain {
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
} else {
Expand Down Expand Up @@ -474,14 +474,16 @@ type requester struct {
client *httpClient
requestFactories []*requestFactory
responseProcessors []*responseProcessor
metrics *inputMetrics
log *logp.Logger
}

func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, log *logp.Logger) *requester {
func newRequester(client *httpClient, reqs []*requestFactory, resps []*responseProcessor, metrics *inputMetrics, log *logp.Logger) *requester {
return &requester{
client: client,
requestFactories: reqs,
responseProcessors: resps,
metrics: metrics,
log: log,
}
}
Expand Down Expand Up @@ -716,7 +718,7 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra
}
resps = intermediateResps
}
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.metrics, r.log)
rf.chainResponseProcessor.startProcessing(ctx, chainTrCtx, resps, true, p)
n += p.eventCount()
}
Expand Down Expand Up @@ -752,13 +754,14 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {

// publisher is an event publication handler.
type publisher struct {
trCtx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
trCtx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
metrics *inputMetrics
}

func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, metrics *inputMetrics, log *logp.Logger) *publisher {
if !publish {
pub = nil
}
Expand Down Expand Up @@ -789,6 +792,7 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) {
p.trCtx.updateLastEvent(msg)
p.trCtx.updateCursor()

p.metrics.addEventsPublished(1)
p.n++
}

Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestCtxAfterDoRequest(t *testing.T) {
pagination := newPagination(config, client, log)
responseProcessor := newResponseProcessor(config, pagination, nil, nil, log)

requester := newRequester(client, requestFactory, responseProcessor, log)
requester := newRequester(client, requestFactory, responseProcessor, nil, log)

trCtx := emptyTransformContext()
trCtx.cursor = newCursor(config.Cursor, log)
Expand Down
Loading