Skip to content

experimental introduction of a bounded queue and rejection handler #377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/logstash/inputs/beats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ def register
end # def register

def create_server
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads)
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, 5)
if @ssl
ssl_context_builder = new_ssl_context_builder
if client_authentification?
Expand Down
2 changes: 1 addition & 1 deletion spec/inputs/beats_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
subject(:plugin) { LogStash::Inputs::Beats.new(config) }

it "sends the required options to the server" do
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads)
expect(org.logstash.beats.Server).to receive(:new).with(host, port, client_inactivity_timeout, threads, 5)
subject.register
end
end
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.logstash.beats;

import io.netty.util.concurrent.RejectedExecutionHandler;
import io.netty.util.concurrent.SingleThreadEventExecutor;

public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
System.out.println("Requeueing the message");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
executor.execute(task);
}
}
30 changes: 30 additions & 0 deletions src/main/java/org/logstash/beats/DaemonThreadFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.logstash.beats;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonThreadFactory implements ThreadFactory {

final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;

DaemonThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
group = Thread.currentThread().getThreadGroup();
}

@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + "[T#" + threadNumber.getAndIncrement() + "]",
0);
t.setDaemon(true);
return t;
}

public static ThreadFactory daemonThreadFactory(String namePrefix) {
return new DaemonThreadFactory(namePrefix);
}

}
2 changes: 1 addition & 1 deletion src/main/java/org/logstash/beats/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ static public void main(String[] args) throws Exception {
// Check for leaks.
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors());
Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), 128);

if(args.length > 0 && args[0].equals("ssl")) {
logger.debug("Using SSL");
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,30 @@
import org.apache.logging.log4j.Logger;
import org.logstash.netty.SslHandlerProvider;


import static org.logstash.beats.DaemonThreadFactory.daemonThreadFactory;

public class Server {
private final static Logger logger = LogManager.getLogger(Server.class);

private final int port;
private final String host;
private final int beatsHeandlerThreadCount;
private final int beatsHandlerThreadCount;
private final int maxPendingRequests;
private NioEventLoopGroup workGroup;
private IMessageListener messageListener = new MessageListener();
private SslHandlerProvider sslHandlerProvider;
private BeatsInitializer beatsInitializer;
private final int connectionBacklog = 128;

private final int clientInactivityTimeoutSeconds;

public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount) {
public Server(String host, int p, int clientInactivityTimeoutSeconds, int threadCount, int maxPendingRequests) {
this.host = host;
port = p;
this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
beatsHeandlerThreadCount = threadCount;
beatsHandlerThreadCount = threadCount;
this.maxPendingRequests = maxPendingRequests;
}

public void setSslHandlerProvider(SslHandlerProvider sslHandlerProvider){
Expand All @@ -49,11 +55,12 @@ public Server listen() throws InterruptedException {
try {
logger.info("Starting server on port: {}", this.port);

beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount);
beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, beatsHandlerThreadCount, maxPendingRequests);

ServerBootstrap server = new ServerBootstrap();
server.group(workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, connectionBacklog)
.childOption(ChannelOption.SO_LINGER, 0) // Since the protocol doesn't support yet a remote close from the server and we don't want to have 'unclosed' socket lying around we have to use `SO_LINGER` to force the close of the socket.
.childHandler(beatsInitializer);

Expand Down Expand Up @@ -108,12 +115,13 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
private final IMessageListener localMessageListener;
private final int localClientInactivityTimeoutSeconds;

BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThreadCount, int maxPendingRequests) {
// Keeps a local copy of Server settings, so they can't be modified once it starts listening
this.localMessageListener = messageListener;
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThreadCount, daemonThreadFactory("beats-input-handler-executor"), maxPendingRequests, new CustomRejectedExecutionHandler());
//beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
}

public void initChannel(SocketChannel socket){
Expand All @@ -127,6 +135,7 @@ public void initChannel(SocketChannel socket){
pipeline.addLast(BEATS_ACKER, new AckEncoder());
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));

}


Expand Down
7 changes: 4 additions & 3 deletions src/test/java/org/logstash/beats/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class ServerTest {
private EventLoopGroup group;
private final String host = "0.0.0.0";
private final int threadCount = 10;
private final int maxPendingRequests = 128;

@Before
public void setUp() {
Expand All @@ -50,7 +51,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte

final CountDownLatch latch = new CountDownLatch(concurrentConnections);

final Server server = new Server(host, randomPort, inactivityTime, threadCount);
final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests);
final AtomicBoolean otherCause = new AtomicBoolean(false);
server.setMessageListener(new MessageListener() {
public void onNewConnection(ChannelHandlerContext ctx) {
Expand Down Expand Up @@ -114,7 +115,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt

final CountDownLatch latch = new CountDownLatch(concurrentConnections);
final AtomicBoolean exceptionClose = new AtomicBoolean(false);
final Server server = new Server(host, randomPort, inactivityTime, threadCount);
final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests);
server.setMessageListener(new MessageListener() {
@Override
public void onNewConnection(ChannelHandlerContext ctx) {
Expand Down Expand Up @@ -170,7 +171,7 @@ public void run() {

@Test
public void testServerShouldAcceptConcurrentConnection() throws InterruptedException {
final Server server = new Server(host, randomPort, 30, threadCount);
final Server server = new Server(host, randomPort, 30, threadCount, maxPendingRequests);
SpyListener listener = new SpyListener();
server.setMessageListener(listener);
Runnable serverTask = new Runnable() {
Expand Down