@@ -82,7 +82,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
82
82
83
83
if len (r .requestFactories ) == 1 {
84
84
finalResps = append (finalResps , httpResp )
85
- p := newPublisher (trCtx , publisher , true , r .log )
85
+ p := newPublisher (trCtx , publisher , true , r .metrics , r . log )
86
86
r .responseProcessors [i ].startProcessing (ctx , trCtx , finalResps , true , p )
87
87
n = p .eventCount ()
88
88
continue
@@ -119,7 +119,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
119
119
return err
120
120
}
121
121
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
122
- p := newPublisher (trCtx , publisher , false , r .log )
122
+ p := newPublisher (trCtx , publisher , false , r .metrics , r . log )
123
123
r .responseProcessors [i ].startProcessing (ctx , trCtx , finalResps , false , p )
124
124
n = p .eventCount ()
125
125
} else {
@@ -189,7 +189,7 @@ func (r *requester) doRequest(ctx context.Context, trCtx *transformContext, publ
189
189
resps = intermediateResps
190
190
}
191
191
192
- p := newPublisher (chainTrCtx , publisher , i < len (r .requestFactories ), r .log )
192
+ p := newPublisher (chainTrCtx , publisher , i < len (r .requestFactories ), r .metrics , r . log )
193
193
if rf .isChain {
194
194
rf .chainResponseProcessor .startProcessing (ctx , chainTrCtx , resps , true , p )
195
195
} else {
@@ -474,14 +474,16 @@ type requester struct {
474
474
client * httpClient
475
475
requestFactories []* requestFactory
476
476
responseProcessors []* responseProcessor
477
+ metrics * inputMetrics
477
478
log * logp.Logger
478
479
}
479
480
480
- func newRequester (client * httpClient , reqs []* requestFactory , resps []* responseProcessor , log * logp.Logger ) * requester {
481
+ func newRequester (client * httpClient , reqs []* requestFactory , resps []* responseProcessor , metrics * inputMetrics , log * logp.Logger ) * requester {
481
482
return & requester {
482
483
client : client ,
483
484
requestFactories : reqs ,
484
485
responseProcessors : resps ,
486
+ metrics : metrics ,
485
487
log : log ,
486
488
}
487
489
}
@@ -716,7 +718,7 @@ func (r *requester) processChainPaginationEvents(ctx context.Context, trCtx *tra
716
718
}
717
719
resps = intermediateResps
718
720
}
719
- p := newPublisher (chainTrCtx , publisher , i < len (r .requestFactories ), r .log )
721
+ p := newPublisher (chainTrCtx , publisher , i < len (r .requestFactories ), r .metrics , r . log )
720
722
rf .chainResponseProcessor .startProcessing (ctx , chainTrCtx , resps , true , p )
721
723
n += p .eventCount ()
722
724
}
@@ -752,13 +754,14 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
752
754
753
755
// publisher is an event publication handler.
754
756
type publisher struct {
755
- trCtx * transformContext
756
- pub inputcursor.Publisher
757
- n int
758
- log * logp.Logger
757
+ trCtx * transformContext
758
+ pub inputcursor.Publisher
759
+ n int
760
+ log * logp.Logger
761
+ metrics * inputMetrics
759
762
}
760
763
761
- func newPublisher (trCtx * transformContext , pub inputcursor.Publisher , publish bool , log * logp.Logger ) * publisher {
764
+ func newPublisher (trCtx * transformContext , pub inputcursor.Publisher , publish bool , metrics * inputMetrics , log * logp.Logger ) * publisher {
762
765
if ! publish {
763
766
pub = nil
764
767
}
@@ -789,6 +792,7 @@ func (p *publisher) handleEvent(_ context.Context, msg mapstr.M) {
789
792
p .trCtx .updateLastEvent (msg )
790
793
p .trCtx .updateCursor ()
791
794
795
+ p .metrics .addEventsPublished (1 )
792
796
p .n ++
793
797
}
794
798
0 commit comments