Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Correct log level when refresh tableEntry cache expired #215

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
397 changes: 275 additions & 122 deletions src/main/java/com/alipay/oceanbase/rpc/ObTableClient.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
ObRpcResultCode resultCode = new ObRpcResultCode();
resultCode.decode(buf);
// If response indicates the request is routed to wrong server, we should refresh the routing meta.
if (!conn.getObTable().getReRouting() &&response.getHeader().isRoutingWrong()) {
if (!conn.getObTable().isEnableRerouting() && response.getHeader().isRoutingWrong()) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand All @@ -139,7 +139,8 @@ public ObPayload invokeSync(final ObTableConnection conn, final ObPayload reques
throw new ObTableNeedFetchAllException(errMessage, resultCode.getRcode());
}
}
if (resultCode.getRcode() != 0 && response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
if (resultCode.getRcode() != 0
&& response.getHeader().getPcode() != Pcodes.OB_TABLE_API_MOVE) {
String errMessage = TraceUtil.formatTraceMessage(conn, request,
"routed to the wrong server: " + response.getMessage());
logger.warn(errMessage);
Expand Down Expand Up @@ -193,6 +194,8 @@ private boolean needFetchAll(int errorCode, int pcode) {
|| errorCode == ResultCodes.OB_TABLE_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_TABLET_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST.errorCode
|| errorCode == ResultCodes.OB_SNAPSHOT_DISCARDED.errorCode
|| (pcode == Pcodes.OB_TABLE_API_LS_EXECUTE && errorCode == ResultCodes.OB_NOT_MASTER.errorCode);
}

Expand Down
402 changes: 289 additions & 113 deletions src/main/java/com/alipay/oceanbase/rpc/location/LocationUtil.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@
import com.alipay.oceanbase.rpc.location.model.partition.ObPartitionLevel;
import com.alipay.oceanbase.rpc.protocol.payload.Constants;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;

import static com.google.common.base.Preconditions.checkArgument;

Expand Down Expand Up @@ -53,7 +54,9 @@ public class TableEntry {
// partition location
private TableEntryKey tableEntryKey = null;
private volatile ObPartitionEntry partitionEntry = null;


public ConcurrentHashMap<Long, Lock> refreshLockMap = new ConcurrentHashMap<>();

/*
* Is valid.
*/
Expand Down Expand Up @@ -218,8 +221,6 @@ public void prepare() throws IllegalArgumentException {
checkArgument(partitionInfo != null, "partition table partition info is not ready. key"
+ tableEntryKey);
partitionInfo.prepare();
checkArgument(partitionEntry != null,
"partition table partition entry is not ready. key" + tableEntryKey);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,23 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


public class ObPartitionEntry {
private Map<Long, ObPartitionLocation> partitionLocation = new HashMap<Long, ObPartitionLocation>();

// mapping from tablet id to ls id, and the part id to tablet id mapping is in ObPartitionInfo
private Map<Long, Long> tabletLsIdMap = new HashMap<>();

// tabelt id -> (PartitionLocation, LsId)
private ConcurrentHashMap<Long, ObPartitionLocationInfo> partitionInfos = new ConcurrentHashMap<>();


public ObPartitionLocationInfo getPartitionInfo(long tabletId) {
return partitionInfos.computeIfAbsent(tabletId, id -> new ObPartitionLocationInfo());
}

public Map<Long, ObPartitionLocation> getPartitionLocation() {
return partitionLocation;
}
Expand All @@ -39,6 +49,16 @@ public void setPartitionLocation(Map<Long, ObPartitionLocation> partitionLocatio
this.partitionLocation = partitionLocation;
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }

/*
* Get partition location with part id.
*/
Expand Down Expand Up @@ -86,14 +106,4 @@ public void prepareForWeakRead(ObServerLdcLocation ldcLocation) {
public String toString() {
return "ObPartitionEntry{" + "partitionLocation=" + partitionLocation + '}';
}

public Map<Long, Long> getTabletLsIdMap() {
return tabletLsIdMap;
}

public void setTabletLsIdMap(Map<Long, Long> tabletLsIdMap) {
this.tabletLsIdMap = tabletLsIdMap;
}

public long getLsId(long tabletId) { return tabletLsIdMap.get(tabletId); }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*-
* #%L
* com.oceanbase:obkv-table-client
* %%
* Copyright (C) 2021 - 2024 OceanBase
* %%
* OBKV Table Client Framework is licensed under Mulan PSL v2.
* You can use this software according to the terms and conditions of the Mulan PSL v2.
* You may obtain a copy of Mulan PSL v2 at:
* http://license.coscl.org.cn/MulanPSL2
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PSL v2 for more details.
* #L%
*/

package com.alipay.oceanbase.rpc.location.model.partition;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import static com.alipay.oceanbase.rpc.protocol.payload.Constants.OB_INVALID_ID;

public class ObPartitionLocationInfo {
private ObPartitionLocation partitionLocation = null;
private Long tabletLsId = OB_INVALID_ID;
private Long lastUpdateTime = 0L;
public ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public AtomicBoolean initialized = new AtomicBoolean(false);
public final CountDownLatch initializationLatch = new CountDownLatch(1);

public ObPartitionLocation getPartitionLocation() {
rwLock.readLock().lock();
try {
return partitionLocation;
} finally {
rwLock.readLock().unlock();
}
}

public void updateLocation(ObPartitionLocation newLocation, Long tabletLsId) {
rwLock.writeLock().lock();
try {
this.partitionLocation = newLocation;
this.tabletLsId = tabletLsId;
this.lastUpdateTime = System.currentTimeMillis();
} finally {
rwLock.writeLock().unlock();
}
}

public Long getTabletLsId() {
rwLock.readLock().lock();
try {
return tabletLsId;
} finally {
rwLock.readLock().unlock();
}
}

public Long getLastUpdateTime() {
rwLock.readLock().lock();
try {
return lastUpdateTime;
} finally {
rwLock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public enum Property {
NETTY_BLOCKING_WAIT_INTERVAL("bolt.netty.blocking.wait.interval", 1, "netty写缓存满后等待时间"),

// [ObTable][OTHERS]
SERVER_ENABLE_REROUTING("server.enable.rerouting", true, "开启server端的重定向回复功能"),
SERVER_ENABLE_REROUTING("server.enable.rerouting", false, "开启server端的重定向回复功能"),

/*
* other config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,9 @@ public enum ResultCodes {
OB_CLUSTER_NO_MATCH(-4666), //
OB_CHECK_ZONE_MERGE_ORDER(-4667), //
OB_ERR_ZONE_NOT_EMPTY(-4668), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), OB_LS_NOT_EXIST(-4719), //
OB_USE_DUP_FOLLOW_AFTER_DML(-4686), //
OB_LS_NOT_EXIST(-4719), //
OB_MAPPING_BETWEEN_TABLET_AND_LS_NOT_EXIST(-4723), //
OB_TABLET_NOT_EXIST(-4725), //
OB_ERR_PARSER_INIT(-5000), //
OB_ERR_PARSE_SQL(-5001), //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@

package com.alipay.oceanbase.rpc.protocol.payload.impl.execute.query;

import com.alipay.oceanbase.rpc.ObGlobal;
import com.alipay.oceanbase.rpc.ObTableClient;
import com.alipay.oceanbase.rpc.bolt.transport.ObTableConnection;
import com.alipay.oceanbase.rpc.exception.*;
import com.alipay.oceanbase.rpc.location.model.ObReadConsistency;
import com.alipay.oceanbase.rpc.location.model.ObServerRoute;
import com.alipay.oceanbase.rpc.location.model.TableEntry;
import com.alipay.oceanbase.rpc.location.model.partition.ObPair;
import com.alipay.oceanbase.rpc.protocol.payload.AbstractPayload;
import com.alipay.oceanbase.rpc.protocol.payload.ObPayload;
Expand All @@ -33,6 +35,7 @@
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableEntityType;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.ObTableStreamRequest;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.QueryStreamResult;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncRequest;
import com.alipay.oceanbase.rpc.protocol.payload.impl.execute.syncquery.ObTableQueryAsyncResult;
import com.alipay.oceanbase.rpc.table.ObTable;
import com.alipay.oceanbase.rpc.table.ObTableParam;
Expand All @@ -43,6 +46,8 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import static com.alipay.oceanbase.rpc.util.TableClientLoggerFactory.RUNTIME;

public abstract class AbstractQueryStreamResult extends AbstractPayload implements
QueryStreamResult {

Expand All @@ -51,7 +56,6 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
protected volatile boolean closed = false;
protected volatile List<ObObj> row = null;
protected volatile int rowIndex = -1;
// 调整它的startKey
protected ObTableQuery tableQuery;
protected long operationTimeout = -1;
protected String tableName;
Expand All @@ -60,14 +64,15 @@ public abstract class AbstractQueryStreamResult extends AbstractPayload implemen
// global index: key is index table name (be like: __idx_<data_table_id>_<index_name>)
protected String indexTableName;
protected ObTableEntityType entityType;
protected Map<Long, ObPair<Long, ObTableParam>> expectant; // Map<logicId, ObPair<logicId, param>>
protected Map<Long, ObPair<Long, ObTableParam>> expectant;
protected List<String> cacheProperties = new LinkedList<String>();
protected LinkedList<List<ObObj>> cacheRows = new LinkedList<List<ObObj>>();
private LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>> partitionLastResult = new LinkedList<ObPair<ObPair<Long, ObTableParam>, ObTableQueryResult>>();
private ObReadConsistency readConsistency = ObReadConsistency.STRONG;
// ObRowKey objs: [startKey, MIN_OBJECT, MIN_OBJECT]
public List<ObObj> currentStartKey;

protected ObTableClient client;

/*
* Get pcode.
*/
Expand Down Expand Up @@ -227,7 +232,7 @@ protected ObPayload commonExecute(ObTableClient client, Logger logger,
} else if (e instanceof ObTableException) {
if ((((ObTableException) e).getErrorCode() == ResultCodes.OB_TABLE_NOT_EXIST.errorCode || ((ObTableException) e)
.getErrorCode() == ResultCodes.OB_NOT_SUPPORTED.errorCode)
&& ((ObTableQueryRequest) request).getTableQuery().isHbaseQuery()
&& ((ObTableQueryAsyncRequest) request).getObTableQueryRequest().getTableQuery().isHbaseQuery()
&& client.getTableGroupInverted().get(indexTableName) != null) {
// table not exists && hbase mode && table group exists , three condition both
client.eraseTableGroupFromCache(tableName);
Expand Down Expand Up @@ -324,10 +329,7 @@ public boolean next() throws Exception {

} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
// Adjust the start key and refresh the expectant
this.tableQuery.adjustStartKey(currentStartKey);
setExpectant(refreshPartition(tableQuery, tableName));

// Reset the iterator to start over
it = expectant.entrySet().iterator();
referPartition.clear(); // Clear the referPartition if needed
Expand Down Expand Up @@ -362,7 +364,7 @@ public boolean next() throws Exception {
}

protected Map<Long, ObPair<Long, ObTableParam>> buildPartitions(ObTableClient client, ObTableQuery tableQuery, String tableName) throws Exception {
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new HashMap<>();
Map<Long, ObPair<Long, ObTableParam>> partitionObTables = new LinkedHashMap<>();
String indexName = tableQuery.getIndexName();
String indexTableName = null;

Expand Down Expand Up @@ -551,9 +553,32 @@ public void init() throws Exception {
return;
}
if (tableQuery.getBatchSize() == -1) {
for (Map.Entry<Long, ObPair<Long, ObTableParam>> entry : expectant.entrySet()) {
// mark the refer partition
referToNewPartition(entry.getValue());
if (!expectant.isEmpty()) {
Iterator<Map.Entry<Long, ObPair<Long, ObTableParam>>> it = expectant.entrySet()
.iterator();
int retryTimes = 0;
while (it.hasNext()) {
Map.Entry<Long, ObPair<Long, ObTableParam>> entry = it.next();
try {
// try access new partition, async will not remove useless expectant
referToNewPartition(entry.getValue());
} catch (Exception e) {
if (e instanceof ObTableNeedFetchAllException) {
setExpectant(refreshPartition(tableQuery, tableName));
it = expectant.entrySet().iterator();
retryTimes++;
if (retryTimes > client.getRuntimeRetryTimes()) {
RUNTIME.error("Fail to get refresh table entry response after {}",
retryTimes);
throw new ObTableRetryExhaustedException(
"Fail to get refresh table entry response after " + retryTimes);

}
} else {
throw e;
}
}
}
}
expectant.clear();
} else {
Expand Down Expand Up @@ -694,4 +719,19 @@ public ObReadConsistency getReadConsistency() {
public void setReadConsistency(ObReadConsistency readConsistency) {
this.readConsistency = readConsistency;
}

/**
* Get client.
* @return client
*/
public ObTableClient getClient() {
return client;
}

/*
* Set client.
*/
public void setClient(ObTableClient client) {
this.client = client;
}
}
Loading
Loading