diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java index f1c3ebd09..db01fd27c 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsChannelPool.java @@ -62,6 +62,7 @@ class ApnsChannelPool { private final Set> pendingCreateChannelFutures = new HashSet<>(); private final Queue> pendingAcquisitionPromises = new ArrayDeque<>(); + private final int maxPendingAcquisition; private boolean isClosed = false; @@ -91,12 +92,14 @@ public void handleConnectionCreationFailed() { * * @param channelFactory the factory to be used to create new channels * @param capacity the maximum number of channels that may be held in this pool + * @param maxPendingAcquisition the maximum number of pending acquisition promises * @param executor the executor on which listeners for acquisition/release promises will be called * @param metricsListener an optional listener for metrics describing the performance and behavior of the pool */ - ApnsChannelPool(final PooledObjectFactory channelFactory, final int capacity, final OrderedEventExecutor executor, final ApnsChannelPoolMetricsListener metricsListener) { + ApnsChannelPool(final PooledObjectFactory channelFactory, final int capacity, final int maxPendingAcquisition, final OrderedEventExecutor executor, final ApnsChannelPoolMetricsListener metricsListener) { this.channelFactory = channelFactory; this.capacity = capacity; + this.maxPendingAcquisition = maxPendingAcquisition; this.executor = executor; this.metricsListener = metricsListener != null ? metricsListener : new NoopChannelPoolMetricsListener(); @@ -180,7 +183,11 @@ private void acquireWithinEventExecutor(final Promise acquirePromise) { } else { // We don't have any connections ready to go, and don't have any more capacity to create new // channels. Add this acquisition to the queue waiting for channels to become available. - pendingAcquisitionPromises.add(acquirePromise); + if (pendingAcquisitionPromises.size() < maxPendingAcquisition) { + pendingAcquisitionPromises.add(acquirePromise); + } else { + acquirePromise.tryFailure(new RejectedAcquisitionException(maxPendingAcquisition)); + } } } } else { diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java index 6e119448c..0c1f1efab 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClient.java @@ -151,6 +151,7 @@ public void handleConnectionCreationFailed() { this.channelPool = new ApnsChannelPool(channelFactory, clientConfiguration.getConcurrentConnections(), + clientConfiguration.getMaxPendingAcquisition(), this.clientResources.getEventLoopGroup().next(), channelPoolMetricsListener); } diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java index 2b8377094..dd4c47e00 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientBuilder.java @@ -72,6 +72,7 @@ public class ApnsClientBuilder { private ApnsClientResources apnsClientResources; private int concurrentConnections = 1; + private int maxPendingAcquisition = Integer.MAX_VALUE; private ApnsClientMetricsListener metricsListener; @@ -450,6 +451,22 @@ public ApnsClientBuilder setConcurrentConnections(final int concurrentConnection return this; } + /** + *

Sets the maximum number of the pending acquisition promises. + * Under certain conditions, the pending acquisition queue may continue to expand and cause memory overflow. + * + *

By default, clients will not attempt to check the size of the pending acquisition queue. + * Provide an appropriate value to avoid memory overflow. + * + * @param maxPendingAcquisition the maximum number of pending acquisition promises + * + * @return a reference to this builder + */ + public ApnsClientBuilder setMaxPendingAcquisition(final int maxPendingAcquisition) { + this.maxPendingAcquisition = maxPendingAcquisition; + return this; + } + /** * Sets the metrics listener for the client under construction. Metrics listeners gather information that describes * the performance and behavior of a client, and are completely optional. @@ -654,6 +671,7 @@ public ApnsClient build() throws SSLException { this.closeAfterIdleDuration, this.gracefulShutdownTimeout, this.concurrentConnections, + this.maxPendingAcquisition, this.metricsListener, this.frameLogger); diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientConfiguration.java b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientConfiguration.java index afde7d2f4..0266d8d84 100644 --- a/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientConfiguration.java +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/ApnsClientConfiguration.java @@ -49,6 +49,7 @@ class ApnsClientConfiguration { private final Duration closeAfterIdleDuration; private final Duration gracefulShutdownTimeout; private final int concurrentConnections; + private final int maxPendingAcquisition; private final ApnsClientMetricsListener metricsListener; private final Http2FrameLogger frameLogger; @@ -62,6 +63,7 @@ public ApnsClientConfiguration(final InetSocketAddress apnsServerAddress, final Duration closeAfterIdleDuration, final Duration gracefulShutdownTimeout, final int concurrentConnections, + final int maxPendingAcquisition, final ApnsClientMetricsListener metricsListener, final Http2FrameLogger frameLogger) { @@ -75,6 +77,7 @@ public ApnsClientConfiguration(final InetSocketAddress apnsServerAddress, this.closeAfterIdleDuration = closeAfterIdleDuration; this.gracefulShutdownTimeout = gracefulShutdownTimeout; this.concurrentConnections = concurrentConnections; + this.maxPendingAcquisition = maxPendingAcquisition; this.metricsListener = metricsListener; this.frameLogger = frameLogger; } @@ -119,6 +122,10 @@ public int getConcurrentConnections() { return concurrentConnections; } + public int getMaxPendingAcquisition() { + return maxPendingAcquisition; + } + public Optional getMetricsListener() { return Optional.ofNullable(metricsListener); } diff --git a/pushy/src/main/java/com/eatthepath/pushy/apns/RejectedAcquisitionException.java b/pushy/src/main/java/com/eatthepath/pushy/apns/RejectedAcquisitionException.java new file mode 100644 index 000000000..862196b6e --- /dev/null +++ b/pushy/src/main/java/com/eatthepath/pushy/apns/RejectedAcquisitionException.java @@ -0,0 +1,12 @@ +package com.eatthepath.pushy.apns; + +/** + * An exception thrown to indicate that a push notification should be rejected by the client + * to avoid a large number of pending acquisitions. + */ +public class RejectedAcquisitionException extends Exception { + + public RejectedAcquisitionException(int maxPendingAcquisition) { + super("The number of pending acquisitions has reached the upper limit [" + maxPendingAcquisition + "]"); + } +} diff --git a/pushy/src/test/java/com/eatthepath/pushy/apns/ApnsChannelPoolTest.java b/pushy/src/test/java/com/eatthepath/pushy/apns/ApnsChannelPoolTest.java index e6dbd29be..1217c4833 100644 --- a/pushy/src/test/java/com/eatthepath/pushy/apns/ApnsChannelPoolTest.java +++ b/pushy/src/test/java/com/eatthepath/pushy/apns/ApnsChannelPoolTest.java @@ -134,7 +134,7 @@ public static void setUpBeforeClass() { @BeforeEach public void setUp() { this.metricsListener = new TestChannelPoolMetricListener(); - this.pool = new ApnsChannelPool(new TestChannelFactory(), 1, EVENT_EXECUTOR, this.metricsListener); + this.pool = new ApnsChannelPool(new TestChannelFactory(), 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener); } @AfterAll @@ -183,7 +183,7 @@ public Future destroy(final Channel channel, final Promise promise) } }; - final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, EVENT_EXECUTOR, this.metricsListener); + final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener); assertFalse(pool.acquire().await().isSuccess()); @@ -259,7 +259,7 @@ public Future destroy(final Channel channel, final Promise promise) } }; - final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, EVENT_EXECUTOR, this.metricsListener); + final ApnsChannelPool pool = new ApnsChannelPool(factory, 1, Integer.MAX_VALUE, EVENT_EXECUTOR, this.metricsListener); final Future acquireNewChannelFuture = pool.acquire(); final Future acquireReturnedChannelFuture = pool.acquire();