Skip to content

Commit

Permalink
inlet/flow: update to latest version of GoFlow2
Browse files Browse the repository at this point in the history
There is no change in performance:

```
goos: linux
goarch: amd64
pkg: akvorado/inlet/flow
cpu: AMD Ryzen 5 5600X 6-Core Processor
BenchmarkDecodeEncodeNetflow/with_encoding-12             155505              8059 ns/op            8178 B/op        130 allocs/op
BenchmarkDecodeEncodeNetflow/without_encoding-12          147974              7554 ns/op            8178 B/op        130 allocs/op
BenchmarkDecodeEncodeSflow/with_encoding-12               126746              9463 ns/op            7200 B/op         90 allocs/op
BenchmarkDecodeEncodeSflow/without_encoding-12            140703              8686 ns/op            7200 B/op         90 allocs/op
```
  • Loading branch information
vincentbernat committed Dec 12, 2023
1 parent 4f043c5 commit 71b20f3
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 85 deletions.
2 changes: 1 addition & 1 deletion demoexporter/flows/nftemplates.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"akvorado/common/helpers"

"github.com/netsampler/goflow2/decoders/netflow"
"github.com/netsampler/goflow2/v2/decoders/netflow"
)

type flowFamilySettings struct {
Expand Down
2 changes: 1 addition & 1 deletion demoexporter/flows/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

package flows

import "github.com/netsampler/goflow2/decoders/netflow"
import "github.com/netsampler/goflow2/v2/decoders/netflow"

type nfv9Header struct {
Version uint16
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ require (
github.com/kylelemons/godebug v1.1.0
github.com/mattn/go-isatty v0.0.20
github.com/mitchellh/mapstructure v1.5.0
github.com/netsampler/goflow2 v1.1.1-0.20221008154147-57fad2e0c837
github.com/netsampler/goflow2/v2 v2.0.1-0.20231209195630-1a5d58e3cf2f
github.com/opencontainers/image-spec v1.1.0-rc5
github.com/oschwald/maxminddb-golang v1.12.0
github.com/osrg/gobgp/v3 v3.21.0
github.com/prometheus/client_golang v1.17.0
Expand Down Expand Up @@ -113,7 +114,6 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0-rc5 // indirect
github.com/paulmach/orb v0.10.0 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
Expand Down Expand Up @@ -154,4 +154,4 @@ require (
modernc.org/sqlite v1.23.1 // indirect
)

replace github.com/netsampler/goflow2 => github.com/vincentbernat/goflow2 v1.0.5-0.20231030050810-c2707ac87e7a
replace github.com/netsampler/goflow2/v2 => github.com/vincentbernat/goflow2/v2 v2.0.0-20231212205418-5bbb9b608ddf
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,8 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli/v2 v2.1.1/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/vincentbernat/goflow2 v1.0.5-0.20231030050810-c2707ac87e7a h1:7cU4DA2flQDqDUswK0cIJzbLPXzu+y/rGC8zdNpnuLo=
github.com/vincentbernat/goflow2 v1.0.5-0.20231030050810-c2707ac87e7a/go.mod h1:4UZsVGVAs//iMCptUHn3WNScztJeUhZH7kDW2+/vDdQ=
github.com/vincentbernat/goflow2/v2 v2.0.0-20231212205418-5bbb9b608ddf h1:22fCie695dIdfawLorVvBofR/gkL4/6ttNDMaIPKB1c=
github.com/vincentbernat/goflow2/v2 v2.0.0-20231212205418-5bbb9b608ddf/go.mod h1:ukZvnBygXBdj29ujoFNiuVZ5xUxJC5DbWLziR8bnvEE=
github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
Expand Down
36 changes: 19 additions & 17 deletions inlet/flow/decoder/netflow/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,30 @@ import (
"akvorado/common/schema"
"akvorado/inlet/flow/decoder"

"github.com/netsampler/goflow2/decoders/netflow"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/v2/decoders/netflow"
protoproducer "github.com/netsampler/goflow2/v2/producer/proto"
)

func (nd *Decoder) decode(msgDec interface{}, samplingRateSys producer.SamplingRateSystem) []*schema.FlowMessage {
func (nd *Decoder) decodeIPFIX(packet netflow.IPFIXPacket, samplingRateSys protoproducer.SamplingRateSystem) []*schema.FlowMessage {
dataFlowSet, _, _, optionsDataFlowSet := protoproducer.SplitIPFIXSets(packet)
obsDomainID := packet.ObservationDomainId
return nd.decodeCommon(obsDomainID, dataFlowSet, optionsDataFlowSet, samplingRateSys)
}

func (nd *Decoder) decodeNFv9(packet netflow.NFv9Packet, samplingRateSys protoproducer.SamplingRateSystem) []*schema.FlowMessage {
dataFlowSet, _, _, optionsDataFlowSet := protoproducer.SplitNetFlowSets(packet)
obsDomainID := packet.SourceId
return nd.decodeCommon(obsDomainID, dataFlowSet, optionsDataFlowSet, samplingRateSys)
}

func (nd *Decoder) decodeCommon(obsDomainID uint32, dataFlowSet []netflow.DataFlowSet, optionsDataFlowSet []netflow.OptionsDataFlowSet, samplingRateSys protoproducer.SamplingRateSystem) []*schema.FlowMessage {
flowMessageSet := []*schema.FlowMessage{}
var obsDomainID uint32
var dataFlowSet []netflow.DataFlowSet
var optionsDataFlowSet []netflow.OptionsDataFlowSet
switch msgDecConv := msgDec.(type) {
case netflow.NFv9Packet:
dataFlowSet, _, _, optionsDataFlowSet = producer.SplitNetFlowSets(msgDecConv)
obsDomainID = msgDecConv.SourceId
case netflow.IPFIXPacket:
dataFlowSet, _, _, optionsDataFlowSet = producer.SplitIPFIXSets(msgDecConv)
obsDomainID = msgDecConv.ObservationDomainId
default:
return nil
}

// Get sampling rate
samplingRate, found := producer.SearchNetFlowOptionDataSets(optionsDataFlowSet)
samplingRate, found, err := protoproducer.SearchNetFlowOptionDataSets(optionsDataFlowSet)
if err != nil {
return nil
}
if samplingRateSys != nil {
if found {
samplingRateSys.AddSamplingRate(10, obsDomainID, samplingRate)
Expand Down
59 changes: 35 additions & 24 deletions inlet/flow/decoder/netflow/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"strconv"
"sync"

"github.com/netsampler/goflow2/decoders/netflow"
"github.com/netsampler/goflow2/producer"
"github.com/netsampler/goflow2/v2/decoders/netflow"
protoproducer "github.com/netsampler/goflow2/v2/producer/proto"

"akvorado/common/reporter"
"akvorado/common/schema"
Expand All @@ -26,7 +26,7 @@ type Decoder struct {
// Templates and sampling systems
systemsLock sync.RWMutex
templates map[string]*templateSystem
sampling map[string]producer.SamplingRateSystem
sampling map[string]protoproducer.SamplingRateSystem

metrics struct {
errors *reporter.CounterVec
Expand All @@ -43,7 +43,7 @@ func New(r *reporter.Reporter, dependencies decoder.Dependencies) decoder.Decode
r: r,
d: dependencies,
templates: map[string]*templateSystem{},
sampling: map[string]producer.SamplingRateSystem{},
sampling: map[string]protoproducer.SamplingRateSystem{},
}

nd.metrics.errors = nd.r.CounterVec(
Expand Down Expand Up @@ -88,16 +88,15 @@ func New(r *reporter.Reporter, dependencies decoder.Dependencies) decoder.Decode
type templateSystem struct {
nd *Decoder
key string
templates *netflow.BasicTemplateSystem
templates netflow.NetFlowTemplateSystem
}

func (s *templateSystem) AddTemplate(version uint16, obsDomainID uint32, template interface{}) {
s.templates.AddTemplate(version, obsDomainID, template)
func (s *templateSystem) AddTemplate(version uint16, obsDomainID uint32, templateID uint16, template interface{}) error {
if err := s.templates.AddTemplate(version, obsDomainID, templateID, template); err != nil {
return nil
}

var (
templateID uint16
typeStr string
)
var typeStr string
switch templateIDConv := template.(type) {
case netflow.IPFIXOptionsTemplateRecord:
templateID = templateIDConv.TemplateId
Expand All @@ -117,12 +116,17 @@ func (s *templateSystem) AddTemplate(version uint16, obsDomainID uint32, templat
strconv.Itoa(int(templateID)),
typeStr,
).Inc()
return nil
}

func (s *templateSystem) GetTemplate(version uint16, obsDomainID uint32, templateID uint16) (interface{}, error) {
return s.templates.GetTemplate(version, obsDomainID, templateID)
}

func (s *templateSystem) RemoveTemplate(version uint16, obsDomainID uint32, templateID uint16) (interface{}, error) {
return s.templates.RemoveTemplate(version, obsDomainID, templateID)
}

// Decode decodes a Netflow payload.
func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
key := in.Source.String()
Expand All @@ -141,19 +145,22 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
nd.systemsLock.Unlock()
}
if !sok {
sampling = producer.CreateSamplingSystem()
sampling = protoproducer.CreateSamplingSystem()
nd.systemsLock.Lock()
nd.sampling[key] = sampling
nd.systemsLock.Unlock()
}

ts := uint64(in.TimeReceived.UTC().Unix())
buf := bytes.NewBuffer(in.Payload)
msgDec, err := netflow.DecodeMessage(buf, templates)
if err != nil {
var (
packetNFv9 netflow.NFv9Packet
packetIPFIX netflow.IPFIXPacket
)
if err := netflow.DecodeMessageVersion(buf, templates, &packetNFv9, &packetIPFIX); err != nil {
switch err.(type) {
case *netflow.ErrorTemplateNotFound:
nd.metrics.errors.WithLabelValues(key, "template not found").Inc()
case *netflow.DecoderError:
nd.metrics.errors.WithLabelValues(key, err.Error()).Inc()
default:
nd.metrics.errors.WithLabelValues(key, "error decoding").Inc()
}
Expand All @@ -166,14 +173,13 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
)

// Update some stats
switch msgDecConv := msgDec.(type) {
case netflow.IPFIXPacket:
version = "10"
flowSets = msgDecConv.FlowSets
case netflow.NFv9Packet:
if packetNFv9.Version == 9 {
version = "9"
flowSets = msgDecConv.FlowSets
default:
flowSets = packetNFv9.FlowSets
} else if packetIPFIX.Version == 10 {
version = "10"
flowSets = packetIPFIX.FlowSets
} else {
nd.metrics.stats.WithLabelValues(key, "unknown").
Inc()
return nil
Expand Down Expand Up @@ -209,7 +215,12 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
}
}

flowMessageSet := nd.decode(msgDec, sampling)
var flowMessageSet []*schema.FlowMessage
if packetNFv9.Version == 9 {
flowMessageSet = nd.decodeNFv9(packetNFv9, sampling)
} else if packetIPFIX.Version == 10 {
flowMessageSet = nd.decodeIPFIX(packetIPFIX, sampling)
}
exporterAddress, _ := netip.AddrFromSlice(in.Source.To16())
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
Expand Down
34 changes: 14 additions & 20 deletions inlet/flow/decoder/sflow/decode.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,11 @@ import (
"akvorado/common/schema"
"akvorado/inlet/flow/decoder"

"github.com/netsampler/goflow2/decoders/sflow"
"github.com/netsampler/goflow2/v2/decoders/sflow"
)

func (nd *Decoder) decode(msgDec interface{}) []*schema.FlowMessage {
func (nd *Decoder) decode(packet sflow.Packet) []*schema.FlowMessage {
flowMessageSet := []*schema.FlowMessage{}
switch msgDec.(type) {
case sflow.Packet:
default:
return nil
}
packet := msgDec.(sflow.Packet)

for _, flowSample := range packet.Samples {
var records []sflow.FlowRecord
Expand Down Expand Up @@ -87,21 +81,21 @@ func (nd *Decoder) decode(msgDec interface{}) []*schema.FlowMessage {
}
}
case sflow.SampledIPv4:
bf.SrcAddr = decoder.DecodeIP(recordData.Base.SrcIP)
bf.DstAddr = decoder.DecodeIP(recordData.Base.DstIP)
l3length = uint64(recordData.Base.Length)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Base.Protocol))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.Base.SrcPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.Base.DstPort))
bf.SrcAddr = decoder.DecodeIP(recordData.SrcIP)
bf.DstAddr = decoder.DecodeIP(recordData.DstIP)
l3length = uint64(recordData.Length)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Protocol))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.SrcPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.DstPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv4)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnIPTos, uint64(recordData.Tos))
case sflow.SampledIPv6:
bf.SrcAddr = decoder.DecodeIP(recordData.Base.SrcIP)
bf.DstAddr = decoder.DecodeIP(recordData.Base.DstIP)
l3length = uint64(recordData.Base.Length)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Base.Protocol))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.Base.SrcPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.Base.DstPort))
bf.SrcAddr = decoder.DecodeIP(recordData.SrcIP)
bf.DstAddr = decoder.DecodeIP(recordData.DstIP)
l3length = uint64(recordData.Length)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnProto, uint64(recordData.Protocol))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnSrcPort, uint64(recordData.SrcPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnDstPort, uint64(recordData.DstPort))
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnEType, helpers.ETypeIPv6)
nd.d.Schema.ProtobufAppendVarint(bf, schema.ColumnIPTos, uint64(recordData.Priority))
case sflow.SampledEthernet:
Expand Down
25 changes: 8 additions & 17 deletions inlet/flow/decoder/sflow/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"bytes"
"net"

