Refactoring, Dynamic prefix and AWS v2 #102

Closed · wants to merge 8 commits
21 changes: 21 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,24 @@
## 4.0.0
- This version is a complete rewrite of version 3.0.0, see #103
- This plugin now uses the V2 version of the AWS SDK; this makes sure we receive the latest updates and changes.
- We now use S3's `upload_file` instead of reading chunks; this method is more efficient and will use a threaded multipart upload if the file is big.
- You can now use the `fieldref` syntax in the prefix to dynamically change the target based on the events it receives (see the sketch below).
- The upload queue is now a bounded list; this is necessary to allow back pressure to be communicated back to the pipeline, and the bound is configurable by the user.
- If the queue is full, the plugin will start the upload in the current thread.
- The plugin is now threadsafe and supports the `shared` concurrency model.
- The rotation strategy can be selected; the recommended one is `size_and_time`, which checks both configured limits (`size` and `time` are also available).
- The `restore` option will now use a separate threadpool with an unbounded queue.
- The `restore` option will not block the launch of Logstash and will use fewer resources than the real-time path.
- The plugin now uses `multi_receive_encode`; this optimizes the writes to the files.
- Rotate operations are now batched to reduce the number of IO calls.
- Empty files will not be uploaded by any rotation strategy.
- We now use Concurrent Ruby for the implementation of the Java executor.
- If you have finer-grained permissions on prefixes or want a faster boot, you can disable the credentials check with `validate_credentials_on_root_bucket`.
- The credentials check will no longer fail if we can't delete the file.
- We now have a full suite of integration tests for all the defined rotation strategies.

Fixes: #4 #81 #44 #59 #50
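
As a hedged illustration of the new dynamic prefix (the field names and values below are invented): Logstash expands field references through `Event#sprintf`, so a prefix such as `logs/%{env}/%{service}/` resolves per event roughly like this:

require "logstash/event"

# Invented example event; any field reachable on the event can appear in the prefix.
event  = LogStash::Event.new("env" => "prod", "service" => "api")
prefix = "logs/%{env}/%{service}/"

event.sprintf(prefix) # => "logs/prod/api/"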

## 3.2.0
- Move to the new concurrency model `:single`
- Use correct license identifier #99
496 changes: 188 additions & 308 deletions lib/logstash/outputs/s3.rb

Large diffs are not rendered by default.

120 changes: 120 additions & 0 deletions lib/logstash/outputs/s3/file_repository.rb
@@ -0,0 +1,120 @@
# encoding: utf-8
require "java"
require "concurrent"
require "concurrent/timer_task"
require "logstash/util"

ConcurrentHashMap = java.util.concurrent.ConcurrentHashMap

module LogStash
module Outputs
class S3
class FileRepository
DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60
DEFAULT_STALE_TIME_SECS = 15 * 60
# Ensure that all access or work done
# on a factory is threadsafe
class PrefixedValue
def initialize(file_factory, stale_time)
@file_factory = file_factory
@lock = Mutex.new
@stale_time = stale_time
end

def with_lock
@lock.synchronize {
yield @file_factory
}
end

def stale?
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
end

def apply(prefix)
return self
end

def delete!
with_lock{ |factory| factory.current.delete! }
end
end

class FactoryInitializer
def initialize(tags, encoding, temporary_directory, stale_time)
@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory
@stale_time = stale_time
end

def apply(prefix_key)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
end
end

def initialize(tags, encoding, temporary_directory,
stale_time = DEFAULT_STALE_TIME_SECS,
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
# The path needs to contain the prefix, so when we start
# Logstash after a crash we keep the remote structure
@prefixed_factories = ConcurrentHashMap.new

@sweeper_interval = sweeper_interval

@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

start_stale_sweeper
end

def keys
@prefixed_factories.keySet
end

def each_files
@prefixed_factories.elements.each do |prefixed_file|
prefixed_file.with_lock { |factory| yield factory.current }
end
end

# Return the file factory
def get_factory(prefix_key)
@prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
end

def get_file(prefix_key)
get_factory(prefix_key) { |factory| yield factory.current }
end

def shutdown
stop_stale_sweeper
end

def size
@prefixed_factories.size
end

def remove_stale(k, v)
if v.stale?
@prefixed_factories.remove(k, v)
v.delete!
end
end

def start_stale_sweeper
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")

@prefixed_factories.forEach{|k,v| remove_stale(k,v)}
end

@stale_sweeper.execute
end

def stop_stale_sweeper
@stale_sweeper.shutdown
end
end
end
end
end
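
A minimal usage sketch of the repository (the prefix, tag, and directory values are invented, and the `"none"` encoding is an assumption for this sketch): `get_file` resolves the factory for a prefix, takes its lock, and yields the current temporary file.

# Hedged sketch; assumes the plugin's files are already required.
repository = LogStash::Outputs::S3::FileRepository.new(["tag1"], "none", "/tmp/logstash")

repository.get_file("logs/prod/") do |file|
  file.write("hello world\n") # TemporaryFile delegates #write to the underlying fd
end

repository.shutdown # stops the stale-factory sweeper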
22 changes: 22 additions & 0 deletions lib/logstash/outputs/s3/patch.rb
@@ -0,0 +1,22 @@
# This is a patch related to autoloading and Ruby.
#
# The fix exists in JRuby 9k but not in the current JRuby; it is not clear when or if it will be backported.
# https://github.com/jruby/jruby/issues/3645
#
# AWS does tricky name discovery in the module to generate the correct error class, and
# this strategy is bogus in JRuby; `eager_autoload` doesn't fix the issue.
#
# This will be a short-lived patch since AWS is removing the need for it.
# see: https://github.com/aws/aws-sdk-ruby/issues/1301#issuecomment-261115960
require "stringio"

old_stderr = $stderr

$stderr = StringIO.new
begin
module Aws
const_set(:S3, Aws::S3)
end
ensure
$stderr = old_stderr
end


18 changes: 18 additions & 0 deletions lib/logstash/outputs/s3/path_validator.rb
@@ -0,0 +1,18 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class PathValidator
INVALID_CHARACTERS = "\^`><"

def self.valid?(name)
name.match(matches_re).nil?
end

def self.matches_re
/[#{Regexp.escape(INVALID_CHARACTERS)}]/
end
end
end
end
end
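
For illustration, `valid?` rejects any key containing one of the blacklisted characters (the example keys are invented):

LogStash::Outputs::S3::PathValidator.valid?("logs/2016/my-file.txt") # => true
LogStash::Outputs::S3::PathValidator.valid?("logs/bad^name")         # => false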
24 changes: 24 additions & 0 deletions lib/logstash/outputs/s3/size_and_time_rotation_policy.rb
@@ -0,0 +1,24 @@
# encoding: utf-8
require "logstash/outputs/s3/size_rotation_policy"
require "logstash/outputs/s3/time_rotation_policy"

module LogStash
module Outputs
class S3
class SizeAndTimeRotationPolicy
def initialize(file_size, time_file)
@size_strategy = SizeRotationPolicy.new(file_size)
@time_strategy = TimeRotationPolicy.new(time_file)
end

def rotate?(file)
@size_strategy.rotate?(file) || @time_strategy.rotate?(file)
end

def needs_periodic?
true
end
end
end
end
end
26 changes: 26 additions & 0 deletions lib/logstash/outputs/s3/size_rotation_policy.rb
@@ -0,0 +1,26 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class SizeRotationPolicy
attr_reader :size_file

def initialize(size_file)
if size_file <= 0
raise LogStash::ConfigurationError, "`size_file` needs to be greater than 0"
end

@size_file = size_file
end

def rotate?(file)
file.size >= size_file
end

def needs_periodic?
false
end
end
end
end
end
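
A quick sketch of the policy contract (the struct stands in for a real temporary file; the sizes are invented): `rotate?` only consults the file's reported size, so any object responding to `size` will do.

FakeFile = Struct.new(:size)

policy = LogStash::Outputs::S3::SizeRotationPolicy.new(1024 * 1024) # rotate at 1MiB
policy.rotate?(FakeFile.new(512))             # => false
policy.rotate?(FakeFile.new(2 * 1024 * 1024)) # => true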
71 changes: 71 additions & 0 deletions lib/logstash/outputs/s3/temporary_file.rb
@@ -0,0 +1,71 @@
# encoding: utf-8
require "thread"
require "forwardable"
require "fileutils"

module LogStash
module Outputs
class S3
# Wrap the actual file descriptor in a utility class.
# It makes things more OOP and the paths easier to reason about.
class TemporaryFile
extend Forwardable

def_delegators :@fd, :path, :write, :close, :fsync

attr_reader :fd

def initialize(key, fd, temp_path)
@fd = fd
@key = key
@temp_path = temp_path
@created_at = Time.now
end

def ctime
@created_at
end

def temp_path
@temp_path
end

def size
# Use the fd size to get an accurate result,
# so we don't have to deal with fsync;
# if the file is closed we fall back to File::size
begin
@fd.size
rescue IOError
::File.size(path)
end
end

def key
@key.gsub(/^\//, "")
end

# Each temporary file is created inside a directory named with a UUID.
# Instead of deleting the file directly, and risking the deletion of other files,
# we delete the root of the UUID directory; using a UUID also removes the risk of
# deleting unwanted files. It acts as a sandbox.
def delete!
@fd.close
::FileUtils.rm_rf(@temp_path, :secure => true)
end

def empty?
size == 0
end

def self.create_from_existing_file(file_path, temporary_folder)
key_parts = Pathname.new(file_path).relative_path_from(temporary_folder).to_s.split(::File::SEPARATOR)

TemporaryFile.new(key_parts.slice(1, key_parts.size).join("/"),
::File.open(file_path, "r"),
::File.join(temporary_folder, key_parts.slice(0, 1)))
end
end
end
end
end
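
A hedged sketch of crash recovery through `create_from_existing_file` (the paths are invented and the file is assumed to exist on disk): the first segment under the temporary folder is the UUID sandbox, and the remaining segments become the S3 key.

temporary_folder = "/tmp/logstash"
file_path = "/tmp/logstash/5020b084/logs/prod/ls.s3.log"

file = LogStash::Outputs::S3::TemporaryFile.create_from_existing_file(file_path, temporary_folder)
file.key       # => "logs/prod/ls.s3.log"
file.temp_path # => "/tmp/logstash/5020b084"
file.delete!   # closes the fd and removes the whole UUID sandbox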