
Commit 8389e40

jdbc driver support url default db

peacewong committed Jun 7, 2024
1 parent 8460137 commit 8389e40
Showing 6 changed files with 147 additions and 70 deletions.
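This commit lets a database carried on a Linkis JDBC connection URL be applied as the session's default database: the submitting side is expected to put it into the job properties under JobRequestConstants.LINKIS_JDBC_DEFAULT_DB, and the Hive and Spark SQL executors below pick it up on the first paragraph of a job. A minimal sketch of the producing side, assuming a hypothetical helper that takes the database from a URL such as jdbc:linkis://host:port/myDb (the property key is real and appears in the diff; the URL parsing here is illustrative, not part of this commit):

    import org.apache.linkis.governance.common.constant.job.JobRequestConstants

    // Hypothetical: attach the URL's database segment to the job properties,
    // so executors can apply it before the first statement runs.
    def attachDefaultDb(props: java.util.Map[String, AnyRef], jdbcUrl: String): Unit = {
      val db = jdbcUrl.stripPrefix("jdbc:linkis://").split('/').drop(1).headOption
      db.filter(_.nonEmpty).foreach(d => props.put(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB, d))
    }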
HiveEngineConcurrentConnExecutor.scala
@@ -19,6 +19,7 @@ package org.apache.linkis.engineplugin.hive.executor

 import org.apache.linkis.common.exception.ErrorException
 import org.apache.linkis.common.utils.{ByteTimeUtils, Logging, Utils}
+import org.apache.linkis.engineconn.computation.executor.conf.ComputationExecutorConf
 import org.apache.linkis.engineconn.computation.executor.execute.{
   ConcurrentComputationExecutor,
   EngineExecutionContext
@@ -223,9 +224,11 @@ class HiveEngineConcurrentConnExecutor(
     var compileRet = -1
     Utils.tryCatch {
       compileRet = driver.compile(realCode)
-      logger.info(s"driver compile realCode : ${realCode} finished, status : ${compileRet}")
+      logger.info(
+        s"driver compile realCode : \n ${realCode} \n finished, status : ${compileRet}"
+      )
       if (0 != compileRet) {
-        logger.warn(s"compile realCode : ${realCode} error status : ${compileRet}")
+        logger.warn(s"compile realCode : \n ${realCode} \n error status : ${compileRet}")
         throw HiveQueryFailedException(
           COMPILE_HIVE_QUERY_ERROR.getErrorCode,
           COMPILE_HIVE_QUERY_ERROR.getErrorDesc
HiveEngineConnExecutor.scala
@@ -34,6 +34,7 @@ import org.apache.linkis.engineplugin.hive.errorcode.HiveErrorCodeSummary.{
 }
 import org.apache.linkis.engineplugin.hive.exception.HiveQueryFailedException
 import org.apache.linkis.engineplugin.hive.progress.HiveProgressHelper
+import org.apache.linkis.governance.common.constant.job.JobRequestConstants
 import org.apache.linkis.governance.common.paser.SQLCodeParser
 import org.apache.linkis.governance.common.utils.JobUtils
 import org.apache.linkis.hadoop.common.conf.HadoopConf
@@ -144,49 +145,82 @@ class HiveEngineConnExecutor(
     val realCode = code.trim()
     LOG.info(s"hive client begins to run hql code:\n ${realCode.trim}")
     val jobId = JobUtils.getJobIdFromMap(engineExecutorContext.getProperties)
+
     if (StringUtils.isNotBlank(jobId)) {
-      LOG.info(s"set mapreduce.job.tags=LINKIS_$jobId")
-      hiveConf.set("mapreduce.job.tags", s"LINKIS_$jobId")
+      val jobTags = JobUtils.getJobSourceTagsFromObjectMap(engineExecutorContext.getProperties)
+      val tags = if (StringUtils.isAsciiPrintable(jobTags)) {
+        s"LINKIS_$jobId,$jobTags"
+      } else {
+        s"LINKIS_$jobId"
+      }
+      LOG.info(s"set mapreduce.job.tags=$tags")
+      hiveConf.set("mapreduce.job.tags", tags)
     }
+
     if (realCode.trim.length > 500) {
       engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim.substring(0, 500)} ...")
     } else engineExecutorContext.appendStdout(s"$getId >> ${realCode.trim}")
     val tokens = realCode.trim.split("""\s+""")
     SessionState.setCurrentSessionState(sessionState)
     sessionState.setLastCommand(code)
+    if (
+      engineExecutorContext.getCurrentParagraph == 1 && engineExecutorContext.getProperties
+        .containsKey(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+    ) {
+      val defaultDB =
+        engineExecutorContext.getProperties
+          .get(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+          .asInstanceOf[String]
+      logger.info(s"set default DB to $defaultDB")
+      sessionState.setCurrentDatabase(defaultDB)
+    }
     val proc = CommandProcessorFactory.get(tokens, hiveConf)
     this.proc = proc
     LOG.debug("ugi is " + ugi.getUserName)
-    ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
-      override def run(): ExecuteResponse = {
-        proc match {
-          case any if HiveDriverProxy.isDriver(any) =>
-            logger.info(s"driver is $any")
-            thread = Thread.currentThread()
-            driver = new HiveDriverProxy(any)
-            executeHQL(realCode, driver)
-          case _ =>
-            val resp = proc.run(realCode.substring(tokens(0).length).trim)
-            val result = new String(baos.toByteArray)
-            logger.info("RESULT => {}", result)
-            engineExecutorContext.appendStdout(result)
-            baos.reset()
-            if (resp.getResponseCode != 0) {
-              clearCurrentProgress()
-              HiveProgressHelper.clearHiveProgress()
-              onComplete()
-              singleSqlProgressMap.clear()
-              HiveProgressHelper.storeSingleSQLProgress(0.0f)
-              throw resp.getException
-            }
-            HiveProgressHelper.clearHiveProgress()
-            HiveProgressHelper.storeSingleSQLProgress(0.0f)
-            onComplete()
-            singleSqlProgressMap.clear()
-            SuccessExecuteResponse()
-        }
-      }
-    })
+    Utils.tryFinally {
+      ugi.doAs(new PrivilegedExceptionAction[ExecuteResponse]() {
+        override def run(): ExecuteResponse = {
+          proc match {
+            case any if HiveDriverProxy.isDriver(any) =>
+              logger.info(s"driver is $any")
+              thread = Thread.currentThread()
+              driver = new HiveDriverProxy(any)
+              executeHQL(realCode, driver)
+            case _ =>
+              val resp = proc.run(realCode.substring(tokens(0).length).trim)
+              val result = new String(baos.toByteArray)
+              logger.info("RESULT => {}", result)
+              engineExecutorContext.appendStdout(result)
+              baos.reset()
+              if (resp.getResponseCode != 0) {
+                clearCurrentProgress()
+                HiveProgressHelper.clearHiveProgress()
+                onComplete()
+                singleSqlProgressMap.clear()
+                HiveProgressHelper.storeSingleSQLProgress(0.0f)
+                throw resp.getException
+              }
+              HiveProgressHelper.clearHiveProgress()
+              HiveProgressHelper.storeSingleSQLProgress(0.0f)
+              onComplete()
+              singleSqlProgressMap.clear()
+              SuccessExecuteResponse()
+          }
+        }
+      })
+    } {
+      if (this.driver != null) {
+        Utils.tryQuietly {
+          driver.close()
+          this.driver = null
+          val ss = SessionState.get()
+          if (ss != null) {
+            ss.deleteTmpOutputFile()
+            ss.deleteTmpErrOutputFile()
+          }
+        }
+      }
+    }
   }
 
   private def executeHQL(realCode: String, driver: HiveDriverProxy): ExecuteResponse = {
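The notable changes above: job tags now also carry the job's source tags when they are ASCII-printable; the Hive driver is closed in a tryFinally so temp output files are cleaned up even when execution fails; and the first paragraph of a job applies the URL's default database via sessionState.setCurrentDatabase. The default-db rule restated in isolation (a sketch; `properties` stands in for engineExecutorContext.getProperties):

    import org.apache.linkis.governance.common.constant.job.JobRequestConstants

    // Only the first paragraph applies the default, so an explicit `use other_db`
    // in the job text still wins for the rest of the session.
    def resolveDefaultDb(properties: java.util.Map[String, AnyRef], currentParagraph: Int): Option[String] =
      if (currentParagraph == 1 && properties.containsKey(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)) {
        Option(properties.get(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB).asInstanceOf[String])
      } else None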
SQLSession.scala
@@ -69,7 +69,6 @@ object SQLSession extends Logging {
"Spark application sc has already stopped, please restart it."
)
}

val startTime = System.currentTimeMillis()
// sc.setJobGroup(jobGroup, "Get IDE-SQL Results.", false)

@@ -121,7 +120,7 @@
         )
       )
       .toArray[Column]
-    columns.foreach(c => logger.info(s"c is ${c.getColumnName()}, comment is ${c.getComment()}"))
+    columns.foreach(c => logger.info(s"c is ${c.columnName}, comment is ${c.comment}"))
     if (columns == null || columns.isEmpty) return
     val metaData = new TableMetaData(columns)
     val writer =
@@ -147,11 +146,11 @@
       )
     }
     val taken = ByteTimeUtils.msDurationToString(System.currentTimeMillis - startTime)
-    logger.info(s"Time taken: ${taken}, Fetched $index row(s).")
+    logger.info(s"Time taken: ${taken}, Fetched $index row(s)")
     // to register TempTable
     // Utils.tryAndErrorMsg(CSTableRegister.registerTempTable(engineExecutorContext, writer, alias, columns))("Failed to register tmp table:")
     engineExecutionContext.appendStdout(
-      s"${EngineUtils.getName} >> Time taken: ${taken}, Fetched $index row(s)."
+      s"${EngineUtils.getName} >> Time taken: ${taken}, Fetched ${columns.length} col(s) : $index row(s)"
     )
     engineExecutionContext.sendResultSet(writer)
   }
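With this change the completion message reports both dimensions of the result set, e.g. "spark >> Time taken: 1.2 s, Fetched 3 col(s) : 100 row(s)" (illustrative values), and the Column accessors getColumnName()/getComment() give way to the columnName/comment fields.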
SparkPythonExecutor.scala
@@ -102,7 +102,7 @@ class SparkPythonExecutor(
   override def init(): Unit = {
     setCodeParser(new PythonCodeParser)
     super.init()
-    logger.info("spark sql executor start")
+    logger.info("spark python executor start")
   }
 
   override def killTask(taskID: String): Unit = {
@@ -113,7 +113,14 @@
   }
 
   override def close: Unit = {
-    logger.info("python executor ready to close")
+
+    logger.info(s"To remove pyspark executor")
+    Utils.tryAndError(
+      ExecutorManager.getInstance.removeExecutor(getExecutorLabels().asScala.toArray)
+    )
+    logger.info(s"Finished remove pyspark executor")
+
+    logger.info("To kill pyspark process")
     if (process != null) {
       if (gatewayServer != null) {
         Utils.tryAndError(gatewayServer.shutdown())
@@ -126,18 +133,14 @@ class SparkPythonExecutor(
logger.info(s"Try to kill pyspark process with: [kill -15 ${p}]")
GovernanceUtils.killProcess(String.valueOf(p), s"kill pyspark process,pid: $pid", false)
})
if (pid.isEmpty) {
process.destroy()
process = null
}

Utils.tryQuietly(process.destroy())
process = null
logger.info("Finished kill pyspark process")
}("process close failed")
}
logger.info(s"To delete python executor")
Utils.tryAndError(
ExecutorManager.getInstance.removeExecutor(getExecutorLabels().asScala.toArray)
)
logger.info(s"Finished to kill python")
logger.info("python executor Finished to close")

logger.info("python executor finished to close")
}

override def getKind: Kind = PySpark()
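The rewritten close() changes the teardown order: the executor is deregistered from the ExecutorManager first, then the py4j gateway and the pyspark process are shut down, and the process is now destroyed unconditionally rather than only when no pid was recorded. As an outline (a sketch; the three helper names are hypothetical, standing in for the inline code above):

    override def close: Unit = {
      deregisterFromExecutorManager() // 1. stop routing new tasks to this executor
      shutdownGatewayServer()         // 2. close the py4j channel to the python side
      killPysparkProcess()            // 3. kill -15 by pid, then destroy() regardless
    }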
SparkScalaExecutor.scala
@@ -102,11 +102,13 @@ class SparkScalaExecutor(sparkEngineSession: SparkEngineSession, id: Long)
     Utils.newCachedExecutionContext(5, "Spark-Scala-REPL-Thread-", true)
 
   override def init(): Unit = {
+
     System.setProperty("scala.repl.name.line", ("$line" + this.hashCode).replace('-', '0'))
+
     setCodeParser(new ScalaCodeParser)
     super.init()
+    logger.info("spark scala executor start")
   }
 
+  def lazyInitLoadILoop(): Unit = {
     if (sparkILoop == null) {
       synchronized {
         if (sparkILoop == null) createSparkILoop
@@ -146,20 +148,30 @@
       jobGroup: String
   ): ExecuteResponse = {
     this.jobGroup.append(jobGroup)
+    lazyInitLoadILoop()
+    if (null != sparkILoop.intp && null != sparkILoop.intp.classLoader) {
+      Thread.currentThread().setContextClassLoader(sparkILoop.intp.classLoader)
+    }
     if (engineExecutionContext != this.engineExecutionContextFactory.getEngineExecutionContext) {
       lineOutputStream.reset(engineExecutionContext)
     }
 
-    lazyLoadILoop
+    doBindSparkSession()
 
     lineOutputStream.ready()
     if (sparkILoopInited) {
       this.engineExecutionContextFactory.setEngineExecutionContext(engineExecutionContext)
     }
     var res: ExecuteResponse = null
 
+    if (Thread.currentThread().isInterrupted) {
+      logger.error("The thread of execution has been interrupted and the task should be terminated")
+      return ErrorExecuteResponse(
+        "The thread of execution has been interrupted and the task should be terminated",
+        null
+      )
+    }
+
     Utils.tryCatch {
       res = executeLine(code, engineExecutionContext)
     } { case e: Exception =>
@@ -220,30 +232,43 @@
// error("incomplete code.")
IncompleteExecuteResponse(null)
case Results.Error =>
lineOutputStream.flush()
val output = lineOutputStream.toString
IOUtils.closeQuietly(lineOutputStream)
var errorMsg: String = null
if (StringUtils.isNotBlank(output)) {
errorMsg = Utils.tryCatch(EngineUtils.getResultStrByDolphinTextContent(output))(t =>
t.getMessage
if (Thread.currentThread().isInterrupted) {
logger.error(
"The thread of execution has been interrupted and the task should be terminated"
)
logger.error("Execute code error for " + errorMsg)
engineExecutionContext.appendStdout("Execute code error for " + errorMsg)
if (matchFatalLog(errorMsg)) {
logger.error("engine log fatal logs now to set status to shutdown")
ExecutorManager.getInstance.getReportExecutor.tryShutdown()
Utils.tryQuietly {
IOUtils.closeQuietly(lineOutputStream)
}
ErrorExecuteResponse(
"The thread of execution has been interrupted and the task should be terminated",
null
)
} else {
logger.error("No error message is captured, please see the detailed log")
}
ErrorExecuteResponse(
errorMsg,
ExecuteError(
EXECUTE_SPARKSCALA_FAILED.getErrorCode,
EXECUTE_SPARKSCALA_FAILED.getErrorDesc
lineOutputStream.flush()
val output = lineOutputStream.toString
IOUtils.closeQuietly(lineOutputStream)
var errorMsg: String = null
if (StringUtils.isNotBlank(output)) {
errorMsg = Utils.tryCatch(EngineUtils.getResultStrByDolphinTextContent(output))(t =>
t.getMessage
)
logger.error("Execute code error for " + errorMsg)
engineExecutionContext.appendStdout("Execute code error for " + errorMsg)
if (matchFatalLog(errorMsg)) {
logger.error("engine log fatal logs now to set status to shutdown")
ExecutorManager.getInstance.getReportExecutor.tryShutdown()
}
} else {
logger.error("No error message is captured, please see the detailed log")
}
ErrorExecuteResponse(
errorMsg,
ExecuteError(
EXECUTE_SPARKSCALA_FAILED.getErrorCode,
EXECUTE_SPARKSCALA_FAILED.getErrorDesc
)
)
)
}
}
}
// reset the java stdout
@@ -275,11 +300,10 @@
     }
   }
 
-  private def lazyLoadILoop = { // lazy loaded.
+  private def doBindSparkSession() = { // lazy loaded.
     if (!bindFlag) {
       bindSparkSession
     }
-
   }
 
   private def initSparkILoop = {
@@ -418,6 +442,7 @@
   }
 
   override protected def getExecutorIdPreFix: String = "SparkScalaExecutor_"
+
 }
 
 class EngineExecutionContextFactory {
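Two behavioral changes here: the SparkILoop is now initialized lazily at execution time via lazyInitLoadILoop() (the old lazyLoadILoop is renamed doBindSparkSession to say what it actually does), and the executor checks Thread.currentThread().isInterrupted both before running and in the Results.Error branch, so a killed task fails fast instead of being handed to the REPL. The guard pattern in isolation (a sketch; guardInterrupt is a hypothetical name):

    def guardInterrupt(run: => ExecuteResponse): ExecuteResponse =
      if (Thread.currentThread().isInterrupted) {
        // a kill arrived between scheduling and execution; terminate without executing
        ErrorExecuteResponse(
          "The thread of execution has been interrupted and the task should be terminated",
          null
        )
      } else run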
SparkSqlExecutor.scala
@@ -23,6 +23,7 @@ import org.apache.linkis.engineplugin.spark.common.{Kind, SparkSQL}
 import org.apache.linkis.engineplugin.spark.config.SparkConfiguration
 import org.apache.linkis.engineplugin.spark.entity.SparkEngineSession
 import org.apache.linkis.engineplugin.spark.utils.EngineUtils
+import org.apache.linkis.governance.common.constant.job.JobRequestConstants
 import org.apache.linkis.governance.common.paser.SQLCodeParser
 import org.apache.linkis.scheduler.executer.{
   ErrorExecuteResponse,
@@ -53,6 +54,18 @@ class SparkSqlExecutor(sparkEngineSession: SparkEngineSession, id: Long)
       jobGroup: String
   ): ExecuteResponse = {
 
+    if (
+      engineExecutionContext.getCurrentParagraph == 1 && engineExecutionContext.getProperties
+        .containsKey(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+    ) {
+      val defaultDB =
+        engineExecutionContext.getProperties
+          .get(JobRequestConstants.LINKIS_JDBC_DEFAULT_DB)
+          .asInstanceOf[String]
+      logger.info(s"set default DB to $defaultDB")
+      sparkEngineSession.sqlContext.sql(s"use $defaultDB")
+    }
+
     logger.info("SQLExecutor run query: " + code)
     engineExecutionContext.appendStdout(s"${EngineUtils.getName} >> $code")
     val standInClassLoader = Thread.currentThread().getContextClassLoader
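Same first-paragraph hook as in the Hive executor, but here the default database is applied by running a USE statement through the session's SQLContext. In plain Spark terms this is equivalent to (a sketch; the database name is illustrative):

    // what the first paragraph of a JDBC job effectively executes when the
    // connection URL carried a database
    sparkEngineSession.sqlContext.sql("use my_db")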
