Skip to content

Commit 652ac09

Browse files
author
Nitin Goel
committed
Fix for issue : logstash-plugins#16
1 parent 865bd68 commit 652ac09

File tree

1 file changed

+116
-42
lines changed
  • lib/logstash/outputs

1 file changed

+116
-42
lines changed

lib/logstash/outputs/s3.rb

Lines changed: 116 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
require "thread"
88
require "tmpdir"
99
require "fileutils"
10+
require 'pathname'
1011

1112

1213
# INFORMATION:
@@ -60,6 +61,7 @@
6061
# time_file => 5 (optional)
6162
# format => "plain" (optional)
6263
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
64+
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that)
6365
# }
6466
#
6567
class LogStash::Outputs::S3 < LogStash::Outputs::Base
@@ -110,6 +112,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
110112
# Specify how many workers to use to upload the files to S3
111113
config :upload_workers_count, :validate => :number, :default => 1
112114

115+
# Specify after how many interval of time_file, a prefix directory should be cleaned up locally if no events happing for it
116+
config :no_event_wait, :validate => :number, :default => 5
117+
113118
# Exposed attributes for testing purpose.
114119
attr_accessor :tempfile
115120
attr_reader :page_counter
@@ -139,8 +144,13 @@ def aws_service_endpoint(region)
139144
def write_on_bucket(file)
140145
# find and use the bucket
141146
bucket = @s3.buckets[@bucket]
147+
148+
first = Pathname.new @temporary_directory
149+
second = Pathname.new file
142150

143-
remote_filename = "#{@prefix}#{File.basename(file)}"
151+
remote_filename_path = second.relative_path_from first
152+
153+
remote_filename = remote_filename_path.to_s
144154

145155
@logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)
146156

@@ -160,17 +170,21 @@ def write_on_bucket(file)
160170

161171
# This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file.
162172
public
163-
def create_temporary_file
164-
filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))
165-
166-
@logger.debug("S3: Creating a new temporary file", :filename => filename)
167-
168-
@file_rotation_lock.synchronize do
169-
unless @tempfile.nil?
170-
@tempfile.close
173+
def create_temporary_file(prefix)
174+
filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix]))
175+
@file_rotation_lock[prefix].synchronize do
176+
unless @tempfile[prefix].nil?
177+
@tempfile[prefix].close
178+
end
179+
180+
if @prefixes.include? prefix
181+
dirname = File.dirname(filename)
182+
unless File.directory?(dirname)
183+
FileUtils.mkdir_p(dirname)
184+
end
185+
@logger.debug("S3: Creating a new temporary file", :filename => filename)
186+
@tempfile[prefix] = File.open(filename, "a")
171187
end
172-
173-
@tempfile = File.open(filename, "a")
174188
end
175189
end
176190

@@ -185,7 +199,11 @@ def register
185199

186200
@s3 = aws_s3_config
187201
@upload_queue = Queue.new
188-
@file_rotation_lock = Mutex.new
202+
@file_rotation_lock = Hash.new
203+
@tempfile = Hash.new
204+
@page_counter = Hash.new
205+
@prefixes = Set.new
206+
@empty_uploads = Hash.new
189207

190208
if @prefix && @prefix =~ S3_INVALID_CHARACTERS
191209
@logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
@@ -197,15 +215,14 @@ def register
197215
end
198216

199217
test_s3_write
200-
201218
restore_from_crashes if @restore == true
202-
reset_page_counter
203-
create_temporary_file
219+
#reset_page_counter
220+
#create_temporary_file
204221
configure_periodic_rotation if time_file != 0
205222
configure_upload_workers
206223

207224
@codec.on_event do |event, encoded_event|
208-
handle_event(encoded_event)
225+
handle_event(encoded_event, event)
209226
end
210227
end
211228

@@ -242,13 +259,36 @@ def restore_from_crashes
242259
end
243260
end
244261

262+
public
263+
def shouldcleanup(prefix)
264+
return @empty_uploads[prefix] > @no_event_wait
265+
end
266+
245267
public
246268
def move_file_to_bucket(file)
269+
270+
@logger.debug("S3: moving to bucket ", :file => file)
271+
272+
basepath = Pathname.new @temporary_directory
273+
dirname = Pathname.new File.dirname(file)
274+
prefixpath = dirname.relative_path_from basepath
275+
prefix = prefixpath.to_s
276+
@logger.debug("S3: moving the file for prefix", :prefix => prefix)
277+
247278
if !File.zero?(file)
279+
if @prefixes.include? prefix
280+
@empty_uploads[prefix] = 0
281+
end
248282
write_on_bucket(file)
249283
@logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
284+
else
285+
if @prefixes.include? prefix
286+
@empty_uploads[prefix] += 1
287+
end
250288
end
251289

