Skip to content

Refactoring, Dynamic prefix and AWS v2 (extended) #105

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
491 changes: 184 additions & 307 deletions lib/logstash/outputs/s3.rb

Large diffs are not rendered by default.

120 changes: 120 additions & 0 deletions lib/logstash/outputs/s3/file_repository.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# encoding: utf-8
require "java"
require "concurrent"
require "concurrent/timer_task"
require "logstash/util"

ConcurrentHashMap = java.util.concurrent.ConcurrentHashMap

module LogStash
module Outputs
class S3
class FileRepository
DEFAULT_STATE_SWEEPER_INTERVAL_SECS = 60
DEFAULT_STALE_TIME_SECS = 15 * 60
# Ensure that all access or work done
# on a factory is threadsafe
class PrefixedValue
def initialize(factory, stale_time)
@factory = factory
@lock = Mutex.new
@stale_time = stale_time
end

def with_lock
@lock.synchronize {
yield @factory
}
end

def stale?
with_lock { |factory| factory.current.size == 0 && (Time.now - factory.current.ctime > @stale_time) }
end

def apply(prefix)
return self
end

def delete!
with_lock{ |factory| factory.current.delete! }
end
end

class FactoryInitializer
def initialize(tags, encoding, temporary_directory, stale_time)
@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory
@stale_time = stale_time
end

def apply(prefix_key)
PrefixedValue.new(TemporaryFileFactory.new(prefix_key, @tags, @encoding, @temporary_directory), @stale_time)
end
end

