From 23f96d6ee3b12c3625a4ca7bf1f7e129db4dbe7b Mon Sep 17 00:00:00 2001 From: andsel Date: Wed, 5 Jul 2023 15:48:10 +0200 Subject: [PATCH] Avoid to run Beats parser and Beats protocol handler in separate executors group (beatsHandlerExecutorGroup) --- src/main/java/org/logstash/beats/Server.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index c343aaf6..5c4e0d25 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -112,7 +112,6 @@ private class BeatsInitializer extends ChannelInitializer { private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5; private final EventExecutorGroup idleExecutorGroup; - private final EventExecutorGroup beatsHandlerExecutorGroup; private final IMessageListener localMessageListener; private final int localClientInactivityTimeoutSeconds; @@ -121,7 +120,6 @@ private class BeatsInitializer extends ChannelInitializer { this.localMessageListener = messageListener; this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds; idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD); - beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread); } public void initChannel(SocketChannel socket){ @@ -134,7 +132,8 @@ public void initChannel(SocketChannel socket){ new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds)); pipeline.addLast(BEATS_ACKER, new AckEncoder()); pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler()); - pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener)); + pipeline.addLast(new BeatsParser()); + pipeline.addLast(new BeatsHandler(localMessageListener)); } @@ -152,7 +151,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E public void shutdownEventExecutor() { try { idleExecutorGroup.shutdownGracefully().sync(); - beatsHandlerExecutorGroup.shutdownGracefully().sync(); } catch (InterruptedException e) { throw new IllegalStateException(e); }