From 6756d30db683aca84837a97a06ebaec16028c3e4 Mon Sep 17 00:00:00 2001 From: liubao Date: Wed, 1 Nov 2023 15:55:36 +0800 Subject: [PATCH 1/2] [#3727]Fix metrics not correct when timeout/connection timeout --- .../rest/client/RestClientCodecFilter.java | 28 +++++++++++++++++-- .../rest/client/RestClientSender.java | 25 ++++++++++------- .../client/http/RestClientInvocation.java | 8 +----- 3 files changed, 42 insertions(+), 19 deletions(-) diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientCodecFilter.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientCodecFilter.java index 6a530f256a..2cf13cbad8 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientCodecFilter.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientCodecFilter.java @@ -16,10 +16,13 @@ */ package org.apache.servicecomb.transport.rest.client; +import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import javax.annotation.Nonnull; +import org.apache.servicecomb.common.rest.RestConst; import org.apache.servicecomb.core.Invocation; import org.apache.servicecomb.core.filter.ConsumerFilter; import org.apache.servicecomb.core.filter.FilterNode; @@ -67,9 +70,13 @@ public RestClientCodecFilter setDecoder(RestClientDecoder decoder) { public CompletableFuture onFilter(Invocation invocation, FilterNode nextNode) { invocation.getInvocationStageTrace().startGetConnection(); startClientFiltersRequest(invocation); + CompletionStage createRequest = + transportContextFactory.createHttpClientRequest(invocation).toCompletionStage() + .whenComplete((c, e) -> invocation.getInvocationStageTrace().finishGetConnection()); return CompletableFuture.completedFuture(null) - .thenCompose(v -> transportContextFactory.createHttpClientRequest(invocation).toCompletionStage()) + .thenCompose(v -> createRequest) .thenAccept(httpClientRequest -> prepareTransportContext(invocation, httpClientRequest)) + .thenAccept(v -> invocation.onStartSendRequest()) .thenAccept(v -> encoder.encode(invocation)) .thenCompose(v -> nextNode.onFilter(invocation)) .thenApply(response -> decoder.decode(invocation, response)) @@ -81,12 +88,29 @@ protected void startClientFiltersRequest(Invocation invocation) { } protected void prepareTransportContext(Invocation invocation, HttpClientRequest httpClientRequest) { - invocation.getInvocationStageTrace().finishGetConnection(); + copyExtraHttpHeaders(invocation, httpClientRequest); RestClientTransportContext transportContext = transportContextFactory.create(invocation, httpClientRequest); invocation.setTransportContext(transportContext); } + @SuppressWarnings("unchecked") + protected void copyExtraHttpHeaders(Invocation invocation, HttpClientRequest httpClientRequest) { + Map httpHeaders = (Map) invocation.getHandlerContext() + .get(RestConst.CONSUMER_HEADER); + if (httpHeaders == null) { + return; + } + httpHeaders.forEach((key, value) -> { + if ("Content-Length".equalsIgnoreCase(key)) { + return; + } + if (null != value) { + httpClientRequest.putHeader(key, value); + } + }); + } + protected void finishClientFiltersResponse(Invocation invocation) { invocation.getInvocationStageTrace().finishClientFiltersResponse(); } diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientSender.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientSender.java index 3bd08dbc13..6816d24ec1 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientSender.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/RestClientSender.java @@ -75,9 +75,11 @@ public CompletableFuture send() { protected void runInVertxContext() { sendInVertxContext() - .exceptionally(throwable -> { - future.completeExceptionally(throwable); - return null; + .whenComplete((v, e) -> { + if (e != null) { + future.completeExceptionally(e); + } + writeFinished(); }); } @@ -87,11 +89,12 @@ protected CompletableFuture sendInVertxContext() { Multimap uploads = requestParameters.getUploads(); if (uploads == null) { if (requestParameters.getBodyBuffer() != null) { - httpClientRequest.end(requestParameters.getBodyBuffer()); + return CompletableFuture.completedFuture(null).thenCompose( + v -> httpClientRequest.end(requestParameters.getBodyBuffer()).toCompletionStage()); } else { - httpClientRequest.end(); + return CompletableFuture.completedFuture(null).thenCompose( + v -> httpClientRequest.end().toCompletionStage()); } - return CompletableFuture.completedFuture(null); } if (requestParameters.getBodyBuffer() != null) { @@ -100,6 +103,10 @@ protected CompletableFuture sendInVertxContext() { return sendFiles(); } + private void writeFinished() { + invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime()); + } + protected CompletableFuture sendFiles() { CompletableFuture sendFileFuture = CompletableFuture.completedFuture(null); @@ -111,8 +118,7 @@ protected CompletableFuture sendFiles() { sendFileFuture = sendFileFuture.thenCompose(v -> sendFile(entry.getValue(), name, boundary)); } - return sendFileFuture - .thenAccept(v -> httpClientRequest.end(genBoundaryEndBuffer(boundary))); + return sendFileFuture.thenCompose(v -> httpClientRequest.end(genBoundaryEndBuffer(boundary)).toCompletionStage()); } private CompletableFuture sendFile(Part part, String name, String boundary) { @@ -167,8 +173,7 @@ protected void afterSend(Response response, Throwable throwable) { protected void processMetrics() { InvocationStageTrace stageTrace = invocation.getInvocationStageTrace(); - stageTrace.finishWriteToBuffer(System.nanoTime()); - // even failed and did not received response, still set time for it + // even failed and did not receive response, still set time for it // that will help to know the real timeout time stageTrace.finishReceiveResponse(); stageTrace.startClientFiltersResponse(); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java index 6ddf1e23b4..ded7b6e2cc 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java @@ -103,11 +103,9 @@ public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Except IpPort ipPort = (IpPort) invocation.getEndpoint().getAddress(); Future requestFuture = createRequest(ipPort, path); - invocation.getInvocationStageTrace().startGetConnection(); + requestFuture.onComplete(r -> invocation.getInvocationStageTrace().finishGetConnection()); requestFuture.compose(clientRequest -> { - invocation.getInvocationStageTrace().finishGetConnection(); - this.clientRequest = clientRequest; clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName()); @@ -270,10 +268,6 @@ protected void fail(Throwable e) { InvocationStageTrace stageTrace = invocation.getInvocationStageTrace(); - if (stageTrace.getFinishWriteToBuffer() == 0) { - stageTrace.finishWriteToBuffer(System.nanoTime()); - } - // even failed and did not received response, still set time for it // that will help to know the real timeout time if (stageTrace.getFinishReceiveResponse() == 0) { From 693768a64db0b1b90389ee2b8f5a379f49de3ccb Mon Sep 17 00:00:00 2001 From: liubao Date: Wed, 1 Nov 2023 16:19:25 +0800 Subject: [PATCH 2/2] [#3727]Fix test fail --- .../client/http/TestRestClientInvocation.java | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java index ea5dd4800e..9e806176fe 100644 --- a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java @@ -179,7 +179,8 @@ public void invoke(@Mocked Response resp) throws Exception { MatcherAssert.assertThat(headers.names(), Matchers.containsInAnyOrder(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, org.apache.servicecomb.core.Const.CSE_CONTEXT)); - Assertions.assertEquals(TARGET_MICROSERVICE_NAME, headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE)); + Assertions.assertEquals(TARGET_MICROSERVICE_NAME, + headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE)); Assertions.assertEquals("{}", headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT)); Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartClientFiltersRequest()); } @@ -216,7 +217,8 @@ public void invoke_3rdPartyServiceExposeServiceCombHeaders(@Mocked Response resp MatcherAssert.assertThat(headers.names(), Matchers.containsInAnyOrder(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, org.apache.servicecomb.core.Const.CSE_CONTEXT)); - Assertions.assertEquals(TARGET_MICROSERVICE_NAME, headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE)); + Assertions.assertEquals(TARGET_MICROSERVICE_NAME, + headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE)); Assertions.assertEquals("{}", headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT)); Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartClientFiltersRequest()); operationConfig.setClientRequestHeaderFilterEnabled(true); @@ -286,11 +288,11 @@ public void testSetCseContext_enable_unicode() throws Exception { when(invocation.getContext()).thenReturn(contextMap); restClientInvocation.setCseContext(); - String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT); - HttpServletRequestEx requestEx = new MockUp(){ + String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT); + HttpServletRequestEx requestEx = new MockUp() { @Mock - public String getHeader(String name){ - if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)){ + public String getHeader(String name) { + if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)) { return context; } else { return null; @@ -324,11 +326,11 @@ public String writeUnicodeValueAsString(Object value) throws JsonProcessingExcep }; restClientInvocation.setCseContext(); - String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT); - HttpServletRequestEx requestEx = new MockUp(){ + String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT); + HttpServletRequestEx requestEx = new MockUp() { @Mock - public String getHeader(String name){ - if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)){ + public String getHeader(String name) { + if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)) { return context; } else { return null; @@ -419,7 +421,6 @@ public void processResponseBody_throw() { Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartClientFiltersResponse()); Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishClientFiltersResponse()); Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishReceiveResponse()); - Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishWriteToBuffer()); } @Test