@@ -86,7 +86,8 @@ type Flows struct {
8686 promoServer * http.Server
8787 sampleDecoder * ovnobserv.SampleDecoder
8888
89- metrics * metrics.Metrics
89+ metrics * metrics.Metrics
90+ rbSSLTracer * flow.RingBufTracer
9091}
9192
9293// ebpfFlowFetcher abstracts the interface of ebpf.FlowFetcher to allow dependency injection in tests
@@ -97,6 +98,7 @@ type ebpfFlowFetcher interface {
9798 LookupAndDeleteMap (* metrics.Metrics ) map [ebpf.BpfFlowId ]model.BpfFlowContent
9899 DeleteMapsStaleEntries (timeOut time.Duration )
99100 ReadRingBuf () (ringbuf.Record , error )
101+ ReadSSLRingBuf () (ringbuf.Record , error )
100102}
101103
102104// FlowsAgent instantiates a new agent, given a configuration.
@@ -175,6 +177,8 @@ func FlowsAgent(cfg *config.Agent) (*Flows, error) {
175177 BpfManBpfFSPath : cfg .BpfManBpfFSPath ,
176178 EnableIPsecTracker : cfg .EnableIPsecTracking ,
177179 FilterConfig : filterRules ,
180+ EnableSSL : cfg .EnableSSL ,
181+ OpenSSLPath : cfg .OpenSSLPath ,
178182 }
179183
180184 fetcher , err := tracer .NewFlowFetcher (ebpfConfig , m )
@@ -206,6 +210,10 @@ func flowsAgent(
206210
207211 mapTracer := flow .NewMapTracer (fetcher , cfg .CacheActiveTimeout , cfg .StaleEntriesEvictTimeout , m , s , cfg .EnableUDNMapping )
208212 rbTracer := flow .NewRingBufTracer (fetcher , mapTracer , cfg .CacheActiveTimeout , m )
213+ var rbSSLTracer * flow.RingBufTracer
214+ if cfg .EnableSSL {
215+ rbSSLTracer = flow .NewSSLRingBufTracer (fetcher , mapTracer , cfg .CacheActiveTimeout , m )
216+ }
209217 accounter := flow .NewAccounter (cfg .CacheMaxFlows , cfg .CacheActiveTimeout , time .Now , monotime .Now , m , s , cfg .EnableUDNMapping )
210218 limiter := flow .NewCapacityLimiter (m )
211219
@@ -222,6 +230,7 @@ func flowsAgent(
222230 informer : informer ,
223231 promoServer : promoServer ,
224232 metrics : m ,
233+ rbSSLTracer : rbSSLTracer ,
225234 }, nil
226235}
227236
@@ -392,6 +401,10 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo
392401 alog .Debug ("connecting flows processing graph" )
393402 mapTracer := node .AsStart (f .mapTracer .TraceLoop (ctx , f .cfg .ForceGC ))
394403 rbTracer := node .AsStart (f .rbTracer .TraceLoop (ctx ))
404+ var rbSSLTracer * node.Start [* model.RawRecord ]
405+ if f .cfg .EnableSSL {
406+ rbSSLTracer = node .AsStart (f .rbSSLTracer .TraceLoop (ctx ))
407+ }
395408
396409 accounter := node .AsMiddle (f .accounter .Account ,
397410 node .ChannelBufferLen (f .cfg .BuffersLength ))
@@ -408,6 +421,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo
408421 node .ChannelBufferLen (ebl ))
409422
410423 rbTracer .SendsTo (accounter )
424+ if rbSSLTracer != nil {
425+ rbSSLTracer .SendsTo (accounter )
426+ }
411427
412428 mapTracer .SendsTo (limiter )
413429 accounter .SendsTo (limiter )
@@ -416,6 +432,9 @@ func (f *Flows) buildAndStartPipeline(ctx context.Context) (*node.Terminal[[]*mo
416432 alog .Debug ("starting graph" )
417433 mapTracer .Start ()
418434 rbTracer .Start ()
435+ if rbSSLTracer != nil {
436+ rbSSLTracer .Start ()
437+ }
419438 return export , nil
420439}
421440
0 commit comments