Skip to content

Commit 7efad6d

Browse files
authored
Refactor: avoid usage of CHM (JRuby 9.3.4 work-around) (#248)
1 parent ba27986 commit 7efad6d

File tree

4 files changed

+21
-11
lines changed

4 files changed

+21
-11
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 4.3.7
2+
- Refactor: avoid usage of CHM (JRuby 9.3.4 work-around) [#248](https://github.com/logstash-plugins/logstash-output-s3/pull/248)
3+
14
## 4.3.6
25
- Docs: more documentation on restore + temp dir [#236](https://github.com/logstash-plugins/logstash-output-s3/pull/236)
36
* minor logging improvements - use the same path: naming convention

lib/logstash/outputs/s3/file_repository.rb

+11-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# encoding: utf-8
22
require "java"
3-
require "concurrent"
3+
require "concurrent/map"
44
require "concurrent/timer_task"
55
require "logstash/util"
66

@@ -39,25 +39,26 @@ def delete!
3939
end
4040

4141
class FactoryInitializer
42-
include java.util.function.Function
42+
4343
def initialize(tags, encoding, temporary_directory, stale_time)
4444
@tags = tags
4545
@encoding = encoding
4646
@temporary_directory = temporary_directory
4747
@stale_time = stale_time
4848
end
4949

50-
def apply(prefix_key)
50+
def create_value(prefix_key)
5151
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
5252
end
53+
5354
end
5455

5556
def initialize(tags, encoding, temporary_directory,
5657
stale_time = DEFAULT_STALE_TIME_SECS,
5758
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
5859
# The path need to contains the prefix so when we start
5960
# logtash after a crash we keep the remote structure
60-
@prefixed_factories = java.util.concurrent.ConcurrentHashMap.new
61+
@prefixed_factories = Concurrent::Map.new
6162

6263
@sweeper_interval = sweeper_interval
6364

@@ -67,18 +68,19 @@ def initialize(tags, encoding, temporary_directory,
6768
end
6869

6970
def keys
70-
@prefixed_factories.keySet
71+
@prefixed_factories.keys
7172
end
7273

7374
def each_files
74-
@prefixed_factories.elements.each do |prefixed_file|
75+
@prefixed_factories.values.each do |prefixed_file|
7576
prefixed_file.with_lock { |factory| yield factory.current }
7677
end
7778
end
7879

7980
# Return the file factory
8081
def get_factory(prefix_key)
81-
@prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
82+
prefix_val = @prefixed_factories.fetch_or_store(prefix_key) { @factory_initializer.create_value(prefix_key) }
83+
prefix_val.with_lock { |factory| yield factory }
8284
end
8385

8486
def get_file(prefix_key)
@@ -95,7 +97,7 @@ def size
9597

9698
def remove_stale(k, v)
9799
if v.stale?
98-
@prefixed_factories.remove(k, v)
100+
@prefixed_factories.delete_pair(k, v)
99101
v.delete!
100102
end
101103
end
@@ -104,7 +106,7 @@ def start_stale_sweeper
104106
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
105107
LogStash::Util.set_thread_name("S3, Stale factory sweeper")
106108

107-
@prefixed_factories.forEach{|k,v| remove_stale(k,v)}
109+
@prefixed_factories.each { |k, v| remove_stale(k,v) }
108110
end
109111

110112
@stale_sweeper.execute

logstash-output-s3.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-s3'
3-
s.version = '4.3.6'
3+
s.version = '4.3.7'
44
s.licenses = ['Apache-2.0']
55
s.summary = "Sends Logstash events to the Amazon Simple Storage Service"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/outputs/s3/file_repository_spec.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@
8989
it "returns all available keys" do
9090
subject.get_file(prefix_key) { |file| file.write("something") }
9191
expect(subject.keys).to include(prefix_key)
92-
expect(subject.keys.to_a.size).to eq(1)
92+
expect(subject.keys.size).to eq(1)
9393
end
9494

9595
it "clean stale factories" do
@@ -105,9 +105,14 @@
105105

106106
@file_repository.get_file("another-prefix") { |file| file.write("hello") }
107107
expect(@file_repository.size).to eq(2)
108+
sleep 1.2 # allow sweeper to kick in
108109
try(10) { expect(@file_repository.size).to eq(1) }
109110
expect(File.directory?(path)).to be_falsey
111+
112+
sleep 1.5 # allow sweeper to kick in, again
113+
expect(@file_repository.size).to eq(1)
110114
end
115+
111116
end
112117

113118

0 commit comments

Comments
 (0)