Commit 15d3eba

Fix: watched files performance with huge filesets (logstash-plugins#268)
Plugin tracks all discovered files in a map-like collection (`FileWatch::WatchedFilesCollection`). The collection was designed to re-sort all elements every time it gets updated, which does not scale with tens of thousands of watched files.

Starting up LS with 60k files in a watched directory, locally, takes **~ an hour before these changes**. **After these changes** it takes **~2 seconds** for the plugin to start processing files.

Since the collection is known to have a noticeable (memory) footprint, there are changes towards reducing intermediate garbage, which also contributed to the decision to move the collection to native.

Notable changes to tracking watched files with `FileWatch::WatchedFilesCollection`:

- predictable `log(size)` modification (and access) times
- implementation has clear locking semantics, previously operations weren't 100% atomic
- hopefully, cleaner API as the collection resembles a *WatchedFile -> String (path)* map
- the collection now needs an explicit update whenever a file changes (on re-stat); the previous implicit pick-up of changes might have been the reasoning behind re-sorts (could be decoupled further by making `WatchedFile` immutable but there's other state anyway)

---

Apart from the necessary changes to resolve the performance bottleneck, there's also a quick review of (trace) logging: it was annoying to see only partial traces even at debug/trace levels. A few unused methods and instance variables were removed for a better code reading experience.
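
The idea behind the collection change can be illustrated with a minimal Ruby sketch. This is not the plugin's native implementation: `SortedFilesSketch` and `WatchedFileStub` are hypothetical names, and the sketch assumes files are ordered by `modified_at` with the path as a tie-breaker. A binary search finds the position in `log(size)` steps and the caller re-positions an entry explicitly after a re-stat, instead of the whole collection being re-sorted on every update. (The array insert itself still shifts elements, so this only approximates a tree-backed map.)

```ruby
# Hypothetical stand-in for a watched file; only the fields the ordering needs.
WatchedFileStub = Struct.new(:path, :modified_at)

# Sketch of a collection that stays sorted without re-sorting on every change.
class SortedFilesSketch
  def initialize
    @lock = Mutex.new
    @entries = [] # [key, file] pairs, kept sorted by key = [modified_at, path]
    @keys = {}    # path => key currently stored for that path
  end

  # Insert a file, or re-position it; must be called again after a re-stat
  # changed modified_at (the collection does not notice changes on its own).
  def update(file)
    @lock.synchronize do
      remove_entry(file.path)
      key = [file.modified_at, file.path]
      index = @entries.bsearch_index { |(k, _)| (k <=> key) >= 0 } || @entries.size
      @entries.insert(index, [key, file])
      @keys[file.path] = key
    end
  end

  # Paths in sorted order.
  def paths
    @lock.synchronize { @entries.map { |(_, file)| file.path } }
  end

  private

  # Drop the entry stored under the previously recorded key, if any.
  def remove_entry(path)
    old_key = @keys.delete(path)
    return unless old_key
    index = @entries.bsearch_index { |(k, _)| (k <=> old_key) >= 0 }
    @entries.delete_at(index) if index
  end
end

a = WatchedFileStub.new("/a", 10)
b = WatchedFileStub.new("/b", 20)
c = WatchedFileStub.new("/c", 15)
collection = SortedFilesSketch.new
[a, b, c].each { |file| collection.update(file) }
initial_order = collection.paths # ["/a", "/c", "/b"]

a.modified_at = 30   # simulated re-stat result
collection.update(a) # explicit update keeps the order correct
updated_order = collection.paths # ["/c", "/b", "/a"]
```

All mutations go through one lock, which mirrors the "clear locking semantics" point only in spirit; the real collection may lock differently.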
1 parent aec0be9 commit 15d3eba

26 files changed, +667 -304 lines changed

CHANGELOG.md

+4
@@ -1,3 +1,7 @@
+## 4.2.0
+  - Fix: watched files performance with huge filesets [#268](https://github.com/logstash-plugins/logstash-input-file/pull/268)
+  - Updated logging to include full traces in debug (and trace) levels
+
 ## 4.1.18
   - Fix: release watched files on completion (in read-mode) [#271](https://github.com/logstash-plugins/logstash-input-file/pull/271)
 

lib/filewatch/discoverer.rb

+9 -8
@@ -9,6 +9,8 @@ class Discoverer
     # associated with a sincedb entry if one can be found
     include LogStash::Util::Loggable
 
+    attr_reader :watched_files_collection
+
     def initialize(watched_files_collection, sincedb_collection, settings)
       @watching = Concurrent::Array.new
       @exclude = Concurrent::Array.new
@@ -37,8 +39,7 @@ def can_exclude?(watched_file, new_discovery)
       @exclude.each do |pattern|
         if watched_file.pathname.basename.fnmatch?(pattern)
           if new_discovery
-            logger.trace("Discoverer can_exclude?: #{watched_file.path}: skipping " +
-              "because it matches exclude #{pattern}")
+            logger.trace("skipping file because it matches exclude", :path => watched_file.path, :pattern => pattern)
           end
           watched_file.unwatch
           return true
@@ -56,13 +57,13 @@ def discover_files_ongoing(path)
     end
 
     def discover_any_files(path, ongoing)
-      fileset = Dir.glob(path).select{|f| File.file?(f)}
-      logger.trace("discover_files", "count" => fileset.size)
+      fileset = Dir.glob(path).select { |f| File.file?(f) }
+      logger.trace("discover_files", :count => fileset.size)
       fileset.each do |file|
-        pathname = Pathname.new(file)
         new_discovery = false
-        watched_file = @watched_files_collection.watched_file_by_path(file)
+        watched_file = @watched_files_collection.get(file)
         if watched_file.nil?
+          pathname = Pathname.new(file)
           begin
             path_stat = PathStatClass.new(pathname)
           rescue Errno::ENOENT
@@ -74,7 +75,7 @@ def discover_any_files(path, ongoing)
         # if it already unwatched or its excluded then we can skip
         next if watched_file.unwatched? || can_exclude?(watched_file, new_discovery)
 
-        logger.trace("discover_files handling:", "new discovery"=> new_discovery, "watched_file details" => watched_file.details)
+        logger.trace? && logger.trace("handling:", :new_discovery => new_discovery, :watched_file => watched_file.details)
 
         if new_discovery
           watched_file.initial_completed if ongoing
@@ -86,7 +87,7 @@ def discover_any_files(path, ongoing)
           # associated with a different watched_file
           if @sincedb_collection.associate(watched_file)
             if watched_file.file_ignorable?
-              logger.trace("Discoverer discover_files: #{file}: skipping because it was last modified more than #{@settings.ignore_older} seconds ago")
+              logger.trace("skipping file because it was last modified more than #{@settings.ignore_older} seconds ago", :path => file)
               # on discovery ignorable watched_files are put into the ignored state and that
               # updates the size from the internal stat
               # so the existing contents are not read.
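
The `logger.trace? && logger.trace(...)` guard introduced in this file skips building the log arguments (such as `watched_file.details`) when the level is disabled. A minimal sketch of the effect, using Ruby's standard `Logger` as a stand-in: it has no `trace` level and no structured-hash arguments, so `debug?`/`debug` are used here as an assumption-laden analogue of the Logstash logger, and `expensive_details` is a hypothetical helper.

```ruby
require 'logger'
require 'stringio'

calls = 0
# Hypothetical expensive computation, standing in for watched_file.details.
expensive_details = lambda do
  calls += 1
  { :path => "/var/log/app.log", :size => 1024 }
end

logger = Logger.new(StringIO.new)
logger.level = Logger::INFO # debug output is disabled

# Unguarded: the argument is built even though the message is discarded.
logger.debug("handling: #{expensive_details.call}")
unguarded_calls = calls

# Guarded: the level check short-circuits before the argument is built.
logger.debug? && logger.debug("handling: #{expensive_details.call}")
guarded_calls = calls - unguarded_calls

# With the level enabled, the guarded call logs as usual.
logger.level = Logger::DEBUG
logger.debug? && logger.debug("handling: #{expensive_details.call}")
enabled_calls = calls - unguarded_calls - guarded_calls
```

With the standard library logger, passing a block (`logger.debug { ... }`) achieves similar laziness; the guard form is what the diff uses.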

lib/filewatch/observing_base.rb

+1 -2
@@ -62,8 +62,7 @@ def build_watch_and_dependencies
       @sincedb_collection = SincedbCollection.new(@settings)
       @sincedb_collection.open
       discoverer = Discoverer.new(watched_files_collection, @sincedb_collection, @settings)
-      @watch = Watch.new(discoverer, watched_files_collection, @settings)
-      @watch.add_processor build_specific_processor(@settings)
+      @watch = Watch.new(discoverer, build_specific_processor(@settings), @settings)
     end
 
     def watch_this(path)
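
The `Watch` constructor itself is not part of the hunks shown here, so the following is only a guess at how the new wiring might fit together: the processor is passed in at construction time, linked back through `add_watch` (which returns `self` in `FileWatch::Processor`), and the collection is reached through the discoverer's new `watched_files_collection` reader. All `Sketch*` classes are hypothetical.

```ruby
# Hypothetical processor: add_watch returns self, as in FileWatch::Processor.
class SketchProcessor
  attr_reader :watch

  def add_watch(watch)
    @watch = watch
    self
  end
end

# Hypothetical discoverer exposing its collection through a reader.
class SketchDiscoverer
  attr_reader :watched_files_collection

  def initialize(watched_files_collection)
    @watched_files_collection = watched_files_collection
  end
end

# Hypothetical watch: receives the processor up front instead of a
# separate add_processor call after construction.
class SketchWatch
  attr_reader :discoverer, :processor

  def initialize(discoverer, processor)
    @discoverer = discoverer
    @processor = processor.add_watch(self) # assumption: linked in the constructor
  end

  # Assumption: the collection is delegated to the discoverer.
  def watched_files_collection
    @discoverer.watched_files_collection
  end
end

collection = {} # placeholder for the real collection
watch = SketchWatch.new(SketchDiscoverer.new(collection), SketchProcessor.new)
```

Passing the processor to the constructor means a watch can never be observed without one, which is one plausible motivation for dropping the two-step setup.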

lib/filewatch/processor.rb

+55
@@ -0,0 +1,55 @@
+# encoding: utf-8
+require "logstash/util/loggable"
+require 'concurrent/atomic/atomic_reference'
+
+module FileWatch
+  class Processor
+    include LogStash::Util::Loggable
+
+    attr_reader :watch
+
+    def initialize(settings)
+      @settings = settings
+      @deletable_paths = Concurrent::AtomicReference.new []
+    end
+
+    def add_watch(watch)
+      @watch = watch
+      self
+    end
+
+    def clear_deletable_paths
+      @deletable_paths.get_and_set []
+    end
+
+    def add_deletable_path(path)
+      @deletable_paths.get << path
+    end
+
+    def restat(watched_file)
+      changed = watched_file.restat!
+      if changed
+        # the collection (when sorted by modified_at) needs to re-sort every time watched-file is modified,
+        # we can perform these update operation while processing files (stat interval) instead of having to
+        # re-sort the whole collection every time an entry is accessed
+        @watch.watched_files_collection.update(watched_file)
+      end
+    end
+
+    private
+
+    def error_details(error, watched_file)
+      details = { :path => watched_file.path,
+                  :exception => error.class,
+                  :message => error.message,
+                  :backtrace => error.backtrace }
+      if logger.debug?
+        details[:file] = watched_file
+      else
+        details[:backtrace] = details[:backtrace].take(8) if details[:backtrace]
+      end
+      details
+    end
+
+  end
+end
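
`clear_deletable_paths` swaps in a fresh array and hands back the previous one, so the caller can process the drained paths while new ones keep accumulating. A dependency-free sketch of that swap-and-drain idea follows. It uses a plain `Mutex` rather than `Concurrent::AtomicReference`, and `DeletablePathsSketch` is a hypothetical name, so treat it as an illustration of the pattern rather than the plugin's code.

```ruby
# Sketch of a swap-and-drain list guarded by a mutex.
class DeletablePathsSketch
  def initialize
    @lock = Mutex.new
    @paths = []
  end

  # Record a path that should later be removed from the collection.
  def add(path)
    @lock.synchronize { @paths << path }
  end

  # Replace the list with an empty one and return what was collected so far.
  def clear
    @lock.synchronize do
      drained = @paths
      @paths = []
      drained
    end
  end
end

paths = DeletablePathsSketch.new
threads = 4.times.map do |t|
  Thread.new { 50.times { |i| paths.add("/tmp/file-#{t}-#{i}.log") } }
end
threads.each(&:join)

drained = paths.clear   # everything added so far
leftover = paths.clear  # nothing new was added in between
```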

lib/filewatch/read_mode/handlers/base.rb

+8 -6
@@ -34,19 +34,21 @@ def handle_specifically(watched_file)
 
     def open_file(watched_file)
       return true if watched_file.file_open?
-      logger.trace("opening #{watched_file.path}")
+      logger.trace? && logger.trace("opening", :path => watched_file.path)
       begin
         watched_file.open
-      rescue
+      rescue => e
         # don't emit this message too often. if a file that we can't
         # read is changing a lot, we'll try to open it more often, and spam the logs.
         now = Time.now.to_i
         logger.trace("opening OPEN_WARN_INTERVAL is '#{OPEN_WARN_INTERVAL}'")
         if watched_file.last_open_warning_at.nil? || now - watched_file.last_open_warning_at > OPEN_WARN_INTERVAL
-          logger.warn("failed to open #{watched_file.path}: #{$!.inspect}, #{$!.backtrace.take(3)}")
+          backtrace = e.backtrace
+          backtrace = backtrace.take(3) if backtrace && !logger.debug?
+          logger.warn("failed to open", :path => watched_file.path, :exception => e.class, :message => e.message, :backtrace => backtrace)
           watched_file.last_open_warning_at = now
         else
-          logger.trace("suppressed warning for `failed to open` #{watched_file.path}: #{$!.inspect}")
+          logger.trace("suppressed warning (failed to open)", :path => watched_file.path, :exception => e.class, :message => e.message)
         end
         watched_file.watch # set it back to watch so we can try it again
       end
@@ -75,7 +77,7 @@ def add_or_update_sincedb_collection(watched_file)
         watched_file.update_bytes_read(sincedb_value.position)
       else
         sincedb_value.set_watched_file(watched_file)
-        logger.trace("add_or_update_sincedb_collection: switching from...", "watched_file details" => watched_file.details)
+        logger.trace("add_or_update_sincedb_collection: switching from", :watched_file => watched_file.details)
         watched_file.rotate_from(existing_watched_file)
       end
 
@@ -92,7 +94,7 @@ def update_existing_sincedb_collection_value(watched_file, sincedb_value)
     def add_new_value_sincedb_collection(watched_file)
       sincedb_value = SincedbValue.new(0)
       sincedb_value.set_watched_file(watched_file)
-      logger.trace("add_new_value_sincedb_collection: #{watched_file.path}", "position" => sincedb_value.position)
+      logger.trace("add_new_value_sincedb_collection:", :path => watched_file.path, :position => sincedb_value.position)
       sincedb_collection.set(watched_file.sincedb_key, sincedb_value)
     end
   end
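
The warning throttle in `open_file` can be isolated into a small sketch: a warning is emitted only when none has been recorded for the file yet, or when more than the interval has passed since the last one. The interval value, `FileStub`, and `warn_open_failure?` are hypothetical; the real `OPEN_WARN_INTERVAL` is defined elsewhere in the plugin and is not shown in this diff.

```ruby
# Hypothetical interval in seconds; the plugin's actual value is not shown here.
OPEN_WARN_INTERVAL_SKETCH = 300

# Hypothetical stand-in carrying only the throttle state.
FileStub = Struct.new(:path, :last_open_warning_at)

# Returns true when a warning should be emitted now, and records the time;
# returns false when the warning should be suppressed.
def warn_open_failure?(file, now)
  if file.last_open_warning_at.nil? || now - file.last_open_warning_at > OPEN_WARN_INTERVAL_SKETCH
    file.last_open_warning_at = now
    true
  else
    false
  end
end

file = FileStub.new("/var/log/unreadable.log", nil)
first  = warn_open_failure?(file, 1000) # no warning recorded yet
second = warn_open_failure?(file, 1100) # 100s later, inside the interval
third  = warn_open_failure?(file, 1300) # exactly 300s later, still not "more than"
fourth = warn_open_failure?(file, 1301) # 301s later, interval exceeded
```

Passing `now` in explicitly keeps the sketch testable without sleeping; the plugin itself reads `Time.now.to_i`.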

lib/filewatch/read_mode/handlers/read_file.rb

+19 -6
@@ -31,7 +31,7 @@ def handle_specifically(watched_file)
     end
 
     def controlled_read(watched_file, loop_control)
-      logger.trace("reading...", "iterations" => loop_control.count, "amount" => loop_control.size, "filename" => watched_file.filename)
+      logger.trace? && logger.trace("reading...", :filename => watched_file.filename, :iterations => loop_control.count, :amount => loop_control.size)
       loop_control.count.times do
         break if quit?
         begin
@@ -43,22 +43,35 @@ def controlled_read(watched_file, loop_control)
             delta = line.bytesize + @settings.delimiter_byte_size
             sincedb_collection.increment(watched_file.sincedb_key, delta)
           end
-        rescue EOFError
-          logger.error("controlled_read: eof error reading file", "path" => watched_file.path, "error" => e.inspect, "backtrace" => e.backtrace.take(8))
+        rescue EOFError => e
+          log_error("controlled_read: eof error reading file", watched_file, e)
           loop_control.flag_read_error
           break
-        rescue Errno::EWOULDBLOCK, Errno::EINTR
-          logger.error("controlled_read: block or interrupt error reading file", "path" => watched_file.path, "error" => e.inspect, "backtrace" => e.backtrace.take(8))
+        rescue Errno::EWOULDBLOCK, Errno::EINTR => e
+          log_error("controlled_read: block or interrupt error reading file", watched_file, e)
           watched_file.listener.error
           loop_control.flag_read_error
           break
         rescue => e
-          logger.error("controlled_read: general error reading file", "path" => watched_file.path, "error" => e.inspect, "backtrace" => e.backtrace.take(8))
+          log_error("controlled_read: general error reading file", watched_file, e)
           watched_file.listener.error
           loop_control.flag_read_error
           break
         end
       end
     end
+
+    def log_error(msg, watched_file, error)
+      details = { :path => watched_file.path,
+                  :exception => error.class,
+                  :message => error.message,
+                  :backtrace => error.backtrace }
+      if logger.debug?
+        details[:file] = watched_file
+      else
+        details[:backtrace] = details[:backtrace].take(8) if details[:backtrace]
+      end
+      logger.error(msg, details)
+    end
   end
 end end end
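
`log_error` keeps the full backtrace at debug level and trims it to eight frames otherwise, with a `nil` guard because an exception that was never raised has no backtrace. A standalone sketch of that logic, with the logger's `debug?` check replaced by a hypothetical `verbose` flag and hypothetical helper names:

```ruby
# Build a details hash for logging; the backtrace is trimmed unless verbose.
def error_details_sketch(path, error, verbose)
  details = { :path => path,
              :exception => error.class,
              :message => error.message,
              :backtrace => error.backtrace }
  # backtrace is nil for exceptions that were constructed but never raised
  details[:backtrace] = details[:backtrace].take(8) if !verbose && details[:backtrace]
  details
end

# Recurse a few levels so the raised error carries a long backtrace.
def deep_raise(depth)
  raise ArgumentError, "boom" if depth.zero?
  deep_raise(depth - 1)
end

begin
  deep_raise(20)
rescue => e
  short = error_details_sketch("/tmp/example.log", e, false)
  full = error_details_sketch("/tmp/example.log", e, true)
end

# An exception that was never raised: no backtrace, and no crash either.
unraised = error_details_sketch("/tmp/example.log", RuntimeError.new("never raised"), false)
```

Note that the rescue clauses in the old code referenced `e` without binding it (`rescue EOFError`), which the `=> e` additions in this hunk correct.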

lib/filewatch/read_mode/handlers/read_zip_file.rb

+8 -6
@@ -1,13 +1,15 @@
 # encoding: utf-8
 require 'java'
-java_import java.io.InputStream
-java_import java.io.InputStreamReader
-java_import java.io.FileInputStream
-java_import java.io.BufferedReader
-java_import java.util.zip.GZIPInputStream
-java_import java.util.zip.ZipException
 
 module FileWatch module ReadMode module Handlers
+
+  java_import java.io.InputStream
+  java_import java.io.InputStreamReader
+  java_import java.io.FileInputStream
+  java_import java.io.BufferedReader
+  java_import java.util.zip.GZIPInputStream
+  java_import java.util.zip.ZipException
+
   class ReadZipFile < Base
     def handle_specifically(watched_file)
       add_or_update_sincedb_collection(watched_file) unless sincedb_collection.member?(watched_file.sincedb_key)

lib/filewatch/read_mode/processor.rb

+22 -36
@@ -1,6 +1,5 @@
 # encoding: utf-8
-require "logstash/util/loggable"
-
+require 'filewatch/processor'
 require_relative "handlers/base"
 require_relative "handlers/read_file"
 require_relative "handlers/read_zip_file"
@@ -9,20 +8,7 @@ module FileWatch module ReadMode
   # Must handle
   #   :read_file
   #   :read_zip_file
-  class Processor
-    include LogStash::Util::Loggable
-
-    attr_reader :watch, :deletable_filepaths
-
-    def initialize(settings)
-      @settings = settings
-      @deletable_filepaths = []
-    end
-
-    def add_watch(watch)
-      @watch = watch
-      self
-    end
+  class Processor < FileWatch::Processor
 
     def initialize_handlers(sincedb_collection, observer)
       # we deviate from the tail mode handler initialization here
@@ -48,24 +34,23 @@ def process_all_states(watched_files)
     private
 
     def process_watched(watched_files)
-      logger.trace("Watched processing")
+      logger.trace(__method__.to_s)
       # Handles watched_files in the watched state.
       # for a slice of them:
       #   move to the active state
       #   should never have been active before
       # how much of the max active window is available
-      to_take = @settings.max_active - watched_files.count{|wf| wf.active?}
+      to_take = @settings.max_active - watched_files.count { |wf| wf.active? }
       if to_take > 0
-        watched_files.select {|wf| wf.watched?}.take(to_take).each do |watched_file|
-          path = watched_file.path
+        watched_files.select(&:watched?).take(to_take).each do |watched_file|
           begin
-            watched_file.restat
+            restat(watched_file)
             watched_file.activate
           rescue Errno::ENOENT
-            common_deleted_reaction(watched_file, "Watched")
+            common_deleted_reaction(watched_file, __method__)
             next
           rescue => e
-            common_error_reaction(path, e, "Watched")
+            common_error_reaction(watched_file, e, __method__)
             next
           end
           break if watch.quit?
@@ -74,7 +59,7 @@ def process_watched(watched_files)
         now = Time.now.to_i
         if (now - watch.lastwarn_max_files) > MAX_FILES_WARN_INTERVAL
           waiting = watched_files.size - @settings.max_active
-          logger.warn(@settings.max_warn_msg + ", files yet to open: #{waiting}")
+          logger.warn("#{@settings.max_warn_msg}, files yet to open: #{waiting}")
           watch.lastwarn_max_files = now
         end
       end
@@ -83,17 +68,18 @@ def process_watched(watched_files)
     ## TODO add process_rotation_in_progress
 
     def process_active(watched_files)
-      logger.trace("Active processing")
+      logger.trace(__method__.to_s)
       # Handles watched_files in the active state.
-      watched_files.select {|wf| wf.active? }.each do |watched_file|
-        path = watched_file.path
+      watched_files.each do |watched_file|
+        next unless watched_file.active?
+
         begin
-          watched_file.restat
+          restat(watched_file)
         rescue Errno::ENOENT
-          common_deleted_reaction(watched_file, "Active")
+          common_deleted_reaction(watched_file, __method__)
           next
         rescue => e
-          common_error_reaction(path, e, "Active")
+          common_error_reaction(watched_file, e, __method__)
           next
         end
         break if watch.quit?
@@ -114,19 +100,19 @@ def process_active(watched_files)
     def common_detach_when_allread(watched_file)
       watched_file.unwatch
       watched_file.listener.reading_completed
-      deletable_filepaths << watched_file.path
-      logger.trace("Whole file read: #{watched_file.path}, removing from collection")
+      add_deletable_path watched_file.path
+      logger.trace? && logger.trace("whole file read, removing from collection", :path => watched_file.path)
     end
 
     def common_deleted_reaction(watched_file, action)
       # file has gone away or we can't read it anymore.
       watched_file.unwatch
-      deletable_filepaths << watched_file.path
-      logger.trace("#{action} - stat failed: #{watched_file.path}, removing from collection")
+      add_deletable_path watched_file.path
+      logger.trace? && logger.trace("#{action} - stat failed, removing from collection", :path => watched_file.path)
     end
 
-    def common_error_reaction(path, error, action)
-      logger.error("#{action} - other error #{path}: (#{error.message}, #{error.backtrace.take(8).inspect})")
+    def common_error_reaction(watched_file, error, action)
+      logger.error("#{action} - other error", error_details(error, watched_file))
     end
   end
 end end
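
The `max_active` window in `process_watched` can be tried out in isolation. The sketch below uses a hypothetical `FileState` struct and `activate_within_window` helper in place of the plugin's `WatchedFile` and settings objects, and leaves out the re-stat and error handling: it counts the files that are already active, then activates at most the remaining number of watched files.

```ruby
# Hypothetical stand-in with just enough state for the window logic.
FileState = Struct.new(:path, :state) do
  def active?
    state == :active
  end

  def watched?
    state == :watched
  end

  def activate
    self.state = :active
  end
end

# Activate watched files until max_active files are active in total.
# Returns the files that were activated by this call.
def activate_within_window(files, max_active)
  to_take = max_active - files.count { |f| f.active? }
  return [] unless to_take > 0
  files.select(&:watched?).take(to_take).each(&:activate)
end

files = [
  FileState.new("/a", :active),
  FileState.new("/b", :watched),
  FileState.new("/c", :watched),
  FileState.new("/d", :watched)
]

first_pass = activate_within_window(files, 3).map(&:path)  # window has room for two
second_pass = activate_within_window(files, 3).map(&:path) # window is full now
still_watched = files.select(&:watched?).map(&:path)
```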
