Skip to content

Commit

Permalink
[Optimize]统一DB元信息更新格式-Part1
Browse files Browse the repository at this point in the history
1、引入KafkaMetaService;
2、将Connector的更新按照KafkaMetaService进行更新;
3、简化Connect-MirrorMaker的关联逻辑;
4、GroupService创建的AdminClient中的ClientID增加时间戳,减少Mbean冲突;
  • Loading branch information
ZQKC committed Aug 15, 2023
1 parent ca696dd commit de9fb3e
Show file tree
Hide file tree
Showing 12 changed files with 652 additions and 538 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.xiaojukeji.know.streaming.km.common.bean.vo.connect.connector.ConnectorStateVO;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import org.apache.kafka.connect.runtime.AbstractStatus;
Expand All @@ -30,6 +31,9 @@ public class ConnectorManagerImpl implements ConnectorManager {
@Autowired
private ConnectorService connectorService;

@Autowired
private OpConnectorService opConnectorService;

@Autowired
private WorkerConnectorService workerConnectorService;

Expand All @@ -44,37 +48,37 @@ public Result<Void> updateConnectorConfig(Long connectClusterId, String connecto
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "Connector参数错误");
}

return connectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
return opConnectorService.updateConnectorConfig(connectClusterId, connectorName, configs, operator);
}

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}

Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
}

connectorService.addNewToDB(ksConnectorResult.getData());
opConnectorService.addNewToDB(ksConnectorResult.getData());
return Result.buildSuc();
}

