Skip to content

Commit

Permalink
Close connections after receiving an InternalServerError.
Browse files Browse the repository at this point in the history
  • Loading branch information
jchambers committed Oct 4, 2017
1 parent 49a11a6 commit a13efe3
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 33 deletions.
72 changes: 39 additions & 33 deletions pushy/src/main/java/com/turo/pushy/apns/ApnsClientHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ class ApnsClientHandler extends Http2ConnectionHandler implements Http2FrameList
private static final IOException STREAM_CLOSED_BEFORE_REPLY_EXCEPTION =
new IOException("Stream closed before a reply was received");

private static final ApnsServerException APNS_SERVER_EXCEPTION = new ApnsServerException();

private static final Gson GSON = new GsonBuilder()
.registerTypeAdapter(Date.class, new DateAsTimeSinceEpochTypeAdapter(TimeUnit.MILLISECONDS))
.create();
Expand Down Expand Up @@ -287,35 +289,14 @@ public int onDataRead(final ChannelHandlerContext context, final int streamId, f

if (endOfStream) {
final Http2Stream stream = this.connection().stream(streamId);

final Http2Headers headers = stream.getProperty(this.responseHeadersPropertyKey);
final ApnsPushNotification pushNotification = stream.getProperty(this.pushNotificationPropertyKey);

final ErrorResponse errorResponse = GSON.fromJson(data.toString(StandardCharsets.UTF_8), ErrorResponse.class);

this.handleErrorResponse(context, streamId, headers, pushNotification, errorResponse);
this.handleEndOfStream(context, this.connection().stream(streamId), (Http2Headers) stream.getProperty(this.responseHeadersPropertyKey), data);
} else {
log.error("Gateway sent a DATA frame that was not the end of a stream.");
}

return bytesProcessed;
}

protected void handleErrorResponse(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final ApnsPushNotification pushNotification, final ErrorResponse errorResponse) {
final Promise<PushNotificationResponse<ApnsPushNotification>> responsePromise =
this.connection().stream(streamId).getProperty(this.responsePromisePropertyKey);

final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status());

if (HttpResponseStatus.INTERNAL_SERVER_ERROR.equals(status)) {
log.warn("APNs server reported an internal error when sending {}.", pushNotification);
responsePromise.tryFailure(new ApnsServerException(GSON.toJson(errorResponse)));
} else {
responsePromise.trySuccess(new SimplePushNotificationResponse<>(pushNotification,
HttpResponseStatus.OK.equals(status), errorResponse.getReason(), errorResponse.getTimestamp()));
}
}

@Override
public void onHeadersRead(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final int streamDependency, final short weight, final boolean exclusive, final int padding, final boolean endOfStream) throws Http2Exception {
this.onHeadersRead(context, streamId, headers, padding, endOfStream);
Expand All @@ -327,25 +308,50 @@ public void onHeadersRead(final ChannelHandlerContext context, final int streamI
final Http2Stream stream = this.connection().stream(streamId);

if (endOfStream) {
final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status());
final boolean success = HttpResponseStatus.OK.equals(status);
this.handleEndOfStream(context, stream, headers, null);
} else {
stream.setProperty(this.responseHeadersPropertyKey, headers);
}
}

if (!success) {
log.warn("Gateway sent an end-of-stream HEADERS frame for an unsuccessful notification.");
}
private void handleEndOfStream(final ChannelHandlerContext context, final Http2Stream stream, final Http2Headers headers, final ByteBuf data) {

final ApnsPushNotification pushNotification = stream.getProperty(this.pushNotificationPropertyKey);
final Promise<PushNotificationResponse<ApnsPushNotification>> responsePromise = stream.getProperty(this.responsePromisePropertyKey);

final ApnsPushNotification pushNotification = stream.getProperty(this.pushNotificationPropertyKey);
final Promise<PushNotificationResponse<ApnsPushNotification>> responsePromise = stream.getProperty(this.responsePromisePropertyKey);
final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status());

if (HttpResponseStatus.OK.equals(status)) {
responsePromise.trySuccess(
new SimplePushNotificationResponse<>(pushNotification, true, null, null));
} else {
if (HttpResponseStatus.INTERNAL_SERVER_ERROR.equals(status)) {
log.warn("APNs server reported an internal error when sending {}.", pushNotification);
responsePromise.tryFailure(new ApnsServerException());
responsePromise.tryFailure(APNS_SERVER_EXCEPTION);
context.channel().close();
} else {
responsePromise.trySuccess(
new SimplePushNotificationResponse<>(pushNotification, success, null, null));
if (data != null) {
final ErrorResponse errorResponse = GSON.fromJson(data.toString(StandardCharsets.UTF_8), ErrorResponse.class);
this.handleErrorResponse(context, stream.id(), headers, pushNotification, errorResponse);
} else {
log.warn("Gateway sent an end-of-stream HEADERS frame for an unsuccessful notification.");
}
}
}
}

protected void handleErrorResponse(final ChannelHandlerContext context, final int streamId, final Http2Headers headers, final ApnsPushNotification pushNotification, final ErrorResponse errorResponse) {
final Promise<PushNotificationResponse<ApnsPushNotification>> responsePromise =
this.connection().stream(streamId).getProperty(this.responsePromisePropertyKey);

final HttpResponseStatus status = HttpResponseStatus.parseLine(headers.status());

if (HttpResponseStatus.INTERNAL_SERVER_ERROR.equals(status)) {
log.warn("APNs server reported an internal error when sending {}.", pushNotification);
responsePromise.tryFailure(new ApnsServerException(GSON.toJson(errorResponse)));
} else {
stream.setProperty(this.responseHeadersPropertyKey, headers);
responsePromise.trySuccess(new SimplePushNotificationResponse<>(pushNotification,
HttpResponseStatus.OK.equals(status), errorResponse.getReason(), errorResponse.getTimestamp()));
}
}

Expand Down
8 changes: 8 additions & 0 deletions pushy/src/test/java/com/turo/pushy/apns/ApnsClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,14 @@ public void testSendNotificationWithInternalServerError() throws Exception {
try {
terribleTerribleServer.registerVerificationKey(this.verificationKey, DEFAULT_TOPIC);

final TestMetricsListener metricsListener = new TestMetricsListener();

final ApnsClient unfortunateClient = new ApnsClientBuilder()
.setApnsServer(HOST, PORT)
.setSigningKey(this.signingKey)
.setTrustedServerCertificateChain(CA_CERTIFICATE)
.setEventLoopGroup(EVENT_LOOP_GROUP)
.setMetricsListener(metricsListener)
.build();

try {
Expand All @@ -530,6 +533,11 @@ public void testSendNotificationWithInternalServerError() throws Exception {
assertTrue(future.isDone());
assertFalse(future.isSuccess());
assertTrue(future.cause() instanceof ApnsServerException);

unfortunateClient.sendNotification(pushNotification).await();

assertTrue("Connections should be replaced after receiving an InternalServerError.",
metricsListener.getConnectionsRemoved().get() == 1);
} finally {
unfortunateClient.close().await();
Thread.sleep(10);
Expand Down

0 comments on commit a13efe3

Please sign in to comment.