def initialize(tags, encoding, temporary_directory,
stale_time = DEFAULT_STALE_TIME_SECS,
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
# The path need to contains the prefix so when we start
# logtash after a crash we keep the remote structure
@prefixed_factories = ConcurrentHashMap.new

@sweeper_interval = sweeper_interval

@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

start_stale_sweeper
end

def keys
@prefixed_factories.keySet
end

def each_files
@prefixed_factories.elements.each do |prefixed_file|
prefixed_file.with_lock { |factory| yield factory.current }
end
end

# Return the file factory
def get_factory(prefix_key)
@prefixed_factories.computeIfAbsent(prefix_key, @factory_initializer).with_lock { |factory| yield factory }
end

def get_file(prefix_key)
get_factory(prefix_key) { |factory| yield factory.current }
end

def shutdown
stop_stale_sweeper
end

def size
@prefixed_factories.size
end

def remove_stale(k, v)
if v.stale?
@prefixed_factories.remove(k, v)
v.delete!
end
end

def start_stale_sweeper
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")

@prefixed_factories.forEach{|k,v| remove_stale(k,v)}
end

@stale_sweeper.execute
end

def stop_stale_sweeper
@stale_sweeper.shutdown
end
end
end
end
end
18 changes: 18 additions & 0 deletions lib/logstash/outputs/s3/path_validator.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class PathValidator
INVALID_CHARACTERS = "\^`><"

def self.valid?(name)
name.match(matches_re).nil?
end

def self.matches_re
/[#{Regexp.escape(INVALID_CHARACTERS)}]/
end
end
end
end
end
24 changes: 24 additions & 0 deletions lib/logstash/outputs/s3/size_and_time_rotation_policy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# encoding: utf-8
require "logstash/outputs/s3/size_rotation_policy"
require "logstash/outputs/s3/time_rotation_policy"

module LogStash
module Outputs
class S3
class SizeAndTimeRotationPolicy
def initialize(file_size, time_file)
@size_strategy = SizeRotationPolicy.new(file_size)
@time_strategy = TimeRotationPolicy.new(time_file)
end

def rotate?(file)
@size_strategy.rotate?(file) || @time_strategy.rotate?(file)
end

def need_periodic?
true
end
end
end
end
end
26 changes: 26 additions & 0 deletions lib/logstash/outputs/s3/size_rotation_policy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class SizeRotationPolicy
attr_reader :size_file

def initialize(size_file)
if size_file <= 0
raise LogStash::ConfigurationError, "`size_file` need to be greather than 0"
end

@size_file = size_file
end

def rotate?(file)
file.size >= size_file
end

def need_periodic?
false
end
end
end
end
end
51 changes: 51 additions & 0 deletions lib/logstash/outputs/s3/temporary_file.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# encoding: utf-8
require "thread"
require "forwardable"
require "fileutils"

module LogStash
module Outputs
class S3
# Wrap the actual file descriptor into an utility classe
# It make it more OOP and easier to reason with the paths.
class TemporaryFile
extend Forwardable
DELEGATES_METHODS = [:path, :write, :close, :size, :fsync]

def_delegators :@fd, *DELEGATES_METHODS

def initialize(key, fd, temp_path)
@fd = fd
@key = key
@temp_path = temp_path
@created_at = Time.now
end

def ctime
@created_at
end

def temp_path
@temp_path
end

def key
@key.gsub(/^\//, "")
end

# Each temporary file is made inside a directory named with an UUID,
# instead of deleting the file directly and having the risk of deleting other files
# we delete the root of the UUID, using a UUID also remove the risk of deleting unwanted file, it acts as
# a sandbox.
def delete!
@fd.close
::FileUtils.rm_rf(@temp_path, :secure => true)
end

def empty?
size == 0
end
end
end
end
end
93 changes: 93 additions & 0 deletions lib/logstash/outputs/s3/temporary_file_factory.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# encoding: utf-8
require "socket"
require "securerandom"
require "fileutils"

module LogStash
module Outputs
class S3
# Since the file can contains dynamic part, we have to handle a more local structure to
# allow a nice recovery from a crash.
#
# The local structure will look like this.
#
# <TEMPORARY_PATH>/<UUID>/<prefix>/ls.s3.localhost.%Y-%m-%dT%H.%m.tag_es_fb.part1.txt.gz
#
# Since the UUID should be fairly unique I can destroy the whole path when an upload is complete.
# I do not have to mess around to check if the other directory have file in it before destroying them.
class TemporaryFileFactory
FILE_MODE = "a"
GZIP_ENCODING = "gzip"
GZIP_EXTENSION = "txt.gz"
TXT_EXTENSION = "txt"
STRFTIME = "%Y-%m-%dT%H.%M"

attr_accessor :counter, :tags, :prefix, :encoding, :temporary_directory, :current

def initialize(prefix, tags, encoding, temporary_directory)
@counter = 0
@prefix = prefix

@tags = tags
@encoding = encoding
@temporary_directory = temporary_directory
@lock = Mutex.new

rotate!
end

def rotate!
@lock.synchronize {
@current = new_file
increment_counter
@current
}
end

private
def extension
gzip? ? GZIP_EXTENSION : TXT_EXTENSION
end

def gzip?
encoding == GZIP_ENCODING
end

def increment_counter
@counter += 1
end

def current_time
Time.now.strftime(STRFTIME)
end

def generate_name
filename = "ls.s3.#{SecureRandom.uuid}.#{current_time}"

if tags.size > 0
"#{filename}.tag_#{tags.join('.')}.part#{counter}.#{extension}"
else
"#{filename}.part#{counter}.#{extension}"
end
end

def new_file
uuid = SecureRandom.uuid
name = generate_name
path = ::File.join(temporary_directory, uuid)
key = ::File.join(prefix, name)

FileUtils.mkdir_p(::File.join(path, prefix))

io = if gzip?
Zlib::GzipWriter.open(::File.join(path, key))
else
::File.open(::File.join(path, key), FILE_MODE)
end

TemporaryFile.new(key, io, path)
end
end
end
end
end
26 changes: 26 additions & 0 deletions lib/logstash/outputs/s3/time_rotation_policy.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# encoding: utf-8
module LogStash
module Outputs
class S3
class TimeRotationPolicy
attr_reader :time_file

def initialize(time_file)
if time_file <= 0
raise LogStash::ConfigurationError, "`time_file` need to be greather than 0"
end

@time_file = time_file * 60
end

def rotate?(file)
file.size > 0 && (Time.now - file.ctime) >= time_file
end

def need_periodic?
true
end
end
end
end
end
Loading