Commit f0e4ac6

stale race: use atomic methods for deletions, mark deleted nodes
Refactor of `S3::FileRepository` to avoid several closely-related race conditions:

- prevent `get_factory()` from yielding a factory that was mid-deletion by the stale watcher, which could cause the plugin to crash because the file no longer exists on disk. This is solved by marking a factory's prefix wrapper as deleted while the stale watcher has exclusive access to it, and by checking deletion status before yielding exclusive access to a prefix wrapper's factory.
- eliminate `get_factory()`'s non-atomic `Concurrent::Map#fetch_or_store`, which could cause multiple factories to be created for a single prefix; only one would be retained, and bytes written to the other(s) would be lost.
- introduce `each_factory`, which _avoids_ creating new factories or yielding deleted ones.
- refactor `each_files` to use the new `each_factory`, so it no longer yields files whose factories have been deleted.

Additionally, `S3#rotate_if_needed` was migrated to the now-safer `S3::FileRepository#each_factory`, which _avoids_ initializing new factories (and therefore avoids creating empty files on disk after the existing ones have been stale-reaped).
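The deletion-marking and atomic get-or-recreate pattern described above can be sketched in plain Ruby. This is a hypothetical, simplified model (all names here are invented, not the plugin's real classes; a `Hash` guarded by a single stdlib `Monitor` stands in for `Concurrent::Map` and its atomic methods): a reaped entry is never handed out, lookup atomically replaces a deleted entry, and iteration never creates new entries.

```ruby
require 'monitor'

# Hypothetical sketch -- NOT the plugin's implementation.
class PrefixMap
  Entry = Struct.new(:value, :deleted)

  def initialize(&initializer)
    @initializer = initializer # builds a value for a missing prefix
    @lock = Monitor.new
    @map = {}
  end

  # get-or-create: the deletion check and the replacement happen in one
  # critical section, so a caller can never observe a reaped entry.
  def get(key)
    @lock.synchronize do
      entry = @map[key]
      if entry.nil? || entry.deleted
        entry = Entry.new(@initializer.call(key), false)
        @map[key] = entry
      end
      yield entry.value
    end
  end

  # iterate existing live entries only: never creates, never yields deleted.
  def each
    @lock.synchronize do
      @map.each_value { |entry| yield entry.value unless entry.deleted }
    end
  end

  # sweeper side: mark deleted and remove in the same critical section, so
  # any thread still holding the entry can detect that it was reaped.
  def remove(key)
    @lock.synchronize do
      entry = @map.delete(key)
      entry.deleted = true if entry
    end
  end
end

created = 0
map = PrefixMap.new { |k| created += 1; "factory-#{k}" }
map.get("a") { |v| v } # first call creates the one-and-only entry
map.get("a") { |v| v } # second call reuses it; nothing new is created
map.remove("a")        # sweeper reaps: marked deleted, then removed
live = []
map.each { |v| live << v } # the reaped entry is not yielded
```

In the real code the single map-wide lock is replaced by `Concurrent::Map`'s atomic `compute`/`compute_if_present` plus a per-entry lock, but the invariant is the same: deletion marking and replacement are never separated by a window another thread can slip through.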
1 parent 3dc04f3 commit f0e4ac6

File tree

4 files changed: +90 -28 lines


CHANGELOG.md

+6 lines

```diff
@@ -1,3 +1,9 @@
+## 4.4.1
+- Fixes several closely-related race conditions that could cause plugin crashes or data-loss
+  - race condition in initializing a prefix could cause one or more local temp files to be abandoned and only recovered after next pipeline start
+  - race condition in stale watcher could cause the plugin to crash when working with a stale (empty) file that had been deleted
+  - race condition in stale watcher could cause a non-empty file to be deleted if bytes were written to it after it was detected as stale
+
 ## 4.4.0
 - Logstash recovers corrupted gzip and uploads to S3 [#249](https://github.com/logstash-plugins/logstash-output-s3/pull/249)
```

lib/logstash/outputs/s3.rb

+16 -16 lines

```diff
@@ -344,22 +344,22 @@ def bucket_resource
   end
 
   def rotate_if_needed(prefixes)
-    prefixes.each do |prefix|
-      # Each file access is thread safe,
-      # until the rotation is done then only
-      # one thread has access to the resource.
-      @file_repository.get_factory(prefix) do |factory|
-        temp_file = factory.current
-
-        if @rotation.rotate?(temp_file)
-          @logger.debug? && @logger.debug("Rotate file",
-                                          :key => temp_file.key,
-                                          :path => temp_file.path,
-                                          :strategy => @rotation.class.name)
-
-          upload_file(temp_file)
-          factory.rotate!
-        end
+    # Each file access is thread safe,
+    # until the rotation is done then only
+    # one thread has access to the resource.
+    @file_repository.each_factory(prefixes) do |factory|
+      # we have exclusive access to the one-and-only
+      # prefix WRAPPER for this factory.
+      temp_file = factory.current
+
+      if @rotation.rotate?(temp_file)
+        @logger.debug? && @logger.debug("Rotate file",
+                                        :key => temp_file.key,
+                                        :path => temp_file.path,
+                                        :strategy => @rotation.class.name)
+
+        upload_file(temp_file) # may be async or blocking
+        factory.rotate!
       end
     end
   end
```

lib/logstash/outputs/s3/file_repository.rb

+67 -11 lines

```diff
@@ -15,8 +15,9 @@ class FileRepository
   class PrefixedValue
     def initialize(file_factory, stale_time)
       @file_factory = file_factory
-      @lock = Mutex.new
+      @lock = Monitor.new
       @stale_time = stale_time
+      @is_deleted = false
     end
 
     def with_lock
@@ -34,7 +35,14 @@ def apply(prefix)
     end
 
     def delete!
-      with_lock{ |factory| factory.current.delete! }
+      with_lock do |factory|
+        factory.current.delete!
+        @is_deleted = true
+      end
+    end
+
+    def deleted?
+      with_lock { |_| @is_deleted }
     end
   end
 
@@ -72,17 +80,52 @@ def keys
   end
 
   def each_files
-    @prefixed_factories.values.each do |prefixed_file|
-      prefixed_file.with_lock { |factory| yield factory.current }
+    each_factory(keys) do |factory|
+      yield factory.current
     end
   end
 
-  # Return the file factory
+  ##
+  # Yields the file factory while the current thread has exclusive access to it, creating a new
+  # one if one does not exist or if the current one is being reaped by the stale watcher.
+  # @param prefix_key [String]: the prefix key
+  # @yieldparam factory [TemporaryFileFactory]: a temporary file factory that this thread has exclusive access to
+  # @yieldreturn [Object]: a value to return; should NOT be the factory, which should be contained by the exclusive access scope.
+  # @return [Object]: the value returned by the provided block
   def get_factory(prefix_key)
-    prefix_val = @prefixed_factories.fetch_or_store(prefix_key) { @factory_initializer.create_value(prefix_key) }
+
+    # fast-path: if factory exists and is not deleted, yield it with exclusive access and return
+    prefix_val = @prefixed_factories.get(prefix_key)
+    prefix_val&.with_lock do |factory|
+      # intentional local-jump to ensure deletion detection
+      # is done inside the exclusive access.
+      return yield(factory) unless prefix_val.deleted?
+    end
+
+    # slow-path:
+    # the Concurrent::Map#get operation is lock-free, but may have returned an entry that was being deleted by
+    # another thread (such as via stale detection). If we failed to retrieve a value, or retrieved one that had
+    # been marked deleted, use the atomic Concurrent::Map#compute to retrieve a non-deleted entry.
+    prefix_val = @prefixed_factories.compute(prefix_key) do |existing|
+      existing && !existing.deleted? ? existing : @factory_initializer.create_value(prefix_key)
+    end
     prefix_val.with_lock { |factory| yield factory }
   end
 
+  ##
+  # Yields each non-deleted file factory while the current thread has exclusive access to it.
+  # @param prefixes [Array<String>]: the prefix keys
+  # @yieldparam factory [TemporaryFileFactory]
+  # @return [void]
+  def each_factory(prefixes)
+    prefixes.each do |prefix_key|
+      prefix_val = @prefixed_factories.get(prefix_key)
+      prefix_val&.with_lock do |factory|
+        yield factory unless prefix_val.deleted?
+      end
+    end
+  end
+
   def get_file(prefix_key)
     get_factory(prefix_key) { |factory| yield factory.current }
   end
@@ -95,18 +138,31 @@ def size
     @prefixed_factories.size
   end
 
-  def remove_stale(k, v)
-    if v.stale?
-      @prefixed_factories.delete_pair(k, v)
-      v.delete!
+  def remove_if_stale(prefix_key)
+    # we use the ATOMIC `Concurrent::Map#compute_if_present` to atomically
+    # detect the staleness, mark a stale prefixed factory as deleted, and delete from the map.
+    @prefixed_factories.compute_if_present(prefix_key) do |prefixed_factory|
+      # once we have retrieved an instance, we acquire exclusive access to it
+      # for stale detection, marking it as deleted before releasing the lock
+      # and causing it to become deleted from the map.
+      prefixed_factory.with_lock do |_|
+        if prefixed_factory.stale?
+          prefixed_factory.delete! # mark deleted to prevent reuse
+          nil # cause deletion
+        else
+          prefixed_factory # keep existing
+        end
+      end
     end
   end
 
   def start_stale_sweeper
     @stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
       LogStash::Util.set_thread_name("S3, Stale factory sweeper")
 
-      @prefixed_factories.each { |k, v| remove_stale(k,v) }
+      @prefixed_factories.keys.each do |prefix|
+        remove_if_stale(prefix)
+      end
     end
 
     @stale_sweeper.execute
```
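One detail in the diff above is worth spelling out: `PrefixedValue`'s `@lock` changes from `Mutex` to `Monitor`. In the new flow, `remove_if_stale` calls `prefixed_factory.delete!` while it is already inside that wrapper's `with_lock` (and `get_factory` likewise calls `deleted?`, which takes the lock, from inside `with_lock`), so the same thread must be able to re-acquire the lock it already holds. Ruby's `Mutex` is not reentrant and raises `ThreadError` on nested acquisition by the owning thread; the stdlib `Monitor` permits it. A minimal demonstration:

```ruby
require 'monitor'

# Nested locking by the same thread: Mutex refuses, Monitor allows.
mutex = Mutex.new
nested_mutex_ok =
  begin
    mutex.synchronize { mutex.synchronize { true } }
  rescue ThreadError
    false # MRI raises ThreadError ("deadlock; recursive locking")
  end

monitor = Monitor.new
nested_monitor_ok = monitor.synchronize { monitor.synchronize { true } }

puts nested_mutex_ok   # false
puts nested_monitor_ok # true
```

Keeping `Mutex` would have turned every `with_lock { ... delete! ... }` call path into an immediate crash, so the reentrant `Monitor` is what makes the mark-deleted-under-the-lock design workable.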

logstash-output-s3.gemspec

+1 -1 lines

```diff
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name          = 'logstash-output-s3'
-  s.version       = '4.4.0'
+  s.version       = '4.4.1'
   s.licenses      = ['Apache-2.0']
   s.summary       = "Sends Logstash events to the Amazon Simple Storage Service"
   s.description   = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
```

0 commit comments
