From 93d1de744269fe099fb1d10f3bba1bd5bba361d0 Mon Sep 17 00:00:00 2001 From: guoshupei <15764973965@163.com> Date: Thu, 27 Jul 2023 16:33:27 +0800 Subject: [PATCH] [optimize] Limits the log length that the ec sends to the entrance this close #4830 --- .../executor/conf/ComputationExecutorConf.scala | 6 ++++++ .../executor/execute/EngineExecutionContext.scala | 10 +++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala index fec2fe5e7b..c072c32794 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/conf/ComputationExecutorConf.scala @@ -118,4 +118,10 @@ object ComputationExecutorConf { val TASK_SUBMIT_WAIT_TIME_MS = CommonVars("linkis.ec.task.submit.wait.time.ms", 2L, "Task submit wait time(ms)").getValue + val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED = + CommonVars("linkis.ec.send.log.entrance.limit.enabled", true) + + val ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH = + CommonVars("linkis.ec.send.log.entrance.limit.length", 2000) + } diff --git a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala index 7367dd5330..377c32c193 100644 --- a/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala +++ b/linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/EngineExecutionContext.scala @@ -193,8 +193,16 @@ class EngineExecutionContext(executor: ComputationExecutor, executorUser: String def appendStdout(log: String): Unit = if (executor.isInternalExecute) { logger.info(log) } else { + var taskLog = log + if ( + ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_ENABLED.getValue && + log.length > ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue + ) { + taskLog = + s"${log.substring(0, ComputationExecutorConf.ENGINE_SEND_LOG_TO_ENTRANCE_LIMIT_LENGTH.getValue)}..." + } val listenerBus = getEngineSyncListenerBus - getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, log))) + getJobId.foreach(jId => listenerBus.postToAll(TaskLogUpdateEvent(jId, taskLog))) } override def close(): Unit = {