Skip to content

Commit

Permalink
spark support yarn cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
ChengJie1053 committed Aug 11, 2023
1 parent ec67464 commit 160ab55
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ public EngineNode getEngineNodeInfoByDB(EngineNode engineNode) {
return dbEngineNode;
}

@Override
public EngineNode getEngineNodeInfoByTicketId(String ticketId) {
EngineNode dbEngineNode = nodeManagerPersistence.getEngineNode(ticketId);
if (null == dbEngineNode) {
throw new LinkisRetryException(AMConstant.ENGINE_ERROR_CODE, ticketId + " not exists in db");
}
metricsConverter.fillMetricsToNode(
dbEngineNode, nodeMetricManagerPersistence.getNodeMetrics(dbEngineNode));
return dbEngineNode;
}

@Override
public void updateEngineStatus(
ServiceInstance serviceInstance, NodeStatus fromState, NodeStatus toState) {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public interface EngineNodeManager {

EngineNode getEngineNodeInfoByDB(EngineNode engineNode);

EngineNode getEngineNodeInfoByTicketId(String ticketId);

/**
* Get detailed engine information from the persistence
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,15 +337,16 @@ private List<Label<?>> fromEMGetEngineLabels(List<Label<?>> emLabels) {
}

private boolean ensuresIdle(EngineNode engineNode, String resourceTicketId) {
EngineNode engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
EngineNode engineNodeInfo;
if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByTicketId(resourceTicketId);
} else {
engineNodeInfo = getEngineNodeManager().getEngineNodeInfoByDB(engineNode);
}
if (null == engineNodeInfo) {
return false;
}

// if (engineNode.getMark().equals(AMConstant.CLUSTER_PROCESS_MARK)) {
// return resourceTicketId.equals( engineNodeInfo.getServiceInstance());
// }

if (NodeStatus.isCompleted(engineNodeInfo.getNodeStatus())) {
NodeMetrics metrics = nodeMetricManagerPersistence.getNodeMetrics(engineNodeInfo);
Pair<String, Optional<Boolean>> errorInfo = getStartErrorInfo(metrics.getHeartBeatMsg());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ void updateNodeInstance(

PersistenceNode getNodeInstance(@Param("instance") String instance);

PersistenceNode getNodeInstanceByTicketId(@Param("ticketId") String ticketId);

PersistenceNode getNodeInstanceById(@Param("id") int id);

PersistenceNode getEMNodeInstanceByEngineNode(@Param("instance") String instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ void updateEngineNode(ServiceInstance serviceInstance, Node node)
*/
EngineNode getEngineNode(ServiceInstance serviceInstance);

EngineNode getEngineNode(String ticketId);

/**
* 通过Em的ServiceInstance 获取EM下面Engine的列表
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,38 @@ public EngineNode getEngineNode(ServiceInstance serviceInstance) {
return amEngineNode;
}

@Override
public EngineNode getEngineNode(String ticketId) {
AMEngineNode amEngineNode = new AMEngineNode();
// amEngineNode.setServiceInstance(serviceInstance);
PersistenceNode engineNode = nodeManagerMapper.getNodeInstanceByTicketId(ticketId);
if (null == engineNode) {
return null;
}
amEngineNode.setOwner(engineNode.getOwner());
amEngineNode.setMark(engineNode.getMark());
amEngineNode.setIdentifier(engineNode.getIdentifier());
amEngineNode.setTicketId(engineNode.getTicketId());
amEngineNode.setStartTime(engineNode.getCreateTime());
// PersistenceNode emNode =
//
// nodeManagerMapper.getEMNodeInstanceByEngineNode(serviceInstance.getInstance());
// if (emNode != null) {
// String emInstance = emNode.getInstance();
// String emName = emNode.getName();
// ServiceInstance emServiceInstance = new ServiceInstance();
// emServiceInstance.setApplicationName(emName);
// emServiceInstance.setInstance(emInstance);
// AMEMNode amemNode = new AMEMNode();
// amemNode.setMark(emNode.getMark());
// amemNode.setOwner(emNode.getOwner());
// amemNode.setServiceInstance(emServiceInstance);
// amemNode.setStartTime(emNode.getCreateTime());
// amEngineNode.setEMNode(amemNode);
// }
return amEngineNode;
}

@Override
public List<EngineNode> getEngineNodeByEM(ServiceInstance serviceInstance) {
// serviceinstance for a given EM(给定EM的 serviceinstance)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -34,9 +34,9 @@
<insert id="addNodeInstance" useGeneratedKeys="true" keyColumn="id" keyProperty="id"
parameterType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
INSERT INTO linkis_cg_manager_service_instance (instance, name, owner, mark, ticketId, update_time
, create_time, updator, creator)
, create_time, updator, creator)
VALUES (#{instance}, #{name}, #{owner}, #{mark}, #{ticketId}, #{updateTime}
, #{createTime}, #{updator}, #{creator})
, #{createTime}, #{updator}, #{creator})
</insert>

<update id="updateNodeInstance">
Expand Down Expand Up @@ -119,9 +119,9 @@

<select id="getNodeInstanceIds" resultType="java.lang.Integer">
SELECT id FROM linkis_cg_manager_service_instance WHERE instance IN (
<foreach collection='instances' separator=',' item='instance'>
#{instance}
</foreach> )
<foreach collection='instances' separator=',' item='instance'>
#{instance}
</foreach> )
</select>

<select id="getNodeInstance" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
Expand All @@ -130,6 +130,12 @@
WHERE instance = #{instance}
</select>

<select id="getNodeInstanceByTicketId" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
SELECT *
FROM linkis_cg_manager_service_instance
WHERE ticketId = #{ticketId}
</select>

<select id="getNodeInstanceById" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
SELECT *
FROM linkis_cg_manager_service_instance
Expand All @@ -140,27 +146,27 @@
SELECT *
FROM linkis_cg_manager_service_instance
WHERE instance IN (
SELECT em_instance
FROM linkis_cg_manager_engine_em
WHERE engine_instance = #{instance}
SELECT em_instance
FROM linkis_cg_manager_engine_em
WHERE engine_instance = #{instance}
)
</select>

<select id="getNodeInstances" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
SELECT *
FROM linkis_cg_manager_service_instance
WHERE instance IN (
SELECT engine_instance
FROM linkis_cg_manager_engine_em
WHERE em_instance = #{instance}
SELECT engine_instance
FROM linkis_cg_manager_engine_em
WHERE em_instance = #{instance}
)
</select>

<select id="getNodesByInstances" resultType="org.apache.linkis.manager.common.entity.persistence.PersistenceNode">
SELECT * FROM linkis_cg_manager_service_instance WHERE instance IN(
<foreach collection='instances' separator=',' item='instance'>
#{instance}
</foreach>)
<foreach collection='instances' separator=',' item='instance'>
#{instance}
</foreach>)
</select>

<insert id="addEngineNode">
Expand Down

0 comments on commit 160ab55

Please sign in to comment.