Skip to content

Commit 0abb576

Browse files
committed
NETOBSERV-2186: Set sampling in flows
1 parent bd55b45 commit 0abb576

File tree

1 file changed

+24
-1
lines changed

1 file changed

+24
-1
lines changed

pkg/pipeline/transform/transform_filter.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,13 +190,22 @@ func applyPredicates(entry config.GenericMap, rule predicatesRule) bool {
190190
return false
191191
}
192192
}
193+
if rule.sampling > 0 {
194+
storeSampling(entry, int(rule.sampling))
195+
}
193196
return true
194197
}
195198

196199
func sample(entry config.GenericMap, rules []*api.SamplingCondition) bool {
197200
for _, r := range rules {
198201
if isRemoveEntrySatisfied(entry, r.Rules) {
199-
return rollSampling(r.Value)
202+
if rollSampling(r.Value) {
203+
if r.Value > 0 {
204+
storeSampling(entry, int(r.Value))
205+
}
206+
return true
207+
}
208+
return false
200209
}
201210
}
202211
return true
@@ -206,6 +215,20 @@ func rollSampling(value uint16) bool {
206215
return value == 0 || (rndgen.Intn(int(value)) == 0)
207216
}
208217

218+
func storeSampling(entry config.GenericMap, value int) {
219+
// TODO: make "Sampling" string configurable, and configured from the operator.
220+
if current, found := entry["Sampling"]; found {
221+
if cast, err := utils.ConvertToInt(current); err != nil {
222+
if cast == 0 {
223+
cast = 1
224+
}
225+
entry["Sampling"] = cast * value
226+
}
227+
} else {
228+
entry["Sampling"] = value
229+
}
230+
}
231+
209232
// NewTransformFilter create a new filter transform
210233
func NewTransformFilter(params config.StageParam) (Transformer, error) {
211234
tlog.Debugf("entering NewTransformFilter")

0 commit comments

Comments
 (0)