Skip to content

Commit cc23366

Browse files
authored
feat: filter ordinals (fluent#1386)
* feat: filter ordinals Implements fluent#1384 Signed-off-by: Zoltán Reegn <[email protected]> * Fix tests Signed-off-by: Zoltán Reegn <[email protected]> --------- Signed-off-by: Zoltán Reegn <[email protected]>
1 parent 92df045 commit cc23366

13 files changed

+288
-110
lines changed

apis/fluentbit/v1alpha2/clusterfilter_types.go

+18-45
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ type FilterSpec struct {
4444
LogLevel string `json:"logLevel,omitempty"`
4545
// A set of filter plugins in order.
4646
FilterItems []FilterItem `json:"filters,omitempty"`
47+
// An ordinal to influence filter ordering
48+
Ordinal int32 `json:"ordinal,omitempty"`
4749
}
4850

4951
type FilterItem struct {
@@ -101,17 +103,25 @@ type ClusterFilterList struct {
101103

102104
// +kubebuilder:object:generate:=false
103105

104-
// FilterByName implements sort.Interface for []ClusterFilter based on the Name field.
105-
type FilterByName []ClusterFilter
106-
107-
func (a FilterByName) Len() int { return len(a) }
108-
func (a FilterByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
109-
func (a FilterByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
106+
// FilterByOrdinalAndName implements sort.Interface for []ClusterFilter based on the Ordinal and Name field.
107+
type FilterByOrdinalAndName []ClusterFilter
108+
109+
func (a FilterByOrdinalAndName) Len() int { return len(a) }
110+
func (a FilterByOrdinalAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
111+
func (a FilterByOrdinalAndName) Less(i, j int) bool {
112+
if a[i].Spec.Ordinal < a[j].Spec.Ordinal {
113+
return true
114+
} else if a[i].Spec.Ordinal == a[j].Spec.Ordinal {
115+
return a[i].Name < a[j].Name
116+
} else {
117+
return false
118+
}
119+
}
110120

111121
func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
112122
var buf bytes.Buffer
113123

114-
sort.Sort(FilterByName(list.Items))
124+
sort.Sort(FilterByOrdinalAndName(list.Items))
115125

116126
for _, item := range list.Items {
117127
merge := func(p plugins.Plugin) error {
@@ -156,7 +166,7 @@ func (list ClusterFilterList) Load(sl plugins.SecretLoader) (string, error) {
156166
func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
157167
var buf bytes.Buffer
158168

159-
sort.Sort(FilterByName(list.Items))
169+
sort.Sort(FilterByOrdinalAndName(list.Items))
160170
if len(list.Items) == 0 {
161171
return "", nil
162172
}
@@ -202,43 +212,6 @@ func (list ClusterFilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (st
202212
return buf.String(), nil
203213
}
204214

205-
func (clusterFilter ClusterFilter) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
206-
var buf bytes.Buffer
207-
padding := utils.YamlIndent(depth + 2)
208-
merge := func(p plugins.Plugin) error {
209-
if p == nil || reflect.ValueOf(p).IsNil() {
210-
return nil
211-
}
212-
213-
if p.Name() != "" {
214-
buf.WriteString(fmt.Sprintf("%s- name: %s\n", utils.YamlIndent(depth+1), p.Name()))
215-
}
216-
if clusterFilter.Spec.LogLevel != "" {
217-
buf.WriteString(fmt.Sprintf("%slog_level: %s\n", padding, clusterFilter.Spec.LogLevel))
218-
}
219-
if clusterFilter.Spec.Match != "" {
220-
buf.WriteString(fmt.Sprintf("%smatch: \"%s\"\n", padding, clusterFilter.Spec.Match))
221-
}
222-
if clusterFilter.Spec.MatchRegex != "" {
223-
buf.WriteString(fmt.Sprintf("%smatch_regex: %s\n", padding, clusterFilter.Spec.MatchRegex))
224-
}
225-
kvs, err := p.Params(sl)
226-
if err != nil {
227-
return err
228-
}
229-
buf.WriteString(kvs.YamlString(depth + 2))
230-
return nil
231-
}
232-
for _, elem := range clusterFilter.Spec.FilterItems {
233-
for i := 0; i < reflect.ValueOf(elem).NumField(); i++ {
234-
p, _ := reflect.ValueOf(elem).Field(i).Interface().(plugins.Plugin)
235-
if err := merge(p); err != nil {
236-
return "", err
237-
}
238-
}
239-
}
240-
return buf.String(), nil
241-
}
242215
func init() {
243216
SchemeBuilder.Register(&ClusterFilter{}, &ClusterFilterList{})
244217
}

apis/fluentbit/v1alpha2/clusterfilter_types_test.go

+111-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
func TestClusterFilterList_Load(t *testing.T) {
13-
var filtersExpected = `[Filter]
13+
filtersExpected := `[Filter]
1414
Name modify
1515
Match logs.foo.bar
1616
Condition Key_value_equals kve0 kvev0
@@ -185,8 +185,115 @@ func TestClusterFilterList_Load(t *testing.T) {
185185
}
186186
}
187187

188+
func TestClusterFilterList_Load_With_Ordinals(t *testing.T) {
189+
filtersExpected := `[Filter]
190+
Name grep
191+
Match *
192+
Alias first
193+
Regex ^.*$
194+
[Filter]
195+
Name grep
196+
Match *
197+
Alias second
198+
Regex ^.*$
199+
[Filter]
200+
Name grep
201+
Match *
202+
Alias third
203+
Regex ^.*$
204+
`
205+
206+
g := NewGomegaWithT(t)
207+
sl := plugins.NewSecretLoader(nil, "testnamespace")
208+
209+
filterObj1 := &ClusterFilter{
210+
TypeMeta: metav1.TypeMeta{
211+
APIVersion: "fluentbit.fluent.io/v1alpha2",
212+
Kind: "ClusterFilter",
213+
},
214+
ObjectMeta: metav1.ObjectMeta{
215+
Name: "first",
216+
},
217+
Spec: FilterSpec{
218+
Match: "*",
219+
FilterItems: []FilterItem{
220+
{
221+
Grep: &filter.Grep{
222+
CommonParams: plugins.CommonParams{
223+
Alias: "second",
224+
},
225+
Regex: "^.*$",
226+
},
227+
},
228+
},
229+
},
230+
}
231+
232+
filterObj2 := &ClusterFilter{
233+
TypeMeta: metav1.TypeMeta{
234+
APIVersion: "fluentbit.fluent.io/v1alpha2",
235+
Kind: "ClusterFilter",
236+
},
237+
ObjectMeta: metav1.ObjectMeta{
238+
Name: "second",
239+
},
240+
Spec: FilterSpec{
241+
Ordinal: 10,
242+
Match: "*",
243+
FilterItems: []FilterItem{
244+
{
245+
Grep: &filter.Grep{
246+
CommonParams: plugins.CommonParams{
247+
Alias: "third",
248+
},
249+
Regex: "^.*$",
250+
},
251+
},
252+
},
253+
},
254+
}
255+
256+
filterObj3 := &ClusterFilter{
257+
TypeMeta: metav1.TypeMeta{
258+
APIVersion: "fluentbit.fluent.io/v1alpha2",
259+
Kind: "ClusterFilter",
260+
},
261+
ObjectMeta: metav1.ObjectMeta{
262+
Name: "third",
263+
},
264+
Spec: FilterSpec{
265+
Ordinal: -10,
266+
Match: "*",
267+
FilterItems: []FilterItem{
268+
{
269+
Grep: &filter.Grep{
270+
CommonParams: plugins.CommonParams{
271+
Alias: "first",
272+
},
273+
Regex: "^.*$",
274+
},
275+
},
276+
},
277+
},
278+
}
279+
280+
filters := ClusterFilterList{
281+
Items: []ClusterFilter{*filterObj1, *filterObj2, *filterObj3},
282+
}
283+
284+
i := 0
285+
for i < 5 {
286+
clusterFilters, err := filters.Load(sl)
287+
g.Expect(err).NotTo(HaveOccurred())
288+
289+
g.Expect(clusterFilters).To(Equal(filtersExpected))
290+
291+
i++
292+
}
293+
}
294+
188295
func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
189-
var filtersExpected = `[Filter]
296+
filtersExpected := `[Filter]
190297
Name record_modifier
191298
Match logs.foo.bar
192299
Record hostname ${HOSTNAME}
@@ -260,7 +367,7 @@ func TestClusterFilter_RecordModifier_Generated(t *testing.T) {
260367
}
261368

262369
func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
263-
var filtersExpected = `filters:
370+
filtersExpected := `filters:
264371
- name: modify
265372
match: "logs.foo.bar"
266373
condition:
@@ -438,7 +545,7 @@ func TestClusterFilterList_Load_As_Yaml(t *testing.T) {
438545
}
439546

440547
func TestClusterFilter_RecordModifier_Generated_Load_As_Yaml(t *testing.T) {
441-
var filtersExpected = `filters:
548+
filtersExpected := `filters:
442549
- name: record_modifier
443550
match: "logs.foo.bar"
444551
record:

apis/fluentbit/v1alpha2/clusterfluentbitconfig_types.go

-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"bytes"
2121
"crypto/md5"
2222
"fmt"
23-
2423
"sort"
2524

2625
"github.com/fluent/fluent-operator/v3/apis/fluentbit/v1alpha2/plugins"

apis/fluentbit/v1alpha2/filter_types.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,24 @@ type FilterList struct {
5151
Items []Filter `json:"items"`
5252
}
5353

54-
type NSFilterByName []Filter
55-
56-
func (a NSFilterByName) Len() int { return len(a) }
57-
func (a NSFilterByName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
58-
func (a NSFilterByName) Less(i, j int) bool { return a[i].Name < a[j].Name }
54+
type NSFilterByOrdinalAndName []Filter
55+
56+
func (a NSFilterByOrdinalAndName) Len() int { return len(a) }
57+
func (a NSFilterByOrdinalAndName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
58+
func (a NSFilterByOrdinalAndName) Less(i, j int) bool {
59+
if a[i].Spec.Ordinal < a[j].Spec.Ordinal {
60+
return true
61+
} else if a[i].Spec.Ordinal == a[j].Spec.Ordinal {
62+
return a[i].Name < a[j].Name
63+
} else {
64+
return false
65+
}
66+
}
5967

6068
func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
6169
var buf bytes.Buffer
6270

63-
sort.Sort(NSFilterByName(list.Items))
71+
sort.Sort(NSFilterByOrdinalAndName(list.Items))
6472

6573
for _, item := range list.Items {
6674
merge := func(p plugins.Plugin) error {
@@ -108,7 +116,7 @@ func (list FilterList) Load(sl plugins.SecretLoader) (string, error) {
108116
func (list FilterList) LoadAsYaml(sl plugins.SecretLoader, depth int) (string, error) {
109117
var buf bytes.Buffer
110118

111-
sort.Sort(NSFilterByName(list.Items))
119+
sort.Sort(NSFilterByOrdinalAndName(list.Items))
112120
padding := utils.YamlIndent(depth + 2)
113121
for _, item := range list.Items {
114122
merge := func(p plugins.Plugin) error {

0 commit comments

Comments
 (0)