Skip to content

Commit 70aadf2

Browse files
authored
Change read mode to immediately stop consuming lines when shutting down (#322)
Changed read mode to immediately stop consuming buffered lines when quit is requested
1 parent b2dd262 commit 70aadf2

File tree

4 files changed

+24
-28
lines changed

4 files changed

+24
-28
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.4.6
2+
- Change read mode to immediately stop consuming buffered lines when shutdown is requested [#322](https://github.com/logstash-plugins/logstash-input-file/pull/322)
3+
14
## 4.4.5
25
- Handle EOF when checking archive validity [#321](https://github.com/logstash-plugins/logstash-input-file/pull/321)
36

lib/filewatch/read_mode/handlers/read_file.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ def controlled_read(watched_file, loop_control)
5454
# sincedb position is independent from the watched_file bytes_read
5555
delta = line.bytesize + @settings.delimiter_byte_size
5656
sincedb_collection.increment(watched_file.sincedb_key, delta)
57+
break if quit?
5758
end
5859
rescue EOFError => e
5960
log_error("controlled_read: eof error reading file", watched_file, e)

logstash-input-file.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-input-file'
4-
s.version = '4.4.5'
4+
s.version = '4.4.6'
55
s.licenses = ['Apache-2.0']
66
s.summary = "Streams events from files"
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/inputs/file_read_spec.rb

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -181,25 +181,27 @@
181181
end
182182

183183
context "for a compressed file" do
184+
let(:tmp_directory) { Stud::Temporary.directory }
185+
let(:all_files_path) { fixture_dir.join("compressed.*.*") }
186+
let(:gz_file_path) { fixture_dir.join('compressed.log.gz') }
187+
let(:gzip_file_path) { fixture_dir.join('compressed.log.gzip') }
188+
let(:sincedb_path) { ::File.join(tmp_directory, "sincedb.db") }
189+
let(:log_completed_path) { ::File.join(tmp_directory, "completed.log") }
190+
184191
it "the file is read" do
185-
file_path = fixture_dir.join('compressed.log.gz')
186-
file_path2 = fixture_dir.join('compressed.log.gzip')
187-
FileInput.make_fixture_current(file_path.to_path)
188-
FileInput.make_fixture_current(file_path2.to_path)
189-
tmpfile_path = fixture_dir.join("compressed.*.*")
190-
directory = Stud::Temporary.directory
191-
sincedb_path = ::File.join(directory, "readmode_C_sincedb.txt")
192-
log_completed_path = ::File.join(directory, "C_completed.txt")
192+
FileInput.make_fixture_current(gz_file_path.to_path)
193+
FileInput.make_fixture_current(gzip_file_path.to_path)
193194

194195
conf = <<-CONFIG
195196
input {
196197
file {
197198
type => "blah"
198-
path => "#{tmpfile_path}"
199+
path => "#{all_files_path}"
199200
sincedb_path => "#{sincedb_path}"
200201
mode => "read"
201202
file_completed_action => "log"
202203
file_completed_log_path => "#{log_completed_path}"
204+
exit_after_read => true
203205
}
204206
}
205207
CONFIG
@@ -216,17 +218,11 @@
216218
end
217219

218220
it "the corrupted file is untouched" do
219-
directory = Stud::Temporary.directory
220-
file_path = fixture_dir.join('compressed.log.gz')
221-
corrupted_file_path = ::File.join(directory, 'corrupted.gz')
222-
FileUtils.cp(file_path, corrupted_file_path)
221+
corrupted_file_path = ::File.join(tmp_directory, 'corrupted.gz')
222+
FileUtils.cp(gz_file_path, corrupted_file_path)
223223

224224
FileInput.corrupt_gzip(corrupted_file_path)
225225

226-
log_completed_path = ::File.join(directory, "C_completed.txt")
227-
f = File.new(log_completed_path, "w")
228-
f.close()
229-
230226
conf = <<-CONFIG
231227
input {
232228
file {
@@ -236,28 +232,23 @@
236232
file_completed_action => "log_and_delete"
237233
file_completed_log_path => "#{log_completed_path}"
238234
check_archive_validity => true
235+
exit_after_read => true
239236
}
240237
}
241238
CONFIG
242239

243-
events = input(conf) do |pipeline, queue|
240+
input(conf) do |pipeline, queue|
244241
wait(1)
245242
expect(IO.read(log_completed_path)).to be_empty
246243
end
247244
end
248245

249246
it "the truncated file is untouched" do
250-
directory = Stud::Temporary.directory
251-
file_path = fixture_dir.join('compressed.log.gz')
252-
truncated_file_path = ::File.join(directory, 'truncated.gz')
253-
FileUtils.cp(file_path, truncated_file_path)
247+
truncated_file_path = ::File.join(tmp_directory, 'truncated.gz')
248+
FileUtils.cp(gz_file_path, truncated_file_path)
254249

255250
FileInput.truncate_gzip(truncated_file_path)
256251

257-
log_completed_path = ::File.join(directory, "C_completed.txt")
258-
f = File.new(log_completed_path, "w")
259-
f.close()
260-
261252
conf = <<-CONFIG
262253
input {
263254
file {
@@ -267,11 +258,12 @@
267258
file_completed_action => "log_and_delete"
268259
file_completed_log_path => "#{log_completed_path}"
269260
check_archive_validity => true
261+
exit_after_read => true
270262
}
271263
}
272264
CONFIG
273265

274-
events = input(conf) do |pipeline, queue|
266+
input(conf) do |pipeline, queue|
275267
wait(1)
276268
expect(IO.read(log_completed_path)).to be_empty
277269
end

0 commit comments

Comments
 (0)