Skip to content

Commit

Permalink
[#3727]Fix metrics not correct when timeout/connection timeout (#4006)
Browse files Browse the repository at this point in the history
  • Loading branch information
liubao68 authored Nov 1, 2023
1 parent 4ed65bf commit a2a1c2d
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
*/
package org.apache.servicecomb.transport.rest.client;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nonnull;

import org.apache.servicecomb.common.rest.RestConst;
import org.apache.servicecomb.core.Invocation;
import org.apache.servicecomb.core.filter.ConsumerFilter;
import org.apache.servicecomb.core.filter.FilterNode;
Expand Down Expand Up @@ -67,9 +70,13 @@ public RestClientCodecFilter setDecoder(RestClientDecoder decoder) {
public CompletableFuture<Response> onFilter(Invocation invocation, FilterNode nextNode) {
invocation.getInvocationStageTrace().startGetConnection();
startClientFiltersRequest(invocation);
CompletionStage<HttpClientRequest> createRequest =
transportContextFactory.createHttpClientRequest(invocation).toCompletionStage()
.whenComplete((c, e) -> invocation.getInvocationStageTrace().finishGetConnection());
return CompletableFuture.completedFuture(null)
.thenCompose(v -> transportContextFactory.createHttpClientRequest(invocation).toCompletionStage())
.thenCompose(v -> createRequest)
.thenAccept(httpClientRequest -> prepareTransportContext(invocation, httpClientRequest))
.thenAccept(v -> invocation.onStartSendRequest())
.thenAccept(v -> encoder.encode(invocation))
.thenCompose(v -> nextNode.onFilter(invocation))
.thenApply(response -> decoder.decode(invocation, response))
Expand All @@ -81,12 +88,29 @@ protected void startClientFiltersRequest(Invocation invocation) {
}

protected void prepareTransportContext(Invocation invocation, HttpClientRequest httpClientRequest) {
invocation.getInvocationStageTrace().finishGetConnection();
copyExtraHttpHeaders(invocation, httpClientRequest);

RestClientTransportContext transportContext = transportContextFactory.create(invocation, httpClientRequest);
invocation.setTransportContext(transportContext);
}

@SuppressWarnings("unchecked")
protected void copyExtraHttpHeaders(Invocation invocation, HttpClientRequest httpClientRequest) {
Map<String, String> httpHeaders = (Map<String, String>) invocation.getHandlerContext()
.get(RestConst.CONSUMER_HEADER);
if (httpHeaders == null) {
return;
}
httpHeaders.forEach((key, value) -> {
if ("Content-Length".equalsIgnoreCase(key)) {
return;
}
if (null != value) {
httpClientRequest.putHeader(key, value);
}
});
}

protected void finishClientFiltersResponse(Invocation invocation) {
invocation.getInvocationStageTrace().finishClientFiltersResponse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ public CompletableFuture<Response> send() {

protected void runInVertxContext() {
sendInVertxContext()
.exceptionally(throwable -> {
future.completeExceptionally(throwable);
return null;
.whenComplete((v, e) -> {
if (e != null) {
future.completeExceptionally(e);
}
writeFinished();
});
}

Expand All @@ -87,11 +89,12 @@ protected CompletableFuture<Void> sendInVertxContext() {
Multimap<String, Part> uploads = requestParameters.getUploads();
if (uploads == null) {
if (requestParameters.getBodyBuffer() != null) {
httpClientRequest.end(requestParameters.getBodyBuffer());
return CompletableFuture.completedFuture(null).thenCompose(
v -> httpClientRequest.end(requestParameters.getBodyBuffer()).toCompletionStage());
} else {
httpClientRequest.end();
return CompletableFuture.completedFuture(null).thenCompose(
v -> httpClientRequest.end().toCompletionStage());
}
return CompletableFuture.completedFuture(null);
}

if (requestParameters.getBodyBuffer() != null) {
Expand All @@ -100,6 +103,10 @@ protected CompletableFuture<Void> sendInVertxContext() {
return sendFiles();
}

private void writeFinished() {
invocation.getInvocationStageTrace().finishWriteToBuffer(System.nanoTime());
}

protected CompletableFuture<Void> sendFiles() {
CompletableFuture<Void> sendFileFuture = CompletableFuture.completedFuture(null);

Expand All @@ -111,8 +118,7 @@ protected CompletableFuture<Void> sendFiles() {
sendFileFuture = sendFileFuture.thenCompose(v -> sendFile(entry.getValue(), name, boundary));
}

return sendFileFuture
.thenAccept(v -> httpClientRequest.end(genBoundaryEndBuffer(boundary)));
return sendFileFuture.thenCompose(v -> httpClientRequest.end(genBoundaryEndBuffer(boundary)).toCompletionStage());
}

private CompletableFuture<Void> sendFile(Part part, String name, String boundary) {
Expand Down Expand Up @@ -167,8 +173,7 @@ protected void afterSend(Response response, Throwable throwable) {

protected void processMetrics() {
InvocationStageTrace stageTrace = invocation.getInvocationStageTrace();
stageTrace.finishWriteToBuffer(System.nanoTime());
// even failed and did not received response, still set time for it
// even failed and did not receive response, still set time for it
// that will help to know the real timeout time
stageTrace.finishReceiveResponse();
stageTrace.startClientFiltersResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,9 @@ public void invoke(Invocation invocation, AsyncResponse asyncResp) throws Except
IpPort ipPort = (IpPort) invocation.getEndpoint().getAddress();

Future<HttpClientRequest> requestFuture = createRequest(ipPort, path);

invocation.getInvocationStageTrace().startGetConnection();
requestFuture.onComplete(r -> invocation.getInvocationStageTrace().finishGetConnection());
requestFuture.compose(clientRequest -> {
invocation.getInvocationStageTrace().finishGetConnection();

this.clientRequest = clientRequest;

clientRequest.putHeader(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE, invocation.getMicroserviceName());
Expand Down Expand Up @@ -270,10 +268,6 @@ protected void fail(Throwable e) {

InvocationStageTrace stageTrace = invocation.getInvocationStageTrace();

if (stageTrace.getFinishWriteToBuffer() == 0) {
stageTrace.finishWriteToBuffer(System.nanoTime());
}

// even failed and did not received response, still set time for it
// that will help to know the real timeout time
if (stageTrace.getFinishReceiveResponse() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ public void invoke(@Mocked Response resp) throws Exception {
MatcherAssert.assertThat(headers.names(),
Matchers.containsInAnyOrder(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE,
org.apache.servicecomb.core.Const.CSE_CONTEXT));
Assertions.assertEquals(TARGET_MICROSERVICE_NAME, headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE));
Assertions.assertEquals(TARGET_MICROSERVICE_NAME,
headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE));
Assertions.assertEquals("{}", headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT));
Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartClientFiltersRequest());
}
Expand Down Expand Up @@ -216,7 +217,8 @@ public void invoke_3rdPartyServiceExposeServiceCombHeaders(@Mocked Response resp
MatcherAssert.assertThat(headers.names(),
Matchers.containsInAnyOrder(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE,
org.apache.servicecomb.core.Const.CSE_CONTEXT));
Assertions.assertEquals(TARGET_MICROSERVICE_NAME, headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE));
Assertions.assertEquals(TARGET_MICROSERVICE_NAME,
headers.get(org.apache.servicecomb.core.Const.TARGET_MICROSERVICE));
Assertions.assertEquals("{}", headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT));
Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartClientFiltersRequest());
operationConfig.setClientRequestHeaderFilterEnabled(true);
Expand Down Expand Up @@ -286,11 +288,11 @@ public void testSetCseContext_enable_unicode() throws Exception {
when(invocation.getContext()).thenReturn(contextMap);
restClientInvocation.setCseContext();

String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT);
HttpServletRequestEx requestEx = new MockUp<HttpServletRequestEx>(){
String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT);
HttpServletRequestEx requestEx = new MockUp<HttpServletRequestEx>() {
@Mock
public String getHeader(String name){
if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)){
public String getHeader(String name) {
if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)) {
return context;
} else {
return null;
Expand Down Expand Up @@ -324,11 +326,11 @@ public String writeUnicodeValueAsString(Object value) throws JsonProcessingExcep
};

restClientInvocation.setCseContext();
String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT);
HttpServletRequestEx requestEx = new MockUp<HttpServletRequestEx>(){
String context = headers.get(org.apache.servicecomb.core.Const.CSE_CONTEXT);
HttpServletRequestEx requestEx = new MockUp<HttpServletRequestEx>() {
@Mock
public String getHeader(String name){
if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)){
public String getHeader(String name) {
if (StringUtils.equals(name, org.apache.servicecomb.core.Const.CSE_CONTEXT)) {
return context;
} else {
return null;
Expand Down Expand Up @@ -419,7 +421,6 @@ public void processResponseBody_throw() {
Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getStartClientFiltersResponse());
Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishClientFiltersResponse());
Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishReceiveResponse());
Assertions.assertEquals(nanoTime, invocation.getInvocationStageTrace().getFinishWriteToBuffer());
}

@Test
Expand Down

0 comments on commit a2a1c2d

Please sign in to comment.