From ba9dc2d83fd20b5d429b2c2d41109d89753de03c Mon Sep 17 00:00:00 2001
From: William Orr
Date: Sat, 24 Oct 2020 09:10:21 +0200
Subject: [PATCH] Update index safely between threads

Fixes #160 and #326.

As mentioned in a warning, as well as in
https://github.com/fluent/fluent-plugin-s3/issues/326 and
https://github.com/fluent/fluent-plugin-s3/issues/160, the process of
determining the index added to the default object key is not
thread-safe. This adds some thread-safety until version 2.x is out,
where chunk_id is used instead of an index value.

This is not a perfect implementation, since there can still be races
between workers if multi-worker mode is enabled in fluentd, or if
multiple fluentd instances upload to the same bucket. This commit only
resolves the problem short-term, in a way that is backwards compatible.
---
 lib/fluent/plugin/out_s3.rb | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/lib/fluent/plugin/out_s3.rb b/lib/fluent/plugin/out_s3.rb
index 5fb653db..c6d67ff0 100644
--- a/lib/fluent/plugin/out_s3.rb
+++ b/lib/fluent/plugin/out_s3.rb
@@ -2,6 +2,7 @@
 require 'fluent/log-ext'
 require 'fluent/timezone'
 require 'aws-sdk-s3'
+require 'concurrent'
 require 'zlib'
 require 'time'
 require 'tempfile'
@@ -223,7 +224,7 @@ def configure(conf)
     # For backward compatibility
     # TODO: Remove time_slice_format when end of support compat_parameters
     @configured_time_slice_format = conf['time_slice_format']
-    @values_for_s3_object_chunk = {}
+    @values_for_s3_object_chunk = Concurrent::Hash.new
     @time_slice_with_tz = Fluent::Timezone.formatter(@timekey_zone, @configured_time_slice_format || timekey_to_timeformat(@buffer_config['timekey']))
   end
 
@@ -251,6 +252,9 @@ def start
     @s3 = Aws::S3::Resource.new(client: s3_client)
     @bucket = @s3.bucket(@s3_bucket)
 
+    @index = Concurrent::AtomicFixnum.new(-1)
+    @time_slice = Concurrent::AtomicReference.new
+
     check_apikeys if @check_apikey_on_start
     ensure_bucket if @check_bucket
     ensure_bucket_lifecycle
@@ -273,8 +277,18 @@ def write(chunk)
       @time_slice_with_tz.call(metadata.timekey)
     end
 
+    # If we set a new time slice, then reset our index.
+    # There is a small race here, where a new time slice can have an old index set.
+    # This shouldn't be a problem if @check_object is enabled, but could otherwise
+    # cause overwrites when the old index is reached on the new time slice.
+    if @time_slice.get_and_set(time_slice) != time_slice
+      @index.value = -1
+    end
+
     if @check_object
       begin
+        i = @index.increment
+
         @values_for_s3_object_chunk[chunk.unique_id] ||= {
           "%{hex_random}" => hex_random(chunk),
         }
@@ -284,7 +298,7 @@
         }
         values_for_s3_object_key_post = {
           "%{time_slice}" => time_slice,
-          "%{index}" => sprintf(@index_format,i),
+          "%{index}" => sprintf(@index_format, i),
         }.merge!(@values_for_s3_object_chunk[chunk.unique_id])
 
         values_for_s3_object_key_post["%{uuid_flush}".freeze] = uuid_random if @uuid_flush_enabled
@@ -302,7 +316,6 @@
           end
         end
 
-        i += 1
         previous_path = s3path
       end while @bucket.object(s3path).exists?
     else
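
Illustration (not part of the commit): a minimal, runnable sketch of the
concurrent-ruby pattern the patch adopts, assuming only the concurrent-ruby
gem. The helper name next_object_key and the key format are invented for
this example; the Concurrent::AtomicFixnum / Concurrent::AtomicReference
calls (increment, value=, get_and_set) mirror the ones the patch uses.

    require 'concurrent'

    # Shared state, standing in for @index and @time_slice in the plugin.
    index = Concurrent::AtomicFixnum.new(-1)      # last index handed out
    time_slice = Concurrent::AtomicReference.new  # last time slice seen

    # Build the next object key for the given time slice.
    def next_object_key(slice, time_slice, index)
      # get_and_set installs the new slice and returns the previous one in
      # a single atomic step, so a compare followed by a separate assignment
      # cannot interleave across threads; increment is likewise atomic,
      # unlike the old local `i += 1`.
      index.value = -1 if time_slice.get_and_set(slice) != slice
      format('logs/%s_%d.gz', slice, index.increment)
    end

    # Concurrent flushes of the same time slice now get distinct indices.
    threads = 4.times.map do
      Thread.new { 5.times { puts next_object_key('20201024', time_slice, index) } }
    end
    threads.each(&:join)

As the commit message itself notes, this only serializes index allocation:
a thread that observes a slice change can still reset the counter after
another thread has already incremented it on the new slice, which is why
the added comment points to @check_object as the safety net against
overwrites.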