Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support memory overflow protection. #1095

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ApnsChannelPool {

private final Set<Future<Channel>> pendingCreateChannelFutures = new HashSet<>();
private final Queue<Promise<Channel>> pendingAcquisitionPromises = new ArrayDeque<>();
private final int maxPendingAcquisition;

private boolean isClosed = false;

Expand Down Expand Up @@ -91,12 +92,14 @@ public void handleConnectionCreationFailed() {
*
* @param channelFactory the factory to be used to create new channels
* @param capacity the maximum number of channels that may be held in this pool
* @param maxPendingAcquisition the maximum number of pending acquisition promises
* @param executor the executor on which listeners for acquisition/release promises will be called
* @param metricsListener an optional listener for metrics describing the performance and behavior of the pool
*/
ApnsChannelPool(final PooledObjectFactory<Channel> channelFactory, final int capacity, final OrderedEventExecutor executor, final ApnsChannelPoolMetricsListener metricsListener) {
ApnsChannelPool(final PooledObjectFactory<Channel> channelFactory, final int capacity, final int maxPendingAcquisition, final OrderedEventExecutor executor, final ApnsChannelPoolMetricsListener metricsListener) {
this.channelFactory = channelFactory;
this.capacity = capacity;
this.maxPendingAcquisition = maxPendingAcquisition;
this.executor = executor;

this.metricsListener = metricsListener != null ? metricsListener : new NoopChannelPoolMetricsListener();
Expand Down Expand Up @@ -180,7 +183,11 @@ private void acquireWithinEventExecutor(final Promise<Channel> acquirePromise) {
} else {
// We don't have any connections ready to go, and don't have any more capacity to create new
// channels. Add this acquisition to the queue waiting for channels to become available.
pendingAcquisitionPromises.add(acquirePromise);
if (pendingAcquisitionPromises.size() < maxPendingAcquisition) {
pendingAcquisitionPromises.add(acquirePromise);
} else {
acquirePromise.tryFailure(new RejectedAcquisitionException(maxPendingAcquisition));
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void handleConnectionCreationFailed() {

this.channelPool = new ApnsChannelPool(channelFactory,
clientConfiguration.getConcurrentConnections(),
clientConfiguration.getMaxPendingAcquisition(),
this.clientResources.getEventLoopGroup().next(),
channelPoolMetricsListener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class ApnsClientBuilder {
private ApnsClientResources apnsClientResources;

private int concurrentConnections = 1;
private int maxPendingAcquisition = Integer.MAX_VALUE;

private ApnsClientMetricsListener metricsListener;

Expand Down Expand Up @@ -450,6 +451,22 @@ public ApnsClientBuilder setConcurrentConnections(final int concurrentConnection
return this;
}

/**
* <p>Sets the maximum number of the pending acquisition promises.
* Under certain conditions, the pending acquisition queue may continue to expand and cause memory overflow.
*
* <p>By default, clients will not attempt to check the size of the pending acquisition queue.
* Provide an appropriate value to avoid memory overflow.
*
* @param maxPendingAcquisition the maximum number of pending acquisition promises
*
* @return a reference to this builder
*/
public ApnsClientBuilder setMaxPendingAcquisition(final int maxPendingAcquisition) {
this.maxPendingAcquisition = maxPendingAcquisition;
return this;
}

/**
* Sets the metrics listener for the client under construction. Metrics listeners gather information that describes
* the performance and behavior of a client, and are completely optional.
Expand Down Expand Up @@ -654,6 +671,7 @@ public ApnsClient build() throws SSLException {
this.closeAfterIdleDuration,
this.gracefulShutdownTimeout,
this.concurrentConnections,
this.maxPendingAcquisition,
this.metricsListener,
this.frameLogger);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class ApnsClientConfiguration {
private final Duration closeAfterIdleDuration;
private final Duration gracefulShutdownTimeout;
private final int concurrentConnections;
private final int maxPendingAcquisition;
private final ApnsClientMetricsListener metricsListener;
private final Http2FrameLogger frameLogger;

Expand All @@ -62,6 +63,7 @@ public ApnsClientConfiguration(final InetSocketAddress apnsServerAddress,
final Duration closeAfterIdleDuration,
final Duration gracefulShutdownTimeout,
final int concurrentConnections,
final int maxPendingAcquisition,
final ApnsClientMetricsListener metricsListener,
final Http2FrameLogger frameLogger) {

Expand All @@ -75,6 +77,7 @@ public ApnsClientConfiguration(final InetSocketAddress apnsServerAddress,
this.closeAfterIdleDuration = closeAfterIdleDuration;
this.gracefulShutdownTimeout = gracefulShutdownTimeout;
this.concurrentConnections = concurrentConnections;
this.maxPendingAcquisition = maxPendingAcquisition;
this.metricsListener = metricsListener;
this.frameLogger = frameLogger;
}
Expand Down Expand Up @@ -119,6 +122,10 @@ public int getConcurrentConnections() {
return concurrentConnections;
}

public int getMaxPendingAcquisition() {
return maxPendingAcquisition;
}

public Optional<ApnsClientMetricsListener> getMetricsListener() {
return Optional.ofNullable(metricsListener);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.eatthepath.pushy.apns;

/**
* An exception thrown to indicate that a push notification should be rejected by the client
* to avoid a large number of pending acquisitions.
*/
public class RejectedAcquisitionException extends Exception {

public RejectedAcquisitionException(int maxPendingAcquisition) {
super("The number of pending acquisitions has reached the upper limit [" + maxPendingAcquisition + "]");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public static void setUpBeforeClass() {
@BeforeEach
public void setUp() {
this.metricsListener = new TestChannelPoolMetricListener();
this.pool = new ApnsChannelPool(new TestChannelFactory(), 1, EVENT_EXECUTOR, this.metricsListener);
this.pool = new ApnsChannelPool(new TestChannelFactory(), 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener);
}

@AfterAll
Expand Down Expand Up @@ -183,7 +183,7 @@ public Future<Void> destroy(final Channel channel, final Promise<Void> promise)
}
};

final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, EVENT_EXECUTOR, this.metricsListener);
final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener);

assertFalse(pool.acquire().await().isSuccess());

Expand Down Expand Up @@ -259,7 +259,7 @@ public Future<Void> destroy(final Channel channel, final Promise<Void> promise)
}
};

final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, EVENT_EXECUTOR, this.metricsListener);
final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener);

final Future<Channel> acquireNewChannelFuture = pool.acquire();
final Future<Channel> acquireReturnedChannelFuture = pool.acquire();
Expand Down