@@ -15,8 +15,9 @@ class FileRepository
15
15
class PrefixedValue
16
16
def initialize ( file_factory , stale_time )
17
17
@file_factory = file_factory
18
- @lock = Mutex . new
18
+ @lock = Monitor . new
19
19
@stale_time = stale_time
20
+ @is_deleted = false
20
21
end
21
22
22
23
def with_lock
@@ -34,7 +35,14 @@ def apply(prefix)
34
35
end
35
36
36
37
def delete!
37
- with_lock { |factory | factory . current . delete! }
38
+ with_lock do |factory |
39
+ factory . current . delete!
40
+ @is_deleted = true
41
+ end
42
+ end
43
+
44
+ def deleted?
45
+ with_lock { |_ | @is_deleted }
38
46
end
39
47
end
40
48
@@ -72,17 +80,52 @@ def keys
72
80
end
73
81
74
82
def each_files
75
- @prefixed_factories . values . each do |prefixed_file |
76
- prefixed_file . with_lock { | factory | yield factory . current }
83
+ each_factory ( keys ) do |factory |
84
+ yield factory . current
77
85
end
78
86
end
79
87
80
- # Return the file factory
88
+ ##
89
+ # Yields the file factory while the current thread has exclusive access to it, creating a new
90
+ # one if one does not exist or if the current one is being reaped by the stale watcher.
91
+ # @param prefix_key [String]: the prefix key
92
+ # @yieldparam factory [TemporaryFileFactory]: a temporary file factory that this thread has exclusive access to
93
+ # @yieldreturn [Object]: a value to return; should NOT be the factory, which should be contained by the exclusive access scope.
94
+ # @return [Object]: the value returned by the provided block
81
95
def get_factory ( prefix_key )
82
- prefix_val = @prefixed_factories . fetch_or_store ( prefix_key ) { @factory_initializer . create_value ( prefix_key ) }
96
+
97
+ # fast-path: if factory exists and is not deleted, yield it with exclusive access and return
98
+ prefix_val = @prefixed_factories . get ( prefix_key )
99
+ prefix_val &.with_lock do |factory |
100
+ # intentional local-jump to ensure deletion detection
101
+ # is done inside the exclusive access.
102
+ return yield ( factory ) unless prefix_val . deleted?
103
+ end
104
+
105
+ # slow-path:
106
+ # the Concurrent::Map#get operation is lock-free, but may have returned an entry that was being deleted by
107
+ # another thread (such as via stale detection). If we failed to retrieve a value, or retrieved one that had
108
+ # been marked deleted, use the atomic Concurrent::Map#compute to retrieve a non-deleted entry.
109
+ prefix_val = @prefixed_factories . compute ( prefix_key ) do |existing |
110
+ existing && !existing . deleted? ? existing : @factory_initializer . create_value ( prefix_key )
111
+ end
83
112
prefix_val . with_lock { |factory | yield factory }
84
113
end
85
114
115
+ ##
116
+ # Yields each non-deleted file factory while the current thread has exclusive access to it.
117
+ # @param prefixes [Array<String>]: the prefix keys
118
+ # @yieldparam factory [TemporaryFileFactory]
119
+ # @return [void]
120
+ def each_factory ( prefixes )
121
+ prefixes . each do |prefix_key |
122
+ prefix_val = @prefixed_factories . get ( prefix_key )
123
+ prefix_val &.with_lock do |factory |
124
+ yield factory unless prefix_val . deleted?
125
+ end
126
+ end
127
+ end
128
+
86
129
def get_file ( prefix_key )
87
130
get_factory ( prefix_key ) { |factory | yield factory . current }
88
131
end
@@ -95,18 +138,31 @@ def size
95
138
@prefixed_factories . size
96
139
end
97
140
98
- def remove_stale ( k , v )
99
- if v . stale?
100
- @prefixed_factories . delete_pair ( k , v )
101
- v . delete!
141
+ def remove_if_stale ( prefix_key )
142
+ # we use the ATOMIC `Concurrent::Map#compute_if_present` to atomically
143
+ # detect the staleness, mark a stale prefixed factory as deleted, and delete from the map.
144
+ @prefixed_factories . compute_if_present ( prefix_key ) do |prefixed_factory |
145
+ # once we have retrieved an instance, we acquire exclusive access to it
146
+ # for stale detection, marking it as deleted before releasing the lock
147
+ # and causing it to become deleted from the map.
148
+ prefixed_factory . with_lock do |_ |
149
+ if prefixed_factory . stale?
150
+ prefixed_factory . delete! # mark deleted to prevent reuse
151
+ nil # cause deletion
152
+ else
153
+ prefixed_factory # keep existing
154
+ end
155
+ end
102
156
end
103
157
end
104
158
105
159
def start_stale_sweeper
106
160
@stale_sweeper = Concurrent ::TimerTask . new ( :execution_interval => @sweeper_interval ) do
107
161
LogStash ::Util . set_thread_name ( "S3, Stale factory sweeper" )
108
162
109
- @prefixed_factories . each { |k , v | remove_stale ( k , v ) }
163
+ @prefixed_factories . keys . each do |prefix |
164
+ remove_if_stale ( prefix )
165
+ end
110
166
end
111
167
112
168
@stale_sweeper . execute
0 commit comments