Skip to content

NETOBSERV-2186: Set sampling in flows #931

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/api/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package api

// TransformFilter is the configuration of the filter transform stage.
type TransformFilter struct {
	// Rules is the list of filter rules.
	Rules []TransformFilterRule `yaml:"rules,omitempty" json:"rules,omitempty" doc:"list of filter rules, each includes:"`
	// SamplingField is the name of the field to set on flows when sampling is
	// used. If the field already exists in a flow, its value is multiplied with
	// the new sampling.
	SamplingField string `yaml:"samplingField,omitempty" json:"samplingField,omitempty" doc:"sampling field name to be set when sampling is used; if the field already exists in flows, its value is multiplied with the new sampling"`
}

func (tf *TransformFilter) Preprocess() {
Expand Down
45 changes: 36 additions & 9 deletions pkg/pipeline/transform/transform_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ var (
)

// Filter is the filter transformer built by NewTransformFilter.
type Filter struct {
	// Rules holds the configured filter rules.
	Rules []api.TransformFilterRule
	// KeepRules holds the predicate rules that Transform evaluates through
	// applyKeepPredicate.
	KeepRules []predicateRule
	// SamplingField is the name of the entry field in which the applied sampling
	// is stored; it is passed to applyKeepPredicate, which skips storing when it
	// is empty.
	SamplingField string
}

type predicateRule struct {
Expand All @@ -56,7 +57,7 @@ func (f *Filter) Transform(entry config.GenericMap) (config.GenericMap, bool) {
if len(f.KeepRules) > 0 {
keep := false
for _, r := range f.KeepRules {
if applyPredicate(outputEntry, r) {
if applyKeepPredicate(outputEntry, r, f.SamplingField) {
keep = true
break
}
Expand Down Expand Up @@ -182,11 +183,21 @@ func isRemoveEntrySatisfied(entry config.GenericMap, rules []*api.RemoveEntryRul
return true
}

func applyPredicate(entry config.GenericMap, rule predicateRule) bool {
if !rollSampling(rule.sampling) {
return false
// applyKeepPredicate reports whether entry must be kept according to rule.
//
// The rule's predicate is evaluated first, exactly once; an entry that does not
// match is never kept. For a matching entry, a rule without sampling
// (rule.sampling == 0) always keeps it. Otherwise rollSampling decides: a
// sampled-out entry is dropped, and a sampled-in entry is kept after its sampling
// value has been recorded under samplingField (skipped when samplingField is
// empty).
func applyKeepPredicate(entry config.GenericMap, rule predicateRule, samplingField string) bool {
	if !rule.predicate(entry) {
		return false // predicate false
	}
	if rule.sampling == 0 {
		return true // predicate true / no sampling
	}
	// Roll only after the predicate matched, so non-matching entries never
	// consume a random draw.
	if !rollSampling(rule.sampling) {
		return false // sampled-out
	}
	if samplingField != "" {
		storeSampling(entry, int(rule.sampling), samplingField)
	}
	return true // predicate true and sampled-in
}

func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
Expand All @@ -202,12 +213,27 @@ func rollSampling(value uint16) bool {
return value == 0 || (rndgen.Intn(int(value)) == 0)
}

// storeSampling records the sampling value applied to entry under
// entry[samplingField].
//
// When the field is absent, it is set to value. When it is present and
// convertible to an int, it is replaced by the existing value multiplied by
// value, an existing 0 being treated as 1. When it is present but cannot be
// converted, the entry is left untouched.
func storeSampling(entry config.GenericMap, value int, samplingField string) {
	existing, present := entry[samplingField]
	if !present {
		entry[samplingField] = value
		return
	}
	previous, err := utils.ConvertToInt(existing)
	if err != nil {
		// Unconvertible existing value: keep it as is.
		return
	}
	if previous == 0 {
		previous = 1
	}
	entry[samplingField] = previous * value
}

// NewTransformFilter creates a new filter transform.
func NewTransformFilter(params config.StageParam) (Transformer, error) {
tlog.Debugf("entering NewTransformFilter")
keepRules := []predicateRule{}
rules := []api.TransformFilterRule{}
var samplingField string
if params.Transform != nil && params.Transform.Filter != nil {
samplingField = params.Transform.Filter.SamplingField
params.Transform.Filter.Preprocess()
for i := range params.Transform.Filter.Rules {
baseRules := &params.Transform.Filter.Rules[i]
Expand All @@ -226,8 +252,9 @@ func NewTransformFilter(params config.StageParam) (Transformer, error) {
}
}
transformFilter := &Filter{
Rules: rules,
KeepRules: keepRules,
Rules: rules,
KeepRules: keepRules,
SamplingField: samplingField,
}
return transformFilter, nil
}
23 changes: 23 additions & 0 deletions pkg/pipeline/transform/transform_filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,7 @@ func Test_Transform_KeepEntry(t *testing.T) {

func Test_Transform_KeepEntrySampling(t *testing.T) {
newFilter := api.TransformFilter{
SamplingField: "sampling",
Rules: []api.TransformFilterRule{
{
Type: api.KeepEntryQuery,
Expand All @@ -724,6 +725,11 @@ func Test_Transform_KeepEntrySampling(t *testing.T) {
Type: api.KeepEntryQuery,
KeepEntryQuery: `namespace="B"`,
},
{
Type: api.KeepEntryQuery,
KeepEntryQuery: `namespace="C"`,
KeepEntrySampling: 10,
},
},
}

Expand All @@ -738,23 +744,40 @@ func Test_Transform_KeepEntrySampling(t *testing.T) {
input = append(input, config.GenericMap{
"namespace": "B",
})
input = append(input, config.GenericMap{
"namespace": "C",
"sampling": 50, // First-pass sampling already exists
})
}

countA := 0
countB := 0
countC := 0

for _, flow := range input {
if out, ok := tf.Transform(flow); ok {
switch out["namespace"] {
case "A":
countA++
sampling, ok := out["sampling"]
assert.True(t, ok)
assert.Equal(t, 10, sampling)
case "B":
countB++
_, ok := out["sampling"]
assert.False(t, ok)
case "C":
countC++
sampling, ok := out["sampling"]
assert.True(t, ok)
assert.Equal(t, 500, sampling)
}
}
}

assert.Less(t, countA, 300)
assert.Greater(t, countA, 30)
assert.Equal(t, countB, 1000)
assert.Less(t, countC, 300)
assert.Greater(t, countC, 30)
}