Skip to content

Commit ec63ecc

Browse files
committed
Add integration tests for GZIP recovery cases.
1 parent d96531b commit ec63ecc

File tree

4 files changed

+87
-15
lines changed

4 files changed

+87
-15
lines changed

lib/logstash/outputs/s3.rb

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,6 @@ def restore_from_crash
398398
@crash_uploader = Uploader.new(bucket_resource, @logger, CRASH_RECOVERY_THREADPOOL)
399399

400400
temp_folder_path = Pathname.new(@temporary_directory)
401-
402401
files = Dir.glob(::File.join(@temporary_directory, "**/*"))
403402
.select { |file_path| ::File.file?(file_path) }
404403
under_recovery_files = get_under_recovery_files(files)
@@ -411,15 +410,16 @@ def restore_from_crash
411410
end
412411
else
413412
temp_file = TemporaryFile.create_from_existing_file(file_path, temp_folder_path)
414-
if temp_file.size > 0
415-
@logger.debug? && @logger.debug("Recovering from crash and uploading", :path => temp_file.path)
416-
@crash_uploader.upload_async(temp_file,
417-
:on_complete => method(:clean_temporary_file),
418-
:upload_options => upload_options)
419-
end
420-
# do not remove if Logstash tries to recover but fails
421-
if @encoding != GZIP_ENCODING && temp_file.size != 0
422-
clean_temporary_file(temp_file)
413+
# do not remove or upload if Logstash tries to recover the file but fails
414+
if temp_file.recoverable?
415+
if temp_file.size > 0
416+
@logger.debug? && @logger.debug("Recovering from crash and uploading", :path => temp_file.path)
417+
@crash_uploader.upload_async(temp_file,
418+
:on_complete => method(:clean_temporary_file),
419+
:upload_options => upload_options)
420+
else
421+
clean_temporary_file(temp_file)
422+
end
423423
end
424424
end
425425
end

lib/logstash/outputs/s3/temporary_file.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ def empty?
6868
size == 0
6969
end
7070

71+
# only to cover the case where LS cannot restore a corrupted file (the file does not exist)
72+
def recoverable?
73+
!@fd.nil?
74+
end
75+
7176
def self.create_from_existing_file(file_path, temporary_folder)
7277
key_parts = Pathname.new(file_path).relative_path_from(temporary_folder).to_s.split(::File::SEPARATOR)
7378

spec/integration/restore_from_crash_spec.rb

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,17 @@
77
describe "Restore from crash", :integration => true do
88
include_context "setup plugin"
99

10-
let(:options) { main_options.merge({ "restore" => true, "canned_acl" => "public-read-write" }) }
11-
1210
let(:number_of_files) { 5 }
1311
let(:dummy_content) { "foobar\n" * 100 }
14-
let(:factory) { LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "none", temporary_directory)}
1512

1613
before do
1714
clean_remote_files(prefix)
1815
end
1916

20-
2117
context 'with a non-empty tempfile' do
18+
let(:options) { main_options.merge({ "restore" => true, "canned_acl" => "public-read-write" }) }
19+
let(:factory) { LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "none", temporary_directory)}
20+
2221
before do
2322
# Creating a factory always creates a file
2423
factory.current.write(dummy_content)
@@ -41,6 +40,9 @@
4140
end
4241

4342
context 'with an empty tempfile' do
43+
let(:options) { main_options.merge({ "restore" => true, "canned_acl" => "public-read-write" }) }
44+
let(:factory) { LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "none", temporary_directory)}
45+
4446
before do
4547
factory.current
4648
factory.rotate!
@@ -63,5 +65,68 @@
6365
expect(bucket_resource.objects(:prefix => prefix).count).to eq(0)
6466
end
6567
end
68+
69+
context "#gzip encoding" do
70+
let(:options) { main_options.merge({ "restore" => true, "canned_acl" => "public-read-write", "encoding" => "gzip" }) }
71+
let(:factory) { LogStash::Outputs::S3::TemporaryFileFactory.new(prefix, tags, "gzip", temporary_directory)}
72+
describe "with empty recovered file" do
73+
before do
74+
# Creating a factory always creates a file
75+
factory.current.write('')
76+
factory.current.fsync
77+
factory.current.close
78+
end
79+
80+
it 'should not upload and not remove temp file' do
81+
subject.register
82+
try(20) do
83+
expect(bucket_resource.objects(:prefix => prefix).count).to eq(0)
84+
expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(1)
85+
end
86+
end
87+
end
88+
89+
describe "with healthy recovered, size is greater than zero file" do
90+
before do
91+
# Creating a factory always creates a file
92+
factory.current.write(dummy_content)
93+
factory.current.fsync
94+
factory.current.close
95+
96+
(number_of_files - 1).times do
97+
factory.rotate!
98+
factory.current.write(dummy_content)
99+
factory.current.fsync
100+
factory.current.close
101+
end
102+
end
103+
104+
it 'should recover, upload to S3 and remove temp file' do
105+
subject.register
106+
try(20) do
107+
expect(bucket_resource.objects(:prefix => prefix).count).to eq(number_of_files)
108+
expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(0)
109+
expect(bucket_resource.objects(:prefix => prefix).first.acl.grants.collect(&:permission)).to include("READ", "WRITE")
110+
end
111+
end
112+
end
113+
114+
describe "with failure when recovering" do
115+
before do
116+
# Creating a factory always create a file
117+
factory.current.write(dummy_content)
118+
factory.current.fsync
119+
end
120+
121+
it 'should not upload to S3 and not remove temp file' do
122+
subject.register
123+
try(20) do
124+
expect(bucket_resource.objects(:prefix => prefix).count).to eq(0)
125+
expect(Dir.glob(File.join(temporary_directory, "*")).size).to eq(1)
126+
end
127+
end
128+
end
129+
end
130+
66131
end
67132

spec/supports/helpers.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
let(:bucket) { ENV["AWS_LOGSTASH_TEST_BUCKET"] }
66
let(:access_key_id) { ENV["AWS_ACCESS_KEY_ID"] }
77
let(:secret_access_key) { ENV["AWS_SECRET_ACCESS_KEY"] }
8+
let(:session_token) { ENV["AWS_SESSION_TOKEN"] }
89
let(:size_file) { 100 }
910
let(:time_file) { 100 }
1011
let(:tags) { [] }
@@ -18,14 +19,15 @@
1819
"temporary_directory" => temporary_directory,
1920
"access_key_id" => access_key_id,
2021
"secret_access_key" => secret_access_key,
22+
"session_token" => session_token,
2123
"size_file" => size_file,
2224
"time_file" => time_file,
2325
"region" => region,
2426
"tags" => []
2527
}
2628
end
2729

28-
let(:client_credentials) { Aws::Credentials.new(access_key_id, secret_access_key) }
30+
let(:client_credentials) { Aws::Credentials.new(access_key_id, secret_access_key, session_token) }
2931
let(:bucket_resource) { Aws::S3::Bucket.new(bucket, { :credentials => client_credentials, :region => region }) }
3032

3133
subject { LogStash::Outputs::S3.new(options) }

0 commit comments

Comments
 (0)