diff --git a/pushy/src/main/java/com/turo/pushy/apns/ApnsClientHandler.java b/pushy/src/main/java/com/turo/pushy/apns/ApnsClientHandler.java index 5ab60249e..003b48137 100644 --- a/pushy/src/main/java/com/turo/pushy/apns/ApnsClientHandler.java +++ b/pushy/src/main/java/com/turo/pushy/apns/ApnsClientHandler.java @@ -75,6 +75,8 @@ class ApnsClientHandler extends Http2ConnectionHandler implements Http2FrameList private static final IOException STREAM_CLOSED_BEFORE_REPLY_EXCEPTION = new IOException("Stream closed before a reply was received"); + private static final ApnsServerException APNS_SERVER_EXCEPTION = new ApnsServerException(); + private static final Gson GSON = new GsonBuilder() .registerTypeAdapter(Date.class, new DateAsTimeSinceEpochTypeAdapter(TimeUnit.MILLISECONDS)) .create(); @@ -287,13 +289,7 @@ public int onDataRead(final ChannelHandlerContext context, final int streamId, f if (endOfStream) { final Http2Stream stream = this.connection().stream(streamId); - - final Http2Headers headers = stream.getProperty(this.responseHeadersPropertyKey); - final ApnsPushNotification pushNotification = stream.getProperty(this.pushNotificationPropertyKey); - - final ErrorResponse errorResponse = GSON.fromJson(data.toString(StandardCharsets.UTF_8), ErrorResponse.class); - - this.handleErrorResponse(context, streamId, headers, pushNotification, errorResponse); + this.handleEndOfStream(context, this.connection().stream(streamId), (Http2Headers) stream.getProperty(this.responseHeadersPropertyKey), data); } else { log.error("Gateway sent a DATA frame that was not the end of a stream."); } @@ -301,21 +297,6 @@ public int onDataRead(final ChannelHandlerContext context, final int streamId, f return bytesProcessed; } - protected void handleErrorResponse(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final ApnsPushNotification pushNotification, final ErrorResponse errorResponse) { - final Promise> responsePromise = - this.connection().stream(streamId).getProperty(this.responsePromisePropertyKey); - - final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status()); - - if (HttpResponseStatus.INTERNAL_SERVER_ERROR.equals(status)) { - log.warn("APNs server reported an internal error when sending {}.", pushNotification); - responsePromise.tryFailure(new ApnsServerException(GSON.toJson(errorResponse))); - } else { - responsePromise.trySuccess(new SimplePushNotificationResponse<>(pushNotification, - HttpResponseStatus.OK.equals(status), errorResponse.getReason(), errorResponse.getTimestamp())); - } - } - @Override public void onHeadersRead(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final int streamDependency, final short weight, final boolean exclusive, final int padding, final boolean endOfStream) throws Http2Exception { this.onHeadersRead(context, streamId, headers, padding, endOfStream); @@ -327,25 +308,50 @@ public void onHeadersRead(final ChannelHandlerContext context, final int streamI final Http2Stream stream = this.connection().stream(streamId); if (endOfStream) { - final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status()); - final boolean success = HttpResponseStatus.OK.equals(status); + this.handleEndOfStream(context, stream, headers, null); + } else { + stream.setProperty(this.responseHeadersPropertyKey, headers); + } + } - if (!success) { - log.warn("Gateway sent an end-of-stream HEADERS frame for an unsuccessful notification."); - } + private void handleEndOfStream(final ChannelHandlerContext context, final Http2Stream stream, final Http2Headers headers, final ByteBuf data) { + + final ApnsPushNotification pushNotification = stream.getProperty(this.pushNotificationPropertyKey); + final Promise> responsePromise = stream.getProperty(this.responsePromisePropertyKey); - final ApnsPushNotification pushNotification = stream.getProperty(this.pushNotificationPropertyKey); - final Promise> responsePromise = stream.getProperty(this.responsePromisePropertyKey); + final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status()); + if (HttpResponseStatus.OK.equals(status)) { + responsePromise.trySuccess( + new SimplePushNotificationResponse<>(pushNotification, true, null, null)); + } else { if (HttpResponseStatus.INTERNAL_SERVER_ERROR.equals(status)) { log.warn("APNs server reported an internal error when sending {}.", pushNotification); - responsePromise.tryFailure(new ApnsServerException()); + responsePromise.tryFailure(APNS_SERVER_EXCEPTION); + context.channel().close(); } else { - responsePromise.trySuccess( - new SimplePushNotificationResponse<>(pushNotification, success, null, null)); + if (data != null) { + final ErrorResponse errorResponse = GSON.fromJson(data.toString(StandardCharsets.UTF_8), ErrorResponse.class); + this.handleErrorResponse(context, stream.id(), headers, pushNotification, errorResponse); + } else { + log.warn("Gateway sent an end-of-stream HEADERS frame for an unsuccessful notification."); + } } + } + } + + protected void handleErrorResponse(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final ApnsPushNotification pushNotification, final ErrorResponse errorResponse) { + final Promise> responsePromise = + this.connection().stream(streamId).getProperty(this.responsePromisePropertyKey); + + final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status()); + + if (HttpResponseStatus.INTERNAL_SERVER_ERROR.equals(status)) { + log.warn("APNs server reported an internal error when sending {}.", pushNotification); + responsePromise.tryFailure(new ApnsServerException(GSON.toJson(errorResponse))); } else { - stream.setProperty(this.responseHeadersPropertyKey, headers); + responsePromise.trySuccess(new SimplePushNotificationResponse<>(pushNotification, + HttpResponseStatus.OK.equals(status), errorResponse.getReason(), errorResponse.getTimestamp())); } } diff --git a/pushy/src/test/java/com/turo/pushy/apns/ApnsClientTest.java b/pushy/src/test/java/com/turo/pushy/apns/ApnsClientTest.java index 3605ca01d..7ed241dcc 100644 --- a/pushy/src/test/java/com/turo/pushy/apns/ApnsClientTest.java +++ b/pushy/src/test/java/com/turo/pushy/apns/ApnsClientTest.java @@ -511,11 +511,14 @@ public void testSendNotificationWithInternalServerError() throws Exception { try { terribleTerribleServer.registerVerificationKey(this.verificationKey, DEFAULT_TOPIC); + final TestMetricsListener metricsListener = new TestMetricsListener(); + final ApnsClient unfortunateClient = new ApnsClientBuilder() .setApnsServer(HOST, PORT) .setSigningKey(this.signingKey) .setTrustedServerCertificateChain(CA_CERTIFICATE) .setEventLoopGroup(EVENT_LOOP_GROUP) + .setMetricsListener(metricsListener) .build(); try { @@ -530,6 +533,11 @@ public void testSendNotificationWithInternalServerError() throws Exception { assertTrue(future.isDone()); assertFalse(future.isSuccess()); assertTrue(future.cause() instanceof ApnsServerException); + + unfortunateClient.sendNotification(pushNotification).await(); + + assertTrue("Connections should be replaced after receiving an InternalServerError.", + metricsListener.getConnectionsRemoved().get() == 1); } finally { unfortunateClient.close().await(); Thread.sleep(10);