Skip to content

Commit

Permalink
fixbug/#432: The insert exception also sets the state to idle (#435)
Browse files Browse the repository at this point in the history
  • Loading branch information
li-keguo authored Feb 17, 2023
1 parent ffd0a65 commit 21f2906
Showing 1 changed file with 10 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -287,16 +287,18 @@ public QueryResult sendQueryRequest(final String query, ClickHouseConfig cfg) th

public int sendInsertRequest(Block block) throws SQLException {
Validate.isTrue(this.state.get() == SessionState.WAITING_INSERT, "Call getSampleBlock before insert.");

NativeClient nativeClient = getNativeClient();
nativeClient.sendData(block);
nativeClient.sendData(new Block());
nativeClient.receiveEndOfStream(cfg.get().queryTimeout(), nativeCtx.serverCtx());
Validate.isTrue(this.state.compareAndSet(SessionState.WAITING_INSERT, SessionState.IDLE));
try {
NativeClient nativeClient = getNativeClient();
nativeClient.sendData(block);
nativeClient.sendData(new Block());
nativeClient.receiveEndOfStream(cfg.get().queryTimeout(), nativeCtx.serverCtx());
} finally {
Validate.isTrue(this.state.compareAndSet(SessionState.WAITING_INSERT, SessionState.IDLE));
}
return block.rowCnt();
}

synchronized private NativeClient getHealthyNativeClient() throws SQLException {
private synchronized NativeClient getHealthyNativeClient() throws SQLException {
NativeContext oldCtx = nativeCtx;
if (!oldCtx.nativeClient().ping(cfg.get().queryTimeout(), nativeCtx.serverCtx())) {
LOG.warn("connection loss with state[{}], create new connection and reset state", state);
Expand Down

0 comments on commit 21f2906

Please sign in to comment.