Commit

Merge pull request #4982 from WeDataSphere/master-1.5.0
[Feature][EC] Enrich EC metrics: add last unlock time and push EC exit logs
casionone authored Nov 27, 2023
2 parents a8fb845 + 9ed39ef commit b704a2a
Showing 49 changed files with 1,595 additions and 607 deletions.
@@ -272,4 +272,8 @@ public static byte[] mergeByteArrays(byte[] arr1, byte[] arr2) {
     System.arraycopy(arr2, 0, mergedArray, arr1.length, arr2.length);
     return mergedArray;
   }
+
+  public static boolean isHDFSPath(FsPath fsPath) {
+    return HDFS.equalsIgnoreCase(fsPath.getFsType());
+  }
 }
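
A reader's note on the hunk above: the new helper is just a case-insensitive scheme check. A minimal Scala sketch with hypothetical stand-in types (the real FsPath and HDFS constant live in linkis-commons):

    // Hypothetical stand-ins, not the Linkis classes.
    case class FsPath(fsType: String)
    val HDFS = "hdfs"

    def isHDFSPath(fsPath: FsPath): Boolean = HDFS.equalsIgnoreCase(fsPath.fsType)

    println(isHDFSPath(FsPath("HDFS"))) // true
    println(isHDFSPath(FsPath("file"))) // false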
CacheOutputExecuteResponse.scala → CodeConstants.java:

@@ -15,10 +15,9 @@
  * limitations under the License.
  */

-package org.apache.linkis.entrance.scheduler.cache
+package org.apache.linkis.governance.common.constant;

-import org.apache.linkis.scheduler.executer.OutputExecuteResponse
-
-case class CacheOutputExecuteResponse(alias: String, output: String) extends OutputExecuteResponse {
-  override def getOutput: String = output
+public class CodeConstants {
+  // will be auto-appended at the end of Scala code; make sure the last line is not a comment
+  public static String SCALA_CODE_AUTO_APPEND_CODE = "val linkisVar=123";
 }
ScalaCodeParser.scala:

@@ -19,6 +19,7 @@ package org.apache.linkis.governance.common.paser

 import org.apache.linkis.common.utils.{CodeAndRunTypeUtils, Logging, Utils}
 import org.apache.linkis.governance.common.conf.GovernanceCommonConf
+import org.apache.linkis.governance.common.constant.CodeConstants
 import org.apache.linkis.governance.common.paser.CodeType.CodeType

 import org.apache.commons.lang3.StringUtils

@@ -116,7 +117,7 @@ class ScalaCodeParser extends SingleCodeParser with Logging {
     if (statementBuffer.nonEmpty) codeBuffer.append(statementBuffer.mkString("\n"))
     // Make sure the last line is not a comment
     codeBuffer.append("\n")
-    codeBuffer.append("val linkisVar=123")
+    codeBuffer.append(CodeConstants.SCALA_CODE_AUTO_APPEND_CODE)
     codeBuffer.toArray
   }
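
The extracted constant keeps the existing behaviour: the parser still terminates every submission with a trivial statement so the last line is never a comment, which (judging by the code comments) the Scala interpreter would otherwise mishandle. A self-contained sketch of the append step, assuming the same constant value:

    // Sketch of ScalaCodeParser's final append step, not Linkis source.
    import scala.collection.mutable.ArrayBuffer

    val statementBuffer = Seq("val a = 1", "a + 1 // result")
    val codeBuffer = ArrayBuffer[String]()
    if (statementBuffer.nonEmpty) codeBuffer.append(statementBuffer.mkString("\n"))
    codeBuffer.append("\n")
    codeBuffer.append("val linkisVar=123") // CodeConstants.SCALA_CODE_AUTO_APPEND_CODE
    println(codeBuffer.mkString("\n"))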
BmlEnginePreExecuteHook.scala:

@@ -62,7 +62,6 @@ class BmlEnginePreExecuteHook extends ComputationExecutorHook with Logging {
   ): String = {
     val props = engineExecutionContext.getProperties
     if (null != props && props.containsKey(GovernanceConstant.TASK_RESOURCES_STR)) {
-      val workDir = ComputationEngineUtils.getCurrentWorkDir
       val jobId = engineExecutionContext.getJobId
       props.get(GovernanceConstant.TASK_RESOURCES_STR) match {
         case resources: util.List[Object] =>

@@ -71,9 +70,7 @@ class BmlEnginePreExecuteHook extends ComputationExecutorHook with Logging {
             val fileName = resource.get(GovernanceConstant.TASK_RESOURCE_FILE_NAME_STR).toString
             val resourceId = resource.get(GovernanceConstant.TASK_RESOURCE_ID_STR).toString
             val version = resource.get(GovernanceConstant.TASK_RESOURCE_VERSION_STR).toString
-            val fullPath =
-              if (workDir.endsWith(seperator)) pathType + workDir + fileName
-              else pathType + workDir + seperator + fileName
+            val fullPath = fileName
             val response = Utils.tryCatch {
               bmlClient.downloadShareResource(processUser, resourceId, version, fullPath, true)
             } {
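
Note the cross-file contract here: fullPath can now be the file name alone because the CSResourceParser change further down stores a fully qualified tmp-dir path under TASK_RESOURCE_FILE_NAME_STR. A sketch with hypothetical values:

    // Hypothetical values illustrating the new contract.
    val engineTmpDir = "/tmp/engine/"                      // assumed tmp dir
    val prefixName = System.currentTimeMillis().toString + "_"
    val replacementName = engineTmpDir + prefixName + "my_script.py"
    val fullPath = replacementName                         // no workDir prefixing needed
    println(fullPath)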
ComputationExecutorConf.scala:

@@ -33,6 +33,12 @@ object ComputationExecutorConf {
     "Maximum number of tasks executed by the synchronization EC"
   )

+  val PRINT_TASK_PARAMS_SKIP_KEYS = CommonVars(
+    "linkis.engineconn.print.task.params.skip.keys",
+    "jobId",
+    "Parameter keys to skip when printing task params to the job log"
+  )
+
   val ENGINE_PROGRESS_FETCH_INTERVAL =
     CommonVars(
       "wds.linkis.engineconn.progresss.fetch.interval-in-seconds",
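
The new key is consumed in ComputationExecutor.printTaskParamsLog below via getValue.contains(key), so the configured value acts as a substring match against each option key. A standalone sketch of that filtering, assuming the default value:

    // Stand-in for the CommonVars lookup; "jobId" is the default above.
    val skipKeys: String = "jobId"
    val options = Map("jobId" -> "42", "spark.executor.memory" -> "4g")
    options.foreach { case (k, v) =>
      if (!skipKeys.contains(k)) println(s"$k=$v") // prints only the spark conf
    }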
CSResourceParser.scala:

@@ -17,7 +17,10 @@

 package org.apache.linkis.engineconn.computation.executor.cs

+import org.apache.linkis.common.utils.Logging
 import org.apache.linkis.cs.client.service.CSResourceService
+import org.apache.linkis.engineconn.common.conf.EngineConnConf
+import org.apache.linkis.governance.common.utils.GovernanceConstant

 import org.apache.commons.lang3.StringUtils

@@ -27,7 +30,7 @@ import java.util.regex.Pattern
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer

-class CSResourceParser {
+class CSResourceParser extends Logging {

   private val pb = Pattern.compile("cs://[^\\s\"]+[$\\s]{0,1}", Pattern.CASE_INSENSITIVE)

@@ -47,7 +50,6 @@ class CSResourceParser {
       nodeNameStr: String
   ): String = {

-    // TODO getBMLResource
     val bmlResourceList =
       CSResourceService.getInstance().getUpstreamBMLResource(contextIDValueStr, nodeNameStr)

@@ -56,23 +58,25 @@ class CSResourceParser {

     val preFixNames = new ArrayBuffer[String]()
     val parsedNames = new ArrayBuffer[String]()
+    val prefixName = System.currentTimeMillis().toString + "_"
     preFixResourceNames.foreach { preFixResourceName =>
       val resourceName = preFixResourceName.replace(PREFIX, "").trim
       val bmlResourceOption =
         bmlResourceList.asScala.find(_.getDownloadedFileName.equals(resourceName))
       if (bmlResourceOption.isDefined) {
+        val replacementName = EngineConnConf.getEngineTmpDir + prefixName + resourceName
         val bmlResource = bmlResourceOption.get
         val map = new util.HashMap[String, Object]()
-        map.put("resourceId", bmlResource.getResourceId)
-        map.put("version", bmlResource.getVersion)
-        map.put("fileName", resourceName)
+        map.put(GovernanceConstant.TASK_RESOURCE_ID_STR, bmlResource.getResourceId)
+        map.put(GovernanceConstant.TASK_RESOURCE_VERSION_STR, bmlResource.getVersion)
+        map.put(GovernanceConstant.TASK_RESOURCE_FILE_NAME_STR, replacementName)
         parsedResources.add(map)
         preFixNames.append(preFixResourceName)
-        parsedNames.append(resourceName)
+        parsedNames.append(replacementName)
+        logger.warn(s"Replace cs file from {$preFixResourceName} to {$replacementName}")
       }

     }
-    props.put("resources", parsedResources)
+    props.put(GovernanceConstant.TASK_RESOURCES_STR, parsedResources)
     StringUtils.replaceEach(code, preFixNames.toArray, parsedNames.toArray)
   }
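
Taken together, the parser now rewrites cs:// references in user code to timestamped paths under the engine tmp dir and ships the BML metadata in props under the shared GovernanceConstant keys. A standalone sketch of the rewrite (hypothetical file name and tmp dir; needs commons-lang3 on the classpath):

    import java.util.regex.Pattern
    import org.apache.commons.lang3.StringUtils

    val pb = Pattern.compile("cs://[^\\s\"]+[$\\s]{0,1}", Pattern.CASE_INSENSITIVE)
    val code = """spark.read.text("cs://my_data.txt")"""
    val m = pb.matcher(code)
    if (m.find()) {
      val preFix = m.group().trim                          // cs://my_data.txt
      val resourceName = preFix.replace("cs://", "").trim  // my_data.txt
      val replacement = "/tmp/engine/" + System.currentTimeMillis() + "_" + resourceName
      println(StringUtils.replaceEach(code, Array(preFix), Array(replacement)))
    }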
ComputationExecutor.scala:

@@ -22,6 +22,7 @@ import org.apache.linkis.common.log.LogUtils
 import org.apache.linkis.common.utils.{Logging, Utils}
 import org.apache.linkis.engineconn.acessible.executor.entity.AccessibleExecutor
 import org.apache.linkis.engineconn.acessible.executor.listener.event.{
+  TaskLogUpdateEvent,
   TaskResponseErrorEvent,
   TaskStatusChangedEvent
 }

@@ -40,7 +41,14 @@ import org.apache.linkis.governance.common.paser.CodeParser
 import org.apache.linkis.governance.common.protocol.task.{EngineConcurrentInfo, RequestTask}
 import org.apache.linkis.governance.common.utils.{JobUtils, LoggerUtils}
 import org.apache.linkis.manager.common.entity.enumeration.NodeStatus
-import org.apache.linkis.manager.label.entity.engine.UserCreatorLabel
+import org.apache.linkis.manager.label.entity.engine.{
+  CodeLanguageLabel,
+  EngineType,
+  EngineTypeLabel,
+  RunType,
+  UserCreatorLabel
+}
+import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.protocol.engine.JobProgressInfo
 import org.apache.linkis.scheduler.executer._

@@ -50,6 +58,8 @@ import org.apache.commons.lang3.exception.ExceptionUtils
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger

+import scala.collection.JavaConverters._
+
 import com.google.common.cache.{Cache, CacheBuilder}

 abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)

@@ -132,6 +142,12 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)

   override def close(): Unit = {
     if (null != lastTask) CLOSE_LOCKER.synchronized {
+      listenerBusContext.getEngineConnSyncListenerBus.postToAll(
+        TaskLogUpdateEvent(
+          lastTask.getTaskId,
+          LogUtils.generateERROR("EC exits unexpectedly and actively kills the task")
+        )
+      )
       killTask(lastTask.getTaskId)
     }
     else {

@@ -169,9 +185,11 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
     Utils.tryFinally {
       transformTaskStatus(engineConnTask, ExecutionNodeStatus.Running)
       val engineExecutionContext = createEngineExecutionContext(engineConnTask)
+
+      val engineCreationContext = EngineConnObject.getEngineCreationContext
+
       var hookedCode = engineConnTask.getCode
       Utils.tryCatch {
-        val engineCreationContext = EngineConnObject.getEngineCreationContext
         ComputationExecutorHook.getComputationExecutorHooks.foreach(hook => {
           hookedCode =
             hook.beforeExecutorExecute(engineExecutionContext, engineCreationContext, hookedCode)

@@ -182,12 +200,24 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
       } else {
         logger.info(s"hooked after code: $hookedCode ")
      }
+
+      // task params log
+      // (the Spark engine logs its special conf in SparkEngineConnExecutor.executeLine instead)
+      Utils.tryAndWarn {
+        val engineType = LabelUtil.getEngineType(engineCreationContext.getLabels())
+        EngineType.mapStringToEngineType(engineType) match {
+          case EngineType.HIVE | EngineType.TRINO => printTaskParamsLog(engineExecutionContext)
+          case _ =>
+        }
+      }
+
       val localPath = EngineConnConf.getLogDir
       engineExecutionContext.appendStdout(
         LogUtils.generateInfo(
           s"EngineConn local log path: ${DataWorkCloudApplication.getServiceInstance.toString} $localPath"
         )
       )
+
       var response: ExecuteResponse = null
       val incomplete = new StringBuilder
       val codes =

@@ -244,6 +274,11 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
           case s: SuccessExecuteResponse =>
             succeedTasks.increase()
             s
+          case incompleteExecuteResponse: IncompleteExecuteResponse =>
+            ErrorExecuteResponse(
+              s"The task cannot be an incomplete response ${incompleteExecuteResponse.message}",
+              null
+            )
           case _ => response
         }
         response

@@ -336,6 +371,30 @@ abstract class ComputationExecutor(val outputPrintLimit: Int = 1000)
     }
   }

+  /**
+   * Print the task params info to the job task log
+   *
+   * @param engineExecutorContext
+   * @return
+   *   Unit
+   */
+  def printTaskParamsLog(engineExecutorContext: EngineExecutionContext): Unit = {
+    val sb = new StringBuilder
+
+    EngineConnObject.getEngineCreationContext.getOptions.asScala.foreach({ case (key, value) =>
+      // skip jobId: it is the jobId bound when the EC was created, not this task's
+      if (!ComputationExecutorConf.PRINT_TASK_PARAMS_SKIP_KEYS.getValue.contains(key)) {
+        sb.append(s"${key}=${value.toString}\n")
+      }
+    })
+
+    sb.append("\n")
+    engineExecutorContext.appendStdout(
+      LogUtils.generateInfo(s"Your job exec with configs:\n${sb.toString()}\n")
+    )
+  }
+
   def transformTaskStatus(task: EngineConnTask, newStatus: ExecutionNodeStatus): Unit = {
     val oriStatus = task.getStatus
     logger.info(s"task ${task.getTaskId} from status $oriStatus to new status $newStatus")
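
The close() hunk above is the "EC exit log push" from the PR title: if the EC shuts down while a task is still tracked, an ERROR line is pushed to that task's log stream before the task is killed. A minimal sketch of the behaviour with stand-in types (not the Linkis listener bus):

    // Stand-in event type; the real one is posted via EngineConnSyncListenerBus.
    case class TaskLogUpdateEvent(taskId: String, log: String)

    def close(lastTaskId: Option[String], postToAll: TaskLogUpdateEvent => Unit): Unit =
      lastTaskId.foreach { id =>
        postToAll(TaskLogUpdateEvent(id, "ERROR EC exits unexpectedly and actively kills the task"))
        // killTask(id) would follow here
      }

    close(Some("task-1"), event => println(event))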
ComputationEngineConnMetrics.scala:

@@ -91,4 +91,12 @@ object ComputationEngineConnMetrics {
     getTotalBusyTimeMills(nodeStatus) + getTotalIdleTimeMills(nodeStatus)

   def getUnlockToShutdownDurationMills(): Long = unlockToShutdownDurationMills.get()
+
+  def getLastUnlockTimestamp(nodeStatus: NodeStatus): Long = {
+    nodeStatus match {
+      case NodeStatus.Unlock => lastUnlockTimeMills
+      case _ => 0
+    }
+  }
+
 }
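
The metric is status-gated: a non-zero last-unlock timestamp is reported only while the EC is actually in the Unlock state, so consumers can read 0 as "not applicable". A standalone sketch with a stand-in enumeration (not the Linkis NodeStatus):

    object NodeStatus extends Enumeration { val Unlock, Busy, Idle = Value }
    var lastUnlockTimeMills: Long = System.currentTimeMillis()

    def getLastUnlockTimestamp(status: NodeStatus.Value): Long = status match {
      case NodeStatus.Unlock => lastUnlockTimeMills
      case _ => 0L
    }

    println(getLastUnlockTimestamp(NodeStatus.Unlock)) // epoch millis
    println(getLastUnlockTimestamp(NodeStatus.Busy))   // 0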
DefaultNodeHeartbeatMsgManager.scala:

@@ -66,6 +66,10 @@ class DefaultNodeHeartbeatMsgManager extends NodeHeartbeatMsgManager with Logging
           ECConstants.EC_TOTAL_LOCK_TIME_MILLS_KEY,
           ComputationEngineConnMetrics.getTotalLockTimeMills(status).asInstanceOf[Object]
         )
+        msgMap.put(
+          ECConstants.EC_LAST_UNLOCK_TIMESTAMP,
+          ComputationEngineConnMetrics.getLastUnlockTimestamp(status).asInstanceOf[Object]
+        )
       case _ =>
     }
     val engineParams = EngineConnObject.getEngineCreationContext.getOptions
(Diffs for the remaining changed files are not shown in this view.)