Skip to content

Commit 95bf768

Browse files
nigoelelyscape
authored andcommitted
Support for dynamic field values in prefix
1 parent 8f646a3 commit 95bf768

File tree

1 file changed

+115
-43
lines changed
  • lib/logstash/outputs

1 file changed

+115
-43
lines changed

lib/logstash/outputs/s3.rb

Lines changed: 115 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
require "thread"
99
require "tmpdir"
1010
require "fileutils"
11+
require 'pathname'
1112

1213

1314
# INFORMATION:
@@ -60,6 +61,7 @@
6061
# time_file => 5 (optional)
6162
# format => "plain" (optional)
6263
# canned_acl => "private" (optional. Options are "private", "public_read", "public_read_write", "authenticated_read". Defaults to "private" )
64+
# no_event_wait => 5 (optional. Defines the number of time_file s3 upload events that may go with no eventns for the prefix, before cleaning up the watch on that)
6365
# }
6466
#
6567
class LogStash::Outputs::S3 < LogStash::Outputs::Base
@@ -110,6 +112,9 @@ class LogStash::Outputs::S3 < LogStash::Outputs::Base
110112
# Specify how many workers to use to upload the files to S3
111113
config :upload_workers_count, :validate => :number, :default => 1
112114

115+
# Specify after how many interval of time_file, a prefix directory should be cleaned up locally if no events happing for it
116+
config :no_event_wait, :validate => :number, :default => 5
117+
113118
# Define tags to be appended to the file on the S3 bucket.
114119
#
115120
# Example:
@@ -149,8 +154,13 @@ def aws_service_endpoint(region)
149154
def write_on_bucket(file)
150155
# find and use the bucket
151156
bucket = @s3.buckets[@bucket]
157+
158+
first = Pathname.new @temporary_directory
159+
second = Pathname.new file
152160

153-
remote_filename = "#{@prefix}#{File.basename(file)}"
161+
remote_filename_path = second.relative_path_from first
162+
163+
remote_filename = remote_filename_path.to_s
154164

155165
@logger.debug("S3: ready to write file in bucket", :remote_filename => remote_filename, :bucket => @bucket)
156166

@@ -170,17 +180,21 @@ def write_on_bucket(file)
170180

171181
# This method is used for create new empty temporary files for use. Flag is needed for indicate new subsection time_file.
172182
public
173-
def create_temporary_file
174-
filename = File.join(@temporary_directory, get_temporary_filename(@page_counter))
175-
176-
@logger.debug("S3: Creating a new temporary file", :filename => filename)
177-
178-
@file_rotation_lock.synchronize do
179-
unless @tempfile.nil?
180-
@tempfile.close
183+
def create_temporary_file(prefix)
184+
filename = File.join(@temporary_directory, prefix, get_temporary_filename(@page_counter[prefix]))
185+
@file_rotation_lock[prefix].synchronize do
186+
unless @tempfile[prefix].nil?
187+
@tempfile[prefix].close
188+
end
189+
190+
if @prefixes.include? prefix
191+
dirname = File.dirname(filename)
192+
unless File.directory?(dirname)
193+
FileUtils.mkdir_p(dirname)
194+
end
195+
@logger.debug("S3: Creating a new temporary file", :filename => filename)
196+
@tempfile[prefix] = File.open(filename, "a")
181197
end
182-
183-
@tempfile = File.open(filename, "a")
184198
end
185199
end
186200

@@ -195,7 +209,11 @@ def register
195209

196210
@s3 = aws_s3_config
197211
@upload_queue = Queue.new
198-
@file_rotation_lock = Mutex.new
212+
@file_rotation_lock = Hash.new
213+
@tempfile = Hash.new
214+
@page_counter = Hash.new
215+
@prefixes = Set.new
216+
@empty_uploads = Hash.new
199217

200218
if @prefix && @prefix =~ S3_INVALID_CHARACTERS
201219
@logger.error("S3: prefix contains invalid characters", :prefix => @prefix, :contains => S3_INVALID_CHARACTERS)
@@ -207,15 +225,14 @@ def register
207225
end
208226

209227
test_s3_write
210-
211228
restore_from_crashes if @restore == true
212-
reset_page_counter
213-
create_temporary_file
229+
#reset_page_counter
230+
#create_temporary_file
214231
configure_periodic_rotation if time_file != 0
215232
configure_upload_workers
216233

217234
@codec.on_event do |event, encoded_event|
218-
handle_event(encoded_event)
235+
handle_event(encoded_event, event)
219236
end
220237
end
221238

@@ -252,13 +269,36 @@ def restore_from_crashes
252269
end
253270
end
254271

272+
public
273+
def shouldcleanup(prefix)
274+
return @empty_uploads[prefix] > @no_event_wait
275+
end
276+
255277
public
256278
def move_file_to_bucket(file)
279+
280+
@logger.debug("S3: moving to bucket ", :file => file)
281+
282+
basepath = Pathname.new @temporary_directory
283+
dirname = Pathname.new File.dirname(file)
284+
prefixpath = dirname.relative_path_from basepath
285+
prefix = prefixpath.to_s
286+
@logger.debug("S3: moving the file for prefix", :prefix => prefix)
287+
257288
if !File.zero?(file)
289+
if @prefixes.include? prefix
290+
@empty_uploads[prefix] = 0
291+
end
258292
write_on_bucket(file)
259293
@logger.debug("S3: file was put on the upload thread", :filename => File.basename(file), :bucket => @bucket)
294+
else
295+
if @prefixes.include? prefix
296+
@empty_uploads[prefix] += 1
297+
end
260298
end
261299

