diff --git a/lib/logstash/inputs/beats.rb b/lib/logstash/inputs/beats.rb index 5d65df93..3b85dbe0 100644 --- a/lib/logstash/inputs/beats.rb +++ b/lib/logstash/inputs/beats.rb @@ -163,7 +163,7 @@ def register end # def register def create_server - server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads) + server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @executor_threads, 5) if @ssl begin diff --git a/src/main/java/org/logstash/beats/CustomRejectedExecutionHandler.java b/src/main/java/org/logstash/beats/CustomRejectedExecutionHandler.java new file mode 100644 index 00000000..b3390301 --- /dev/null +++ b/src/main/java/org/logstash/beats/CustomRejectedExecutionHandler.java @@ -0,0 +1,18 @@ +package org.logstash.beats; + +import io.netty.util.concurrent.RejectedExecutionHandler; +import io.netty.util.concurrent.SingleThreadEventExecutor; + +public class CustomRejectedExecutionHandler implements RejectedExecutionHandler { + + @Override + public void rejected(Runnable task, SingleThreadEventExecutor executor) { + System.out.println("Requeueing the message"); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + executor.execute(task); + } +} diff --git a/src/main/java/org/logstash/beats/DaemonThreadFactory.java b/src/main/java/org/logstash/beats/DaemonThreadFactory.java new file mode 100644 index 00000000..56053df8 --- /dev/null +++ b/src/main/java/org/logstash/beats/DaemonThreadFactory.java @@ -0,0 +1,30 @@ +package org.logstash.beats; + +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +public class DaemonThreadFactory implements ThreadFactory { + + final ThreadGroup group; + final AtomicInteger threadNumber = new AtomicInteger(1); + final String namePrefix; + + DaemonThreadFactory(String namePrefix) { + this.namePrefix = namePrefix; + group = Thread.currentThread().getThreadGroup(); + } + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(group, r, + namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", + 0); + t.setDaemon(true); + return t; + } + + public static ThreadFactory daemonThreadFactory(String namePrefix) { + return new DaemonThreadFactory(namePrefix); + } + +} diff --git a/src/main/java/org/logstash/beats/Runner.java b/src/main/java/org/logstash/beats/Runner.java index a5eb6fd3..01609dcf 100644 --- a/src/main/java/org/logstash/beats/Runner.java +++ b/src/main/java/org/logstash/beats/Runner.java @@ -18,7 +18,7 @@ static public void main(String[] args) throws Exception { // Check for leaks. // ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); - Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors()); + Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, Runtime.getRuntime().availableProcessors(), 128); if(args.length > 0 && args[0].equals("ssl")) { logger.debug("Using SSL"); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index 74624d60..37faf6f4 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -16,25 +16,33 @@ import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.security.cert.CertificateException; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.logstash.beats.DaemonThreadFactory.daemonThreadFactory; public class Server { private final static Logger logger = LogManager.getLogger(Server.class); private final int port; private final String host; - private final int beatsHeandlerThreadCount; + private final int beatsHandlerThreadCount; + private final int maxPendingRequests; private NioEventLoopGroup workGroup; private IMessageListener messageListener = new MessageListener(); private SslSimpleBuilder sslBuilder; private BeatsInitializer beatsInitializer; + private final int connectionBacklog = 128; private final int clientInactivityTimeoutSeconds; - public Server(String host, int p, int timeout, int threadCount) { + public Server(String host, int p, int timeout, int threadCount, int maxPendingRequests) { this.host = host; port = p; clientInactivityTimeoutSeconds = timeout; - beatsHeandlerThreadCount = threadCount; + beatsHandlerThreadCount = threadCount; + this.maxPendingRequests = maxPendingRequests; } public void enableSSL(SslSimpleBuilder builder) { @@ -54,11 +62,12 @@ public Server listen() throws InterruptedException { try { logger.info("Starting server on port: {}", this.port); - beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount); + beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHandlerThreadCount, maxPendingRequests); ServerBootstrap server = new ServerBootstrap(); server.group(workGroup) .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, connectionBacklog) .childOption(ChannelOption.SO_LINGER, 0) // Since the protocol doesn't support yet a remote close from the server and we don't want to have 'unclosed' socket lying around we have to use `SO_LINGER` to force the close of the socket. .childHandler(beatsInitializer); @@ -114,14 +123,14 @@ private class BeatsInitializer extends ChannelInitializer { private final int localClientInactivityTimeoutSeconds; private final boolean localEnableSSL; - BeatsInitializer(Boolean enableSSL, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) { + BeatsInitializer(Boolean enableSSL, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread, int maxPendingRequests) { // Keeps a local copy of Server settings, so they can't be modified once it starts listening this.localEnableSSL = enableSSL; this.localMessageListener = messageListener; this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); - beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); - + beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread, daemonThreadFactory("beats-input-handler-executor"), maxPendingRequests, new CustomRejectedExecutionHandler()); + //beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); } public void initChannel(SocketChannel socket) throws IOException, NoSuchAlgorithmException, CertificateException { @@ -136,6 +145,7 @@ public void initChannel(SocketChannel socket) throws IOException, NoSuchAlgorith pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener)); + } @Override diff --git a/src/test/java/org/logstash/beats/ServerTest.java b/src/test/java/org/logstash/beats/ServerTest.java index 37512cdc..be747251 100644 --- a/src/test/java/org/logstash/beats/ServerTest.java +++ b/src/test/java/org/logstash/beats/ServerTest.java @@ -34,6 +34,7 @@ public class ServerTest { private EventLoopGroup group; private final String host = "0.0.0.0"; private final int threadCount = 10; + private final int maxPendingRequests = 128; @Before public void setUp() { @@ -50,7 +51,7 @@ public void testServerShouldTerminateConnectionWhenExceptionHappen() throws Inte final CountDownLatch latch = new CountDownLatch(concurrentConnections); - final Server server = new Server(host, randomPort, inactivityTime, threadCount); + final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests); final AtomicBoolean otherCause = new AtomicBoolean(false); server.setMessageListener(new MessageListener() { public void onNewConnection(ChannelHandlerContext ctx) { @@ -114,7 +115,7 @@ public void testServerShouldTerminateConnectionIdleForTooLong() throws Interrupt final CountDownLatch latch = new CountDownLatch(concurrentConnections); final AtomicBoolean exceptionClose = new AtomicBoolean(false); - final Server server = new Server(host, randomPort, inactivityTime, threadCount); + final Server server = new Server(host, randomPort, inactivityTime, threadCount, maxPendingRequests); server.setMessageListener(new MessageListener() { @Override public void onNewConnection(ChannelHandlerContext ctx) { @@ -170,7 +171,7 @@ public void run() { @Test public void testServerShouldAcceptConcurrentConnection() throws InterruptedException { - final Server server = new Server(host, randomPort, 30, threadCount); + final Server server = new Server(host, randomPort, 30, threadCount, maxPendingRequests); SpyListener listener = new SpyListener(); server.setMessageListener(listener); Runnable serverTask = new Runnable() {