Skip to content

Commit

Permalink
Avoid to run Beats parser and Beats protocol handler in separate exec…
Browse files Browse the repository at this point in the history
…utors group (beatsHandlerExecutorGroup)
  • Loading branch information
andsel committed Sep 26, 2023
1 parent 339913c commit 23f96d6
Showing 1 changed file with 2 additions and 4 deletions.
6 changes: 2 additions & 4 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5;

private final EventExecutorGroup idleExecutorGroup;
private final EventExecutorGroup beatsHandlerExecutorGroup;
private final IMessageListener localMessageListener;
private final int localClientInactivityTimeoutSeconds;

Expand All @@ -121,7 +120,6 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
this.localMessageListener = messageListener;
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
}

public void initChannel(SocketChannel socket){
Expand All @@ -134,7 +132,8 @@ public void initChannel(SocketChannel socket){
new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds));
pipeline.addLast(BEATS_ACKER, new AckEncoder());
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));
pipeline.addLast(new BeatsParser());
pipeline.addLast(new BeatsHandler(localMessageListener));
}


Expand All @@ -152,7 +151,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
public void shutdownEventExecutor() {
try {
idleExecutorGroup.shutdownGracefully().sync();
beatsHandlerExecutorGroup.shutdownGracefully().sync();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
Expand Down

0 comments on commit 23f96d6

Please sign in to comment.