300+
@logger.debug("S3: empty_uploads for the prefix ", :prefix => prefix, :empty_uploads => @empty_uploads[prefix])
301+
262302
begin
263303
File.delete(file)
264304
rescue Errno::ENOENT
@@ -267,6 +307,10 @@ def move_file_to_bucket(file)
267307
rescue Errno::EACCES
268308
@logger.error("S3: Logstash doesnt have the permission to delete the file in the temporary directory.", :filename => File.basename(file), :temporary_directory => @temporary_directory)
269309
end
310+
311+
if shouldcleanup(prefix)
312+
cleanprefix(prefix)
313+
end
270314
end
271315

272316
public
@@ -293,9 +337,10 @@ def receive(event)
293337
end
294338

295339
public
296-
def rotate_events_log?
297-
@file_rotation_lock.synchronize do
298-
@tempfile.size > @size_file
340+
341+
def rotate_events_log(prefix)
342+
@file_rotation_lock[prefix].synchronize do
343+
@tempfile[prefix].size > @size_file
299344
end
300345
end
301346

@@ -305,12 +350,13 @@ def write_events_to_multiple_files?
305350
end
306351

307352
public
308-
def write_to_tempfile(event)
353+
def write_to_tempfile(event, prefix)
354+
309355
begin
310-
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile))
356+
@logger.debug("S3: put event into tempfile ", :tempfile => File.basename(@tempfile[prefix]))
311357

312-
@file_rotation_lock.synchronize do
313-
@tempfile.syswrite(event)
358+
@file_rotation_lock[prefix].synchronize do
359+
@tempfile[prefix].syswrite(event)
314360
end
315361
rescue Errno::ENOSPC
316362
@logger.error("S3: No space left in temporary directory", :temporary_directory => @temporary_directory)
@@ -322,9 +368,11 @@ def write_to_tempfile(event)
322368
def close
323369
shutdown_upload_workers
324370
@periodic_rotation_thread.stop! if @periodic_rotation_thread
325-
326-
@file_rotation_lock.synchronize do
327-
@tempfile.close unless @tempfile.nil? && @tempfile.closed?
371+
372+
for prefix in @prefixes
373+
@file_rotation_lock[prefix].synchronize do
374+
@tempfile[prefix].close unless @tempfile[prefix].nil? && @tempfile[prefix].closed?
375+
end
328376
end
329377
end
330378

@@ -335,20 +383,29 @@ def shutdown_upload_workers
335383
end
336384

337385
private
338-
def handle_event(encoded_event)
386+
def handle_event(encoded_event, event)
387+
actualprefix = event.sprintf(@prefix)
388+
if not @prefixes.to_a().include? actualprefix
389+
@file_rotation_lock[actualprefix] = Mutex.new
390+
@prefixes.add(actualprefix)
391+
reset_page_counter(actualprefix)
392+
create_temporary_file(actualprefix)
393+
@empty_uploads[actualprefix] = 0
394+
end
395+
339396
if write_events_to_multiple_files?
340-
if rotate_events_log?
341-
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile))
397+
if rotate_events_log(actualprefix)
398+
@logger.debug("S3: tempfile is too large, let's bucket it and create new file", :tempfile => File.basename(@tempfile[actualprefix]))
342399

343-
move_file_to_bucket_async(@tempfile.path)
344-
next_page
345-
create_temporary_file
400+
move_file_to_bucket_async(@tempfile[actualprefix].path)
401+
next_page(actualprefix)
402+
create_temporary_file(actualprefix)
346403
else
347-
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile.size, :size_file => @size_file)
404+
@logger.debug("S3: tempfile file size report.", :tempfile_size => @tempfile[actualprefix].size, :size_file => @size_file)
348405
end
349406
end
350407

351-
write_to_tempfile(encoded_event)
408+
write_to_tempfile(encoded_event, actualprefix)
352409
end
353410

354411
private
@@ -357,15 +414,30 @@ def configure_periodic_rotation
357414
LogStash::Util::set_thread_name("<S3 periodic uploader")
358415

359416
Stud.interval(periodic_interval, :sleep_then_run => true) do
360-
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile.path)
361-
362-
move_file_to_bucket_async(@tempfile.path)
363-
next_page
364-
create_temporary_file
417+
@tempfile.keys.each do |key|
418+
@logger.debug("S3: time_file triggered, bucketing the file", :filename => @tempfile[key].path)
419+
move_file_to_bucket_async(@tempfile[key].path)
420+
next_page(key)
421+
create_temporary_file(key)
422+
end
365423
end
366424
end
367425
end
368426

427+
private
428+
def cleanprefix(prefix)
429+
path = File.join(@temporary_directory, prefix)
430+
@logger.debug("cleaning the directory and prefix ", :dir => path, :prefix => prefix)
431+
@file_rotation_lock[prefix].synchronize do
432+
@tempfile[prefix].close
433+
Dir.foreach(path) {|f| fn = File.join(path, f); File.delete(fn) if f != '.' && f != '..'}
434+
FileUtils.remove_dir(path)
435+
@prefixes.delete(prefix)
436+
@tempfile.delete(prefix)
437+
@empty_uploads[prefix] = 0
438+
end
439+
end
440+
369441
private
370442
def configure_upload_workers
371443
@logger.debug("S3: Configure upload workers")
@@ -398,13 +470,13 @@ def upload_worker
398470
end
399471

400472
private
401-
def next_page
402-
@page_counter += 1
473+
def next_page(key)
474+
@page_counter[key] += 1
403475
end
404476

405477
private
406-
def reset_page_counter
407-
@page_counter = 0
478+
def reset_page_counter(key)
479+
@page_counter[key] = 0
408480
end
409481

410482
private

0 commit comments

Comments
 (0)