@Override
public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName, String checkpointName, String operator) {
dto.getSuitableConfig().put(KafkaConnectConstant.MIRROR_MAKER_NAME_FIELD_NAME, dto.getConnectorName());

Result<KSConnectorInfo> createResult = connectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
Result<KSConnectorInfo> createResult = opConnectorService.createConnector(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
if (createResult.failed()) {
return Result.buildFromIgnoreData(createResult);
}

Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(dto.getConnectClusterId(), dto.getConnectorName());
Result<KSConnector> ksConnectorResult = connectorService.getConnectorFromKafka(dto.getConnectClusterId(), dto.getConnectorName());
if (ksConnectorResult.failed()) {
return Result.buildFromRSAndMsg(ResultStatus.SUCCESS, "创建成功,但是获取元信息失败,页面元信息会存在1分钟延迟");
}
Expand All @@ -83,7 +87,7 @@ public Result<Void> createConnector(ConnectorCreateDTO dto, String heartbeatName
connector.setCheckpointConnectorName(checkpointName);
connector.setHeartbeatConnectorName(heartbeatName);

connectorService.addNewToDB(connector);
opConnectorService.addNewToDB(connector);
return Result.buildSuc();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.ConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.connector.OpConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.mm2.MirrorMakerMetricService;
import com.xiaojukeji.know.streaming.km.core.service.connect.plugin.PluginService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
Expand Down Expand Up @@ -67,6 +68,9 @@ public class MirrorMakerManagerImpl implements MirrorMakerManager {
@Autowired
private ConnectorService connectorService;

@Autowired
private OpConnectorService opConnectorService;

@Autowired
private WorkerConnectorService workerConnectorService;

Expand Down Expand Up @@ -156,20 +160,20 @@ public Result<Void> deleteMirrorMaker(Long connectClusterId, String sourceConnec

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.deleteConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.deleteConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -181,20 +185,20 @@ public Result<Void> modifyMirrorMakerConfig(MirrorMakerCreateDTO dto, String ope

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName()) && dto.getCheckpointConnectorConfigs() != null) {
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getCheckpointConnectorName(), dto.getCheckpointConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName()) && dto.getHeartbeatConnectorConfigs() != null) {
rv = connectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
rv = opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), connectorPO.getHeartbeatConnectorName(), dto.getHeartbeatConnectorConfigs(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
return opConnectorService.updateConnectorConfig(dto.getConnectClusterId(), dto.getConnectorName(), dto.getSuitableConfig(), operator);
}

@Override
Expand All @@ -206,20 +210,20 @@ public Result<Void> restartMirrorMaker(Long connectClusterId, String sourceConne

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.restartConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.restartConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -231,20 +235,20 @@ public Result<Void> stopMirrorMaker(Long connectClusterId, String sourceConnecto

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.stopConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.stopConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand All @@ -256,20 +260,20 @@ public Result<Void> resumeMirrorMaker(Long connectClusterId, String sourceConnec

Result<Void> rv = Result.buildSuc();
if (!ValidateUtils.isBlank(connectorPO.getCheckpointConnectorName())) {
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getCheckpointConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

if (!ValidateUtils.isBlank(connectorPO.getHeartbeatConnectorName())) {
rv = connectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
rv = opConnectorService.resumeConnector(connectClusterId, connectorPO.getHeartbeatConnectorName(), operator);
}
if (rv.failed()) {
return rv;
}

return connectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
return opConnectorService.resumeConnector(connectClusterId, sourceConnectorName, operator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public List<ConnectorMetrics> collectConnectMetrics(ConnectCluster connectCluste
Long connectClusterId = connectCluster.getId();

List<VersionControlItem> items = versionControlService.listVersionControlItem(this.getClusterVersion(connectCluster), collectorType().getCode());
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectClusterId);
Result<List<String>> connectorList = connectorService.listConnectorsFromCluster(connectCluster);

FutureWaitUtil<Void> future = this.getFutureUtilByClusterPhyId(connectClusterId);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.meta;

import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Kafka元信息服务接口
*/
public interface KafkaMetaService<T> {
/**
* 从Kafka中获取数据
* @param connectCluster connect集群
* @return 全部资源列表, 成功的资源列表
*/
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ConnectCluster connectCluster) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }

/**
* 从Kafka中获取数据
* @param clusterPhy kafka集群
* @return 全部资源集合, 成功的资源列表
*/
default Result<Tuple<Set<String>, List<T>>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); }

/**
* 元信息同步至DB中
* @param clusterId 集群ID
* @param fullNameSet 全部资源列表
* @param dataList 成功的资源列表
*/
default void writeToDB(Long clusterId, Set<String> fullNameSet, List<T> dataList) {}

/**
* 依据kafka集群ID删除数据
* @param clusterPhyId kafka集群ID
*/
default int deleteInDBByKafkaClusterId(Long clusterPhyId) { return 0; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.Triple;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;

import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -24,6 +26,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;
import static com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME;

public class ConnectConverter {
public static ConnectorBasicCombineExistVO convert2BasicVO(ConnectCluster connectCluster, ConnectorPO connectorPO) {
ConnectorBasicCombineExistVO vo = new ConnectorBasicCombineExistVO();
Expand Down Expand Up @@ -153,6 +158,66 @@ public static KSConnector convert2KSConnector(Long kafkaClusterPhyId, Long conne
return ksConnector;
}

public static List<KSConnector> convertAndSupplyMirrorMakerInfo(ConnectCluster connectCluster, List<Triple<KSConnectorInfo, List<String>, KSConnectorStateInfo>> connectorFullInfoList) {
// <connectorName, targetBootstrapServers + "@" + sourceBootstrapServers>
Map<String, String> sourceMap = new HashMap<>();

// <targetBootstrapServers + "@" + sourceBootstrapServers, connectorName>
Map<String, String> heartbeatMap = new HashMap<>();
Map<String, String> checkpointMap = new HashMap<>();

// 获取每个类型的connector的map信息
connectorFullInfoList.forEach(connector -> {
Map<String, String> mm2Map = null;
if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = sourceMap;
} else if (KafkaConnectConstant.MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = heartbeatMap;
} else if (KafkaConnectConstant.MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
mm2Map = checkpointMap;
}

String targetBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);
String sourceBootstrapServers = connector.v1().getConfig().get(MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME);

if (ValidateUtils.anyBlank(targetBootstrapServers, sourceBootstrapServers) || mm2Map == null) {
return;
}

if (KafkaConnectConstant.MIRROR_MAKER_SOURCE_CONNECTOR_TYPE.equals(connector.v1().getConfig().get(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME))) {
// source 类型的格式和 heartbeat & checkpoint 的不一样
mm2Map.put(connector.v1().getName(), targetBootstrapServers + "@" + sourceBootstrapServers);
} else {
mm2Map.put(targetBootstrapServers + "@" + sourceBootstrapServers, connector.v1().getName());
}
});


List<KSConnector> connectorList = new ArrayList<>();
connectorFullInfoList.forEach(connector -> {
// 转化并添加到list中
KSConnector ksConnector = ConnectConverter.convert2KSConnector(
connectCluster.getKafkaClusterPhyId(),
connectCluster.getId(),
connector.v1(),
connector.v3(),
connector.v2()
);
connectorList.add(ksConnector);

// 补充mm2信息
String targetAndSource = sourceMap.get(ksConnector.getConnectorName());
if (ValidateUtils.isBlank(targetAndSource)) {
return;
}

ksConnector.setHeartbeatConnectorName(heartbeatMap.getOrDefault(targetAndSource, ""));
ksConnector.setCheckpointConnectorName(checkpointMap.getOrDefault(targetAndSource, ""));
});

return connectorList;
}

private static String genConnectorKey(Long connectorId, String connectorName){
return connectorId + "#" + connectorName;
}
Expand Down
Loading

0 comments on commit de9fb3e

Please sign in to comment.