Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a strategy to handle OOM in direct memory #475

Open
wants to merge 19 commits into
base: main
Choose a base branch
from

Conversation

andsel
Copy link
Contributor

@andsel andsel commented Jul 5, 2023

Release notes

Direct memory used by plugin weren't tracked and this could generate out-of-memory crashes when unattended spikes or traffic loads reached the plugin.
The plugin is changed to monitor how much direct memory it's using and in case it's close to the limit dropping connection to free some space.
To control spikes of memory usage, the incoming message reads switched from push mode to pull, so that the plugin has control on rate of ingestion and it's not the determined by the clients.

What does this PR do?

Changes the way the plugin pulls data and handle incoming connections to respect the max direct memory size and avoid out of memory errors.

  • switches to pull read instead of push read. Instead to respond to every buffer that a connection present, it's the plugin that actively grab the byte buffer to process.
  • reject new connections if the direct memory is almost exhausted.
  • if OOM on direct memory happens, close the connection that receive it, trying to lowering the pressure on the memory.

This work is based on the existing #410.

Why is it important/What is the impact to the user?

Provides a set of countermeasures to limit the probability of OutOfMemory errors when creating new buffers in direct memory.
In addition to this, it introduces a minimal amount fo direct memory (256MB) that's required to start processing data, if not respected, the pipeline used fails to start.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

  • run with some real beats instances

How to test this PR locally

The test plan has some steps:

  • generate TLS certificates
  • build the plugin and configure Logstash
  • set up memory size limits, configure pipeline
  • run Logstash and benchmark it

create TLS certificates

build the plugin and configure Logstash

gem "logstash-input-beats", :path => "/Users/andrea/workspace/logstash_plugins/logstash-input-beats"
  • install bin/logstash-plugin install --no-verify

set up memory size limits, configure pipeline

  • in Logstash's config/jvm.options add:
-XX:MaxDirectMemorySize=128m
-XX:-MaxFDLimit
  • edit the test pipeline as
input {
  beats {
    port => 3333
    ssl_enabled => true 
    ssl_key => "/<path_to>/certificates/server_from_root.key"
    ssl_certificate => "/<path_to>/certificates/server_from_root.crt"
    ssl_client_authentication => "none"
  }
}
filter {
  ruby {
  init => "Thread.new { loop { puts 'Direct mem: ' + Java::io.netty.buffer.ByteBufAllocator::DEFAULT.metric.used_direct_memory.to_s;sleep 5 } }"
    code => ""
  }
}

output{
  sink {}
}
  • [optional] set ulimit properly if expects lots of connections
ulimit -S -n 1048576

run Logstash and benchmark it

From a terminal launch Logstash process bin/logstash -f path_to_config and the benchmark script from another:

ruby -J-Xmx16g -J-XX:-MaxFDLimit benchmark_client.rb --test=beats -a yes

Expected outcome

The expectation is that direct memory consumption never goes up to the limit and if the client doesn't consume the ACKs messages (-a no) is the client that goes in error and not the Logstash side.

Test with real Filebeat

To start a bunch of Filebeat clients sending data to Beats input, just use the script ingest_all_hands/filebeat/multiple_filebeats.rb present in PRhttps://github.com/elastic/logstash/pull/15151

ruby multiple_filebeats.rb

It download a filebeat distribution, unpacks, generate some input file, prepare the data and logs folder for each Filebeat instance and run the processes.

Related issues

Use cases

Screenshots

Logs

