Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SNOW-1638551 Start Open Telemetry Span before Process Actions #153

Merged
merged 9 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 54 additions & 56 deletions src/main/scala/com/snowflake/snowpark/internal/OpenTelemetry.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,39 +59,34 @@ object OpenTelemetry extends Logging {
funcName: String,
execName: String,
execHandler: String,
execFilePath: String)(func: => T): T = {
execFilePath: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newSpan =
UdfInfo(className, funcName, fileName, lineNumber, execName, execHandler, execFilePath)
emitSpan(newSpan, className, funcName, func)
emitSpan(newSpan, thunk)
}
// wrapper of all action functions
def action[T](className: String, funcName: String, methodChain: String)(func: => T): T = {
def action[T](className: String, funcName: String, methodChain: String)(thunk: => T): T = {
val stacks = Thread.currentThread().getStackTrace
val (fileName, lineNumber) = findLineNumber(stacks)
val newInfo =
ActionInfo(className, funcName, fileName, lineNumber, s"$methodChain.$funcName")
emitSpan(newInfo, className, funcName, func)
emitSpan(newInfo, thunk)
}

private def emitSpan[T](span: SpanInfo, className: String, funcName: String, thunk: => T): T = {
private def emitSpan[T](span: SpanInfo, thunk: => T): T = {
try {
spanInfo.value match {
case None =>
spanInfo.withValue(Some(span)) {
val result: T = thunk
// only emit one time, in the top level action
OpenTelemetry.emit(spanInfo.value.get)
result
span.emitSpan(thunk)
}
case _ =>
thunk
}
} catch {
case error: Throwable =>
OpenTelemetry.reportError(className, funcName, error)
throw error
case error: Throwable => throw span.reportError(error)
sfc-gh-enie marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -118,58 +113,49 @@ object OpenTelemetry extends Logging {
}
}
}
}

def emit(info: SpanInfo): Unit =
emit(info.className, info.funcName) { span =>
{
span.setAttribute("code.filepath", info.fileName)
span.setAttribute("code.lineno", info.lineNumber)
info match {
case ActionInfo(_, _, _, _, methodChain) =>
span.setAttribute("method.chain", methodChain)
case UdfInfo(_, _, _, _, execName, execHandler, execFilePath) =>
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
}
trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int

def reportError(className: String, funcName: String, error: Throwable): Unit =
emit(className, funcName) { span =>
{
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
}
}
lazy private val span =
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't init until emit function call.

GlobalOpenTelemetry
.getTracer(s"snow.snowpark.$className")
.spanBuilder(funcName)
.startSpan()

private def emit(className: String, funcName: String)(report: Span => Unit): Unit = {
val name = s"snow.snowpark.$className"
val tracer = GlobalOpenTelemetry.getTracer(name)
val span = tracer.spanBuilder(funcName).startSpan()
private def emit[T](thunk: => T): T = {
sfc-gh-enie marked this conversation as resolved.
Show resolved Hide resolved
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
val scope = span.makeCurrent()
// Using Manager is not available in Scala 2.12 yet
try {
report(span)
} catch {
case e: Exception =>
logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
} finally {
scope.close()
}
thunk
} catch {
case e: Exception =>
OpenTelemetry.logWarning(s"Error when acquiring span attributes. ${e.getMessage}")
throw e
} finally {
scope.close()
span.end()
}
}

}
protected def withAdditionalInfo(span: Span): Unit

trait SpanInfo {
val className: String
val funcName: String
val fileName: String
val lineNumber: Int
def emitSpan[T](thunk: => T): T = emit {
span.setAttribute("code.filepath", fileName)
span.setAttribute("code.lineno", lineNumber)
withAdditionalInfo(span)
thunk
}

def reportError(error: Throwable): Throwable = emit {
span.setStatus(StatusCode.ERROR, error.getMessage)
span.recordException(error)
error
}
}

case class ActionInfo(
Expand All @@ -178,7 +164,12 @@ case class ActionInfo(
override val fileName: String,
override val lineNumber: Int,
methodChain: String)
extends SpanInfo
extends SpanInfo {

override protected def withAdditionalInfo(span: Span): Unit = {
span.setAttribute("method.chain", methodChain)
}
}

case class UdfInfo(
override val className: String,
Expand All @@ -188,4 +179,11 @@ case class UdfInfo(
execName: String,
execHandler: String,
execFilePath: String)
extends SpanInfo
extends SpanInfo {

override protected def withAdditionalInfo(span: Span): Unit = {
span.setAttribute("snow.executable.name", execName)
span.setAttribute("snow.executable.handler", execHandler)
span.setAttribute("snow.executable.filepath", execFilePath)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -430,13 +430,14 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
}

test("OpenTelemetry.emit") {
OpenTelemetry.emit(ActionInfo("ClassA", "functionB", "fileC", 123, "chainD"))
ActionInfo("ClassA", "functionB", "fileC", 123, "chainD").emitSpan(1)
checkSpan("snow.snowpark.ClassA", "functionB", "fileC", 123, "chainD")
}

test("report error") {
val error = new Exception("test")
OpenTelemetry.reportError("ClassA1", "functionB1", error)
val span = ActionInfo("ClassA1", "functionB1", "", 0, "")
span.reportError(error)
checkSpanError("snow.snowpark.ClassA1", "functionB1", error)
}

Expand All @@ -446,6 +447,16 @@ class OpenTelemetrySuite extends OpenTelemetryEnabled {
assert(l.size() == 1)
}

test("actions should be processed in the span time period") {
val result = session.sql("select current_timestamp()").collect().head.getTimestamp(0)
sfc-gh-enie marked this conversation as resolved.
Show resolved Hide resolved
val l = testSpanExporter.getFinishedSpanItems
val spanStart = l.get(0).getStartEpochNanos / 1000000
val time = result.getTime
val spanEnd = l.get(0).getEndEpochNanos / 1000000
assert(spanStart < time)
assert(time < spanEnd)
}

override def beforeAll: Unit = {
super.beforeAll
createStage(stageName1)
Expand Down
Loading