290+
@logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix])
291+
252292
begin
253293
File.delete(file)
254294
rescue Errno::ENOENT
@@ -257,6 +297,10 @@ def move_file_to_bucket(file)
257297
rescue Errno::EACCES
258298
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
259299
end
300+
301+
if shouldcleanup(prefix)
302+
cleanprefix(prefix)
303+
end
260304
end
261305

262306
public
@@ -283,8 +327,8 @@ def receive(event)
283327
end
284328

285329
public
286-
def rotate_events_log?
287-
@tempfile.size > @size_file
330+
def rotate_events_log(prefix)
331+
return @tempfile[prefix].size > @size_file
288332
end
289333

290334
public
@@ -293,12 +337,13 @@ def write_events_to_multiple_files?
293337
end
294338

295339
public
296-
def write_to_tempfile(event)
340+
def write_to_tempfile(event, prefix)
341+
297342
begin
298-
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))
343+
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix]))
299344

300-
@file_rotation_lock.synchronize do
301-
@tempfile.syswrite(event)
345+
@file_rotation_lock[prefix].synchronize do
346+
@tempfile[prefix].syswrite(event)
302347
end
303348
rescue Errno::ENOSPC
304349
@logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
@@ -307,11 +352,14 @@ def write_to_tempfile(event)
307352
end
308353

309354
public
310-
def teardown
355+
def teardown()
311356
shutdown_upload_workers
312357
@periodic_rotation_thread.stop! if @periodic_rotation_thread
358+
359+
for prefix in @prefixes
360+
@tempfile[prefix].close
361+
end
313362

314-
@tempfile.close
315363
finished
316364
end
317365

@@ -322,37 +370,63 @@ def shutdown_upload_workers
322370
end
323371

324372
private
325-
def handle_event(encoded_event)
373+
def handle_event(encoded_event, event)
374+
actualprefix = event.sprintf(@prefix)
375+
if not @prefixes.to_a().include? actualprefix
376+
@file_rotation_lock[actualprefix] = Mutex.new
377+
@prefixes.add(actualprefix)
378+
reset_page_counter(actualprefix)
379+
create_temporary_file(actualprefix)
380+
@empty_uploads[actualprefix] = 0
381+
end
382+
326383
if write_events_to_multiple_files?
327-
if rotate_events_log?
328-
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
384+
if rotate_events_log(actualprefix)
385+
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix]))
329386

330-
move_file_to_bucket_async(@tempfile.path)
331-
next_page
332-
create_temporary_file
387+
move_file_to_bucket_async(@tempfile[actualprefix].path)
388+
next_page(actualprefix)
389+
create_temporary_file(actualprefix)
333390
else
334-
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
391+
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file)
335392
end
336393
end
337394

338-
write_to_tempfile(encoded_event)
395+
write_to_tempfile(encoded_event, actualprefix)
339396
end
340397

341398
private
342399
def configure_periodic_rotation
343400
@periodic_rotation_thread = Stud::Task.new do
344401
LogStash::Util::set_thread_name("<S3 periodic uploader")
345402

346-
Stud.interval(periodic_interval, :sleep_then_run => true) do
347-
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)
348-
349-
move_file_to_bucket_async(@tempfile.path)
350-
next_page
351-
create_temporary_file
403+
Stud.interval(periodic_interval, :sleep_then_run => true) do
404+
405+
@tempfile.keys.each do |key|
406+
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path)
407+
move_file_to_bucket_async(@tempfile[key].path)
408+
next_page(key)
409+
create_temporary_file(key)
410+
end
411+
352412
end
353413
end
354414
end
355415

416+
private
417+
def cleanprefix(prefix)
418+
path = File.join(@temporary_directory, prefix)
419+
@logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
420+
@file_rotation_lock[prefix].synchronize do
421+
@tempfile[prefix].close
422+
Dir.foreach(path) {|f| fn = File.join(path, f); File.delete(fn) if f != '.' && f != '..'}
423+
FileUtils.remove_dir(path)
424+
@prefixes.delete(prefix)
425+
@tempfile.delete(prefix)
426+
@empty_uploads[prefix] = 0
427+
end
428+
end
429+
356430
private
357431
def configure_upload_workers
358432
@logger.debug("S3: Configure upload workers")
@@ -385,13 +459,13 @@ def upload_worker
385459
end
386460

387461
private
388-
def next_page
389-
@page_counter += 1
462+
def next_page(key)
463+
@page_counter[key] += 1
390464
end
391465

392466
private
393-
def reset_page_counter
394-
@page_counter = 0
467+
def reset_page_counter(key)
468+
@page_counter[key] = 0
395469
end
396470

397471
private

0 commit comments

Comments
 (0)