"github.com/netsampler/goflow2/decoders/sflow"
"github.com/netsampler/goflow2/v2/decoders/sflow"

"akvorado/common/reporter"
"akvorado/common/schema"
Expand Down Expand Up @@ -85,30 +85,21 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
key := in.Source.String()

ts := uint64(in.TimeReceived.UTC().Unix())
msgDec, err := sflow.DecodeMessage(buf)
if err != nil {
var packet sflow.Packet
if err := sflow.DecodeMessageVersion(buf, &packet); err != nil {
switch err.(type) {
case *sflow.ErrorVersion:
nd.metrics.errors.WithLabelValues(key, "error version").Inc()
case *sflow.ErrorIPVersion:
nd.metrics.errors.WithLabelValues(key, "error ip version").Inc()
case *sflow.ErrorDataFormat:
nd.metrics.errors.WithLabelValues(key, "error data format").Inc()
case *sflow.DecoderError:
nd.metrics.errors.WithLabelValues(key, err.Error()).Inc()
default:
nd.metrics.errors.WithLabelValues(key, "error decoding").Inc()
}
return nil
}

// Update some stats
msgDecConv, ok := msgDec.(sflow.Packet)
if !ok {
nd.metrics.stats.WithLabelValues(key, "unknown", "unknwon").Inc()
return nil
}
agent := net.IP(msgDecConv.AgentIP).String()
agent := net.IP(packet.AgentIP).String()
version := "5"
samples := msgDecConv.Samples
samples := packet.Samples
nd.metrics.stats.WithLabelValues(key, agent, version).Inc()
for _, s := range samples {
switch sConv := s.(type) {
Expand All @@ -130,7 +121,7 @@ func (nd *Decoder) Decode(in decoder.RawFlow) []*schema.FlowMessage {
}
}

flowMessageSet := nd.decode(msgDec)
flowMessageSet := nd.decode(packet)
for _, fmsg := range flowMessageSet {
fmsg.TimeReceived = ts
}
Expand Down

0 comments on commit 71b20f3

Please sign in to comment.