-
Notifications
You must be signed in to change notification settings - Fork 30
refactor plugin for the new shared concurrency model #20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ | |
class LogStash::Outputs::Tcp < LogStash::Outputs::Base | ||
|
||
config_name "tcp" | ||
concurrency :shared | ||
|
||
default :codec, "json" | ||
|
||
|
@@ -115,7 +116,7 @@ def register | |
end # @ssl_enable | ||
|
||
if server? | ||
workers_not_supported | ||
@server_mutex = Mutex.new | ||
|
||
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") | ||
begin | ||
|
@@ -144,35 +145,6 @@ def register | |
end | ||
end | ||
end | ||
|
||
@codec.on_event do |event, payload| | ||
@client_threads.each do |client_thread| | ||
client_thread[:client].write(payload) | ||
end | ||
@client_threads.reject! {|t| !t.alive? } | ||
end | ||
else | ||
client_socket = nil | ||
@codec.on_event do |event, payload| | ||
begin | ||
client_socket = connect unless client_socket | ||
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) | ||
# don't expect any reads, but a readable socket might | ||
# mean the remote end closed, so read it and throw it away. | ||
# we'll get an EOFError if it happens. | ||
client_socket.sysread(16384) if r.any? | ||
|
||
# Now send the payload | ||
client_socket.syswrite(payload) if w.any? | ||
rescue => e | ||
@logger.warn("tcp output exception", :host => @host, :port => @port, | ||
:exception => e, :backtrace => e.backtrace) | ||
client_socket.close rescue nil | ||
client_socket = nil | ||
sleep @reconnect_interval | ||
retry | ||
end | ||
end | ||
end | ||
end # def register | ||
|
||
|
@@ -204,7 +176,38 @@ def server? | |
end # def server? | ||
|
||
public | ||
def receive(event) | ||
@codec.encode(event) | ||
end # def receive | ||
def multi_receive_encoded(encoded) | ||
if server? | ||
@server_mutex.synchronize do | ||
@client_threads.each do |client_thread| | ||
encoded.each do |event,data| | ||
client_thread[:client].write(data) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I realize this was in the original, but maybe this would be a good time to rename There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It occurs to me that if you start up two TCP servers they would step on each other's use of |
||
end | ||
end | ||
@client_threads.reject! {|t| !t.alive? } | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the purpose of this? Why would some client threads be alive? I know this is original code but it's confusing to me. I don't understand it. |
||
end | ||
else | ||
client_socket = nil | ||
begin | ||
client_socket = connect unless client_socket | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if you already planned this or not, but just copying the code from the original here is problematic. This will always reconnect on each batch! We should have a client socket per TCP output per worker thread. Code like this might be good.
|
||
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) | ||
# don't expect any reads, but a readable socket might | ||
# mean the remote end closed, so read it and throw it away. | ||
# we'll get an EOFError if it happens. | ||
client_socket.sysread(16384) if r.any? | ||
|
||
# Now send the payload | ||
encoded.each do |event,data| | ||
client_socket.syswrite(data) if w.any? | ||
end | ||
rescue => e | ||
@logger.warn("tcp output exception", :host => @host, :port => @port, | ||
:exception => e, :backtrace => e.backtrace) | ||
client_socket.close rescue nil | ||
client_socket = nil | ||
sleep @reconnect_interval | ||
retry | ||
end | ||
end | ||
end | ||
end # class LogStash::Outputs::Tcp |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've been naming this variable
events_and_encoded
. I'm not saying it should be named the exact same thing here, butencoded
to me sounds like a bytes/strings only.