Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,30 @@ When a group reaches its limit and as long as it is not reset, a warning
message with the current log rate of the group is emitted repeatedly. This is
the delay between every repetition.

#### ignore

Default: `none`

Define which records you want to ignore, you should specify key and regex to filter by.

Example:

```
<ignore>
key app.version
regex /^(2|3)$/
</ignore>
```

A dot indicates a key within a sub-object. As an example, in the following log,
the group key resolve to "2":
```
{"level": "error", "msg": "plugin test", "app": { "version": "2" } }
```

Will not take into throttling bucket calculations records that has version 2 or 3,
They will just pass-through.

## License

Apache License, Version 2.0
Expand Down
40 changes: 36 additions & 4 deletions lib/fluent/plugin/filter_throttle.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ class ThrottleFilter < Filter
This is the delay between every repetition.
DESC
config_param :group_warning_delay_s, :integer, :default => 10

desc <<~DESC
Defines records which should be excluded from the throttling counters
DESC
config_section :ignore, param_name: :ignores, multi: true do
desc "The field name to which the regular expression is applied"
config_param :key do |value|
value.split(".")
end
desc "The regular expression"
config_param :regex do |value|
if value.start_with?("/") && value.end_with?("/")
Regexp.compile(value[1..-2])
else
Regexp.compile(value)
end
end
end


Group = Struct.new(
:rate_count,
Expand Down Expand Up @@ -80,9 +99,18 @@ def shutdown
end

def filter(tag, time, record)
unless @ignores.empty?
@ignores.each { |ignore|
keysValue = extract_value_from_key_path(ignore.key, record)
if keysValue != nil && ignore.regex.match(keysValue.to_s)
return record
end
}
end

now = Time.now
rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record
group = extract_group(record)
group = extract_value_from_key_paths(@group_key_paths, record)
counter = (@counters[group] ||= Group.new(0, now, 0, 0, now, nil))
counter.rate_count += 1

Expand Down Expand Up @@ -133,12 +161,16 @@ def filter(tag, time, record)

private

def extract_group(record)
@group_key_paths.map do |key_path|
record.dig(*key_path) || record.dig(*key_path.map(&:to_sym))
def extract_value_from_key_paths(paths, record)
paths.map do |key_path|
extract_value_from_key_path(key_path, record)
end
end

def extract_value_from_key_path(path, record)
record.dig(*path) || record.dig(*path.map(&:to_sym))
end

def log_rate_limit_exceeded(now, group, counter)
emit = counter.last_warning == nil ? true \
: (now - counter.last_warning) >= @group_warning_delay_s
Expand Down
87 changes: 87 additions & 0 deletions test/fluent/plugin/filter_throttle_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,93 @@ def create_driver(conf='')
assert_equal records_expected, driver.filtered_records.size
assert driver.logs.any? { |log| log.include?('rate exceeded') }
assert driver.logs.any? { |log| log.include?('rate back down') }

end

it 'does not throttle when log includes the key to ignore' do
driver = create_driver <<~CONF
group_key "group"
group_bucket_period_s 1
group_bucket_limit 15
<ignore>
key level
regex /^([Ii]nfo|[Ii]nformation|[Dd]ebug)$/
</ignore>
CONF

driver.run(default_tag: "test") do
driver.feed([[event_time, {"msg": "test lower cased i", "level": "info", "group": "a"}]] * 10)
driver.feed([[event_time, {"msg": "test capital I", "level": "Info", "group": "b"}]] * 10)
driver.feed([[event_time, {"msg": "test lower cased i", "level": "information", "group": "b"}]] * 10)
driver.feed([[event_time, {"msg": "test capital I", "level": "Information", "group": "b"}]] * 10)
driver.feed([[event_time, {"msg": "test", "level": "error", "group": "a"}]] * 20)
driver.feed([[event_time, {"msg": "test", "level": "error", "group": "b"}]] * 20)
end

assert_equal(70, driver.filtered_records.compact.length) # compact remove nils
end

it 'does not throttle when log includes the nested key to ignore' do
driver = create_driver <<~CONF
group_key "group"
group_bucket_period_s 1
group_bucket_limit 15
<ignore>
key app.version
regex /^(2|3)$/
</ignore>
CONF

driver.run(default_tag: "test") do
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20)
end

assert_equal(50, driver.filtered_records.compact.length) # compact remove nils
end

it 'does not throttle when nested key to ignore does not exists' do
driver = create_driver <<~CONF
group_key "group"
group_bucket_period_s 1
group_bucket_limit 15
<ignore>
key app.author
regex /^(john|doe)$/
</ignore>
CONF

driver.run(default_tag: "test") do
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20)
end

assert_equal(30, driver.filtered_records.compact.length) # compact remove nils
end

it 'does not throttle when key to ignore does not exists' do
driver = create_driver <<~CONF
group_key "group"
group_bucket_period_s 1
group_bucket_limit 15
<ignore>
key testKey
regex /^(test|test2)$/
</ignore>
CONF

driver.run(default_tag: "test") do
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 2}, "group": "a"}]] * 10)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 3}, "group": "b"}]] * 10)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 4}, "group": "a"}]] * 20)
driver.feed([[event_time, {"msg": "test lower cased i", "app": {"version": 5}, "group": "b"}]] * 20)
end

assert_equal(30, driver.filtered_records.compact.length) # compact remove nils
end
end

Expand Down