@andsel andsel self-assigned this Jul 5, 2023
@andsel andsel marked this pull request as ready for review July 20, 2023 15:25
@andsel andsel changed the title Mitigate thundering herd 2 Implement a strategy to handle OOM in direct memory Jul 24, 2023
@roaksoax roaksoax requested review from jsvd and yaauie July 24, 2023 14:10
if (isDirectMemoryOOM(cause)) {
DirectMemoryUsage direct = DirectMemoryUsage.capture();
logger.info("Direct memory status, used: {}, pinned: {}, ratio: {}", direct.used, direct.pinned, direct.ratio);
logger.warn("Dropping connection {} because run out of direct memory. To fix it, in Filebeat you can" +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should also consider that the source is not filebeat. either it's another beat like winlogbeat, or Logstash itself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 8f3b08c

Comment on lines 32 to 33
ctx.close();
logger.info("Dropping connection {} due to high resource consumption", ctx.channel());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be the order? also it's likely worth bumping to warn as it's a near OOM situation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's almost idempotent, but in e general we could have that pinned is close to used, but far from max direct. In such a case I think don't want to enter too much frequently the check on max memory.
For example considering:

  • max direct is 128MB
  • used is 64MB
  • pinned is constantly at 56 MB
    The system is balanced for the work it's doing to consume ~50MB and it's far from OOM condition, so it's not in harmful situation, I think.

@andsel andsel requested a review from jsvd July 28, 2023 09:05
@@ -134,7 +134,11 @@ public void initChannel(SocketChannel socket){
new IdleStateHandler(localClientInactivityTimeoutSeconds, IDLESTATE_WRITER_IDLE_TIME_SECONDS, localClientInactivityTimeoutSeconds));
pipeline.addLast(BEATS_ACKER, new AckEncoder());
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser(), new BeatsHandler(localMessageListener));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like us to be a bit more intentional in why we're removing the executor group here. in my test PR I removed it to simplify the number of threads I had to reason about, and having a single pool for both boss/worker loops would mean blocking the workers would stop boss from accepting new connections too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be the complete pipeline executed in the worker group, instead just BeatsParser and BeatsHandler, while BeatsHacker and Connectionhandler are still executed by the boss group?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean shouldn't use the ServerBootstrap.group(bossGroup, workerGroup) instead of assigning the group for just those 2 Beats handler? If we do this, we have at least one thread context switch on every pipeline. Maybe it's something I'm not grasping.

@jsvd
Copy link
Member

jsvd commented Jul 28, 2023

something I'm observing is that maybe linger is not being set in the client connections, as I continue to see the same connection ids reporting oom errors well after the script that does load generation is gone:

https://gist.github.com/jsvd/a047320ff68bbe064e93dec0d6a251f7

@andsel
Copy link
Contributor Author

andsel commented Aug 25, 2023

@jsvd I used the PR #477 to test in isolation the BeatsParser, what I've observed (with netstat -anv -p tcp | grep <pid> | grep CLOSE_WAIT ) is tha:

  • the client creates 1500 connections and terminates quite fast
  • once it terminates, the 1500 connections go in CLOSE_WAIT state
  • the Netty side start reporting OOM exceptions or connection resets, for all the 1500 channels
  • at a certain point it finish to report exceptions, but this phase takes ~5 minutes

I think that when we reach an OOM error and we have the client that terminates immediately, we expect that all the 1500 channels also terminates immediately on Netty side, but in reality it takes minutes and this gives the idea of a looping error on the connections. Why it takes 5 minutes to stop logging channel's exceptions is not clear to me, maybe it's due to memory shortage.

Do you think that on server side, on first OOM error notification, should we close immediately all the other connections?
In such case I think that it's an asynch request to the event loops responsible for the other connections, and it could also take such time, because at the end is what Netty is already doing.

andsel and others added 15 commits September 27, 2023 09:00
…omes not writable, to excert backpressure to the sender system
On new channel registration (that correspond to a new client connection),
verifies the direct memory stastus to understand if almost the totality max direct memory
is reached and also if the majoproity of that space is used by pinned byte buffers.
If the codition is verified that means direct memory avvailable is terminating, so no new
connection would help in the situation, and the incoming new connections are closed.
…ble status due to that offload every message to the outbound list
… during the read opeartion and not only on exception
@jsvd jsvd self-requested a review October 3, 2023 14:50
Copy link
Member

@jsvd jsvd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some 1:1 discussion we think that splitting this PR into two would facilitate moving forward here.

  1. One PR would disable autoread
  2. Another would introduce the protect_direct_memory setting

Maybe we can start with PR1 and leave 2 for later. My suggestion would be to start with a scenario we know breaks in the current main beats input, but is handled better with the PR.
For example, the follow script causes OOM consistently on 256M direct memory: https://gist.github.com/jsvd/9cba50e01bb840d41dba07b5862be358

@andsel andsel mentioned this pull request Nov 7, 2023
6 tasks
@andsel
Copy link
Contributor Author

andsel commented Nov 7, 2023

@jsvd disabling of autoread is present in PR #485. When we will merge that PR, I'll remove the corresponding code from this, and refocus this to be second PR, to tackle the OOM problem.

@robost001
Copy link

I tested PR475, manually merged it to 6.8.0 and started with logstash-7.17.17, then when calling V2Batch.realease I got a reference counting issue.

[2024-02-06T17:23:15,862][INFO ][org.logstash.beats.OOMConnectionCloser][main][c84edf99f631ad281446a904b8d1587b2f1505e2a620655c3115851f704f29e5] Direct
memory status, used: 27829207040, pinned: 23017497600, ratio: 83
[2024-02-06T17:23:15,862][WARN ][io.netty.channel.DefaultChannelPipeline][main][c84edf99f631ad281446a904b8d1587b2f1505e2a620655c3115851f704f29e5] An exc
eptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the excepti
on.
io.netty.handler.codec.DecoderException: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:499) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.100.Final.jar:
4.1.100.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61) ~[netty-transport-4.1.100.Final.jar:4.1.100.
Final]
        at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:425) ~[netty-transport-4.1.100.Final.jar:4.1.100.Fina
l]
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.100.Final.jar:4.1.100.Final]
        at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
        at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:83) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:148) ~[netty-common-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101) ~[netty-buffer-4.1.100.Final.jar:4.1.100.Final]
        at org.logstash.beats.V2Batch.release(V2Batch.java:105) ~[logstash-input-beats-6.8.0.jar:?]
        at org.logstash.beats.BeatsParser.decode(BeatsParser.java:211) ~[logstash-input-beats-6.8.0.jar:?]
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.100.Final.jar:4.1.100.Final]
        ... 10 more

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Beat stops processing events after OOM but keeps running
4 participants