Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix client couldn't cancel forward query #52185

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ConnectContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLContext;

// When a client connects, we create a connection context for it.
Expand Down Expand Up @@ -236,6 +237,9 @@ public class ConnectContext {

private UUID sessionId;

// Host name stored through setProxyHostName(); the forward() RPC handler fills it with the host name of the
// FE that sent the forwarded request (empty string when the client address is unavailable).
private String proxyHostName;
// Counter changed by incPendingForwardRequest()/decPendingForwardRequest(); hasPendingForwardRequest()
// reports true while it is above zero.
private AtomicInteger pendingForwardRequests = new AtomicInteger(0);

// QueryMaterializationContext differs from MaterializationContext in that it keeps the context for the whole
// query lifecycle instead of per materialized view.
private QueryMaterializationContext queryMVContext;
Expand Down Expand Up @@ -575,6 +579,24 @@ public void setConnectionId(int connectionId) {
this.connectionId = connectionId;
}

/** Returns the proxy host name most recently stored on this context. */
public String getProxyHostName() {
    return this.proxyHostName;
}

/**
 * Stores the proxy host name for this context.
 *
 * @param address host name to remember; stored as given, without validation
 */
public void setProxyHostName(String address) {
    proxyHostName = address;
}

/** Reports whether the pending-forward counter is currently above zero. */
public boolean hasPendingForwardRequest() {
    final int pending = pendingForwardRequests.get();
    return pending > 0;
}
/** Atomically adds one to the pending-forward counter. */
public void incPendingForwardRequest() {
    pendingForwardRequests.getAndIncrement();
}
/** Atomically subtracts one from the pending-forward counter. */
public void decPendingForwardRequest() {
    pendingForwardRequests.getAndDecrement();
}

/** Overwrites the connection start time with the current wall-clock time in milliseconds. */
public void resetConnectionStartTime() {
    final long nowMillis = System.currentTimeMillis();
    connectionStartTime = nowMillis;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -821,6 +821,7 @@ public Void visitRelation(Relation relation, Void context) {
statement.setOrigStmt(new OriginStatement(request.getSql(), idx));

executor = new StmtExecutor(ctx, statement);
ctx.setExecutor(executor);
executor.setProxy();
executor.execute();
} catch (IOException e) {
Expand All @@ -832,6 +833,8 @@ public Void visitRelation(Relation relation, Void context) {
// If we reach here, it may be a StarRocks bug.
LOG.warn("Process one query failed because unknown reason: ", e);
ctx.getState().setError(e.getMessage());
} finally {
ctx.setExecutor(null);
}

// If stmt is also forwarded during execution, just return the forward result.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public TMasterOpRequest createTMasterOpRequest(ConnectContext ctx, int forwardTi
params.setCurrent_user_ident(ctx.getCurrentUserIdentity().toThrift());
params.setForward_times(forwardTimes);
params.setSession_id(ctx.getSessionId().toString());
params.setConnectionId(ctx.getConnectionId());

TUserRoles currentRoles = new TUserRoles();
Preconditions.checkState(ctx.getCurrentRoleIds() != null);
Expand Down
91 changes: 91 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ProxyContextManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.google.common.collect.Maps;

import java.util.Map;

// When a query is forwarded from the follower FE to the leader FE, this ConnectContext on the leader is
// not managed by the ConnectScheduler. These ConnectContexts are managed by the ProxyContextManager.
// We can find these ConnectContexts on the leader by hostname and connection id, and perform operations such as kill.
public class ProxyContextManager {
stdpain marked this conversation as resolved.
Show resolved Hide resolved
private final Map<String, Map<Integer, ConnectContext>> connectionMaps = Maps.newConcurrentMap();

static ProxyContextManager instance = new ProxyContextManager();

public static ProxyContextManager getInstance() {
return instance;
}

public ScopeGuard guard(String hostName, int connectionId, ConnectContext context, boolean set) {
return new ScopeGuard(this, hostName, connectionId, context, set);
}

public synchronized void addContext(String hostname, Integer connectionId, ConnectContext context) {
final Map<Integer, ConnectContext> contextMap =
connectionMaps.computeIfAbsent(hostname, (String host) -> Maps.newConcurrentMap());
contextMap.put(connectionId, context);
contextMap.computeIfAbsent(connectionId, cid -> context);
}

public ConnectContext getContext(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap == null) {
return null;
}
return contextMap.get(connectionId);
}

public ConnectContext getContextByQueryId(String queryId) {
return connectionMaps.values().stream().flatMap(item -> item.values().stream()).filter(ctx ->
ctx.getQueryId() != null && queryId.equals(ctx.getQueryId().toString())).findFirst().orElse(null);
}

public synchronized void remove(String hostname, Integer connectionId) {
final Map<Integer, ConnectContext> contextMap = connectionMaps.get(hostname);
if (contextMap != null) {
contextMap.remove(connectionId);
if (contextMap.isEmpty()) {
connectionMaps.remove(hostname);
}
}
}

public static class ScopeGuard implements AutoCloseable {
private ProxyContextManager manager;
private boolean set = false;
private String hostName;
private int connectionId;

public ScopeGuard(ProxyContextManager manager, String hostName, int connectionId, ConnectContext context,
boolean set) {
if (set) {
this.manager = manager;
this.hostName = hostName;
this.connectionId = connectionId;
manager.addContext(hostName, connectionId, context);
}
this.set = set;
}

@Override
public void close() throws Exception {
if (set) {
manager.remove(hostName, connectionId);
}
}
}
}
stdpain marked this conversation as resolved.
Show resolved Hide resolved
33 changes: 28 additions & 5 deletions fe/fe-core/src/main/java/com/starrocks/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -913,9 +913,14 @@ private void forwardToLeader() throws Exception {
if (parsedStmt instanceof ExecuteStmt) {
throw new AnalysisException("ExecuteStmt Statement don't support statement need to be forward to leader");
}
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
try {
context.incPendingForwardRequest();
leaderOpExecutor = new LeaderOpExecutor(parsedStmt, originStmt, context, redirectStatus);
LOG.debug("need to transfer to Leader. stmt: {}", context.getStmtId());
leaderOpExecutor.execute();
} finally {
context.decPendingForwardRequest();
}
}

private boolean tryProcessProfileAsync(ExecPlan plan, int retryIndex) {
Expand Down Expand Up @@ -1031,16 +1036,31 @@ private void handleKill() throws DdlException {
handleKillQuery(killStmt.getQueryId());
} else {
long id = killStmt.getConnectionId();
ConnectContext killCtx = context.getConnectScheduler().getContext(id);
ConnectContext killCtx = null;
if (isProxy) {
final String hostName = context.getProxyHostName();
stdpain marked this conversation as resolved.
Show resolved Hide resolved
killCtx = ProxyContextManager.getInstance().getContext(hostName, (int) id);
} else {
killCtx = context.getConnectScheduler().getContext(id);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_THREAD, id);
}
handleKill(killCtx, killStmt.isConnectionKill());
handleKill(killCtx, killStmt.isConnectionKill() && !isProxy);
}
}

// Handle kill statement.
private void handleKill(ConnectContext killCtx, boolean killConnection) {
try {
if (killCtx.hasPendingForwardRequest()) {
forwardToLeader();
return;
}
} catch (Exception e) {
LOG.warn("failed to kill connection", e);
}

Preconditions.checkNotNull(killCtx);
if (context == killCtx) {
// Suicide
stdpain marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1101,6 +1121,9 @@ private void handleKillQuery(String queryId) throws DdlException {
if (killCtx == null) {
killCtx = ExecuteEnv.getInstance().getScheduler().findContextByQueryId(queryId);
}
if (killCtx == null) {
killCtx = ProxyContextManager.getInstance().getContextByQueryId(queryId);
}
if (killCtx == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_NO_SUCH_QUERY, queryId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import com.starrocks.qe.ConnectProcessor;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.GlobalVariable;
import com.starrocks.qe.ProxyContextManager;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.QueryStatisticsInfo;
import com.starrocks.qe.ShowExecutor;
Expand Down Expand Up @@ -351,10 +352,12 @@ public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
private final LeaderImpl leaderImpl;
private final ExecuteEnv exeEnv;
private final ProxyContextManager proxyContextManager;
public AtomicLong partitionRequestNum = new AtomicLong(0);

/**
 * Creates the service with the given execution environment, a new LeaderImpl and the shared
 * ProxyContextManager instance.
 *
 * @param exeEnv execution environment kept by this service
 */
public FrontendServiceImpl(ExecuteEnv exeEnv) {
    this.exeEnv = exeEnv;
    this.leaderImpl = new LeaderImpl();
    this.proxyContextManager = ProxyContextManager.getInstance();
}

Expand Down Expand Up @@ -1120,12 +1123,26 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
LOG.info("receive forwarded stmt {} from FE: {}",
params.getStmt_id(), clientAddr != null ? clientAddr.getHostname() : "unknown");
ConnectContext context = new ConnectContext(null);
ConnectProcessor processor = new ConnectProcessor(context);
TMasterOpResult result = processor.proxyExecute(params);
ConnectContext.remove();
return result;
String hostname = "";
if (clientAddr != null) {
hostname = clientAddr.getHostname();
}
context.setProxyHostName(hostname);
boolean addToProxyManager = params.isSetConnectionId();
final int connectionId = params.getConnectionId();

try (var guard = proxyContextManager.guard(hostname, connectionId, context, addToProxyManager)) {
ConnectProcessor processor = new ConnectProcessor(context);
return processor.proxyExecute(params);
} catch (Exception e) {
LOG.warn("unreachable path:", e);
final TMasterOpResult result = new TMasterOpResult();
result.setErrorMsg(e.getMessage());
return result;
}
}


private void checkPasswordAndLoadPriv(String user, String passwd, String db, String tbl,
String clientIp) throws AuthenticationException {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
stdpain marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -1712,7 +1729,7 @@ public TRefreshTableResponse refreshTable(TRefreshTableRequest request) throws T
}
}

private TNetworkAddress getClientAddr() {
public TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
if (connectionContext != null) {
Expand Down
104 changes: 104 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/qe/ForwardTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.starrocks.common.util.UUIDUtil;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.service.ExecuteEnv;
import com.starrocks.service.FrontendServiceImpl;
import com.starrocks.thrift.TMasterOpRequest;
import com.starrocks.thrift.TMasterOpResult;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TUserIdentity;
import mockit.Mock;
import mockit.MockUp;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link FrontendServiceImpl#forward} handling forwarded KILL statements, with and without a
 * connection id on the request, and its behavior when proxy execution throws.
 */
public class ForwardTest {

    /** Builds a forwarded request with catalog, cluster, database, a fresh query id and an empty user identity. */
    TMasterOpRequest makeRequest() {
        TMasterOpRequest request = new TMasterOpRequest();
        request.setCatalog("default");
        request.setCluster("default");
        request.setDb("information_schema");
        request.setQueryId(UUIDUtil.genTUniqueId());
        final TUserIdentity userIdentity = new TUserIdentity();
        request.setCurrent_user_ident(userIdentity);
        return request;
    }

    /** Makes {@code GlobalStateMgr.getMaxJournalId()} return 1 for the duration of the calling test. */
    private static void mockMaxJournalId() {
        new MockUp<GlobalStateMgr>() {
            @Mock
            public Long getMaxJournalId() {
                return 1L;
            }
        };
    }

    /** The request carries connection id 1, the same id the KILL targets; no error message is expected. */
    @Test
    public void testKillConnection() throws Exception {
        final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
        final TMasterOpRequest request = makeRequest();
        request.setConnectionId(1);
        request.setSql("kill QUERY 1");
        mockMaxJournalId();

        final TMasterOpResult result = service.forward(request);
        Assert.assertEquals("", result.errorMsg);
    }

    /**
     * The request has no connection id (presumably what an FE that predates the field would send), so the
     * KILL target cannot be found and an "Unknown thread id" error is expected.
     */
    @Test
    public void testUpgradeKillConnection() throws Exception {
        final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
        final TMasterOpRequest request = makeRequest();
        request.setSql("kill QUERY 1");
        mockMaxJournalId();

        final TMasterOpResult result = service.forward(request);
        Assert.assertEquals("Unknown thread id: 1", result.errorMsg);
    }

    /** When proxy execution throws, the exception message is expected to come back as the error message. */
    @Test
    public void testKillWithUnknownException() throws Exception {
        final FrontendServiceImpl service = new FrontendServiceImpl(ExecuteEnv.getInstance());
        final TMasterOpRequest request = makeRequest();
        request.setSql("kill QUERY 1");
        request.setConnectionId(1);

        mockMaxJournalId();
        // No client address available: exercises the branch where the host name stays empty.
        new MockUp<FrontendServiceImpl>() {
            @Mock
            public TNetworkAddress getClientAddr() {
                return null;
            }
        };

        new MockUp<ConnectProcessor>() {
            @Mock
            public TMasterOpResult proxyExecute(TMasterOpRequest request) {
                throw new RuntimeException("unknown exception");
            }
        };

        final TMasterOpResult result = service.forward(request);
        Assert.assertEquals("unknown exception", result.errorMsg);
    }
}
1 change: 1 addition & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ struct TMasterOpRequest {
33: optional Types.TUserRoles user_roles
34: optional i32 forward_times
35: optional string session_id
36: optional i32 connectionId

101: optional i64 warehouse_id // begin from 101, in case of conflict with other's change
}
Expand Down
Loading