Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upgrade to natchez 0.3.x #101

Merged
merged 5 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea/
.vscode/
dist/*
target/
lib_managed/
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import microsites.MicrositesPlugin.autoImport.micrositeDescription

val scala213Version = "2.13.8"
val scala3Version = "3.2.2"
val scala3Version = "3.3.0"

val scalaVersions = Seq(scala213Version, scala3Version)

Expand Down Expand Up @@ -63,7 +63,7 @@ lazy val metricsCommon = projectMatrix
.settings(common :+ (name := "natchez-extras-metrics"))

val log4catsVersion = "2.2.0"
val natchezVersion = "0.1.6"
val natchezVersion = "0.3.3"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

natchez 0.3.3 is compiled with Scala 3.3.0 which means we need to bump our version as well (see compilation errors in github actions)

val http4sMilestoneVersion = "1.0.0-M38"
val http4sStableVersion = "0.23.14"
val circeVersion = "0.14.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ class IOLocalEntrypoint(private val ep: EntryPoint[IO], private val local: IOLoc
private def localise(span: Span[IO]): Resource[IO, Span[IO]] =
Resource.make(local.getAndSet(span))(parentSpan => local.set(parentSpan))

override def root(name: String): Resource[IO, Span[IO]] = ep.root(name).flatTap(localise)
override def root(name: String, options: Span.Options): Resource[IO, Span[IO]] =
ep.root(name, options).flatTap(localise)

override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
ep.continue(name, kernel).flatTap(localise)
override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[IO, Span[IO]] =
ep.continue(name, kernel, options).flatTap(localise)

override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
ep.continueOrElseRoot(name, kernel).flatTap(localise)
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] =
ep.continueOrElseRoot(name, kernel, options).flatTap(localise)
}

object IOLocalEntrypoint {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import natchez.{Kernel, Span, Trace, TraceValue}
import cats.effect.{IO, IOLocal, MonadCancel}

import java.net.URI
import cats.effect.Resource
import cats.~>

class IOLocalTrace(private val local: IOLocal[Span[IO]]) extends Trace[IO] {

Expand All @@ -16,9 +18,9 @@ class IOLocalTrace(private val local: IOLocal[Span[IO]]) extends Trace[IO] {
private def scope[G](t: Span[IO])(f: IO[G]): IO[G] =
MonadCancel[IO, Throwable].bracket(local.getAndSet(t))(_ => f)(local.set)

override def span[A](name: String)(k: IO[A]): IO[A] =
override def span[A](name: String, options: Span.Options)(k: IO[A]): IO[A] =
local.get.flatMap {
_.span(name).use { span =>
_.span(name, options).use { span =>
scope(span)(k)
}
}
Expand All @@ -28,4 +30,22 @@ class IOLocalTrace(private val local: IOLocal[Span[IO]]) extends Trace[IO] {

override def traceUri: IO[Option[URI]] =
local.get.flatMap(_.traceUri)

override def attachError(err: Throwable, fields: (String, TraceValue)*): IO[Unit] =
local.get.flatMap(_.attachError(err, fields: _*))
override def log(event: String): IO[Unit] = local.get.flatMap(_.log(event))
override def log(fields: (String, TraceValue)*): IO[Unit] = local.get.flatMap(_.log(fields: _*))
override def spanR(name: String, options: Span.Options): Resource[IO, IO ~> IO] =
for {
parent <- Resource.eval(local.get)
child <- parent.span(name, options)
} yield new (IO ~> IO) {
def apply[A](fa: IO[A]): IO[A] =
local.get.flatMap { old =>
local
.set(child)
.bracket(_ => fa.onError(child.attachError(_)))(_ => local.set(old))
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
for {
local <- IOLocal[Span[IO]](rootSpan)
ep = new TestEntrypoint {
override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] = Resource.pure(childSpan)
override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[IO, Span[IO]] =
Resource.pure(childSpan)
}
epUnderTest = new IOLocalEntrypoint(ep, local)
spanR = epUnderTest.continue("", Kernel(Map.empty))
Expand All @@ -26,7 +27,8 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
for {
local <- IOLocal[Span[IO]](rootSpan)
ep = new TestEntrypoint {
override def root(name: String): Resource[IO, Span[IO]] = Resource.pure(childSpan)
override def root(name: String, options: Span.Options): Resource[IO, Span[IO]] =
Resource.pure(childSpan)
}
epUnderTest = new IOLocalEntrypoint(ep, local)
spanR = epUnderTest.root("")
Expand All @@ -40,7 +42,11 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
for {
local <- IOLocal[Span[IO]](rootSpan)
ep = new TestEntrypoint {
override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] =
Resource.pure(childSpan)
}
epUnderTest = new IOLocalEntrypoint(ep, local)
Expand All @@ -59,10 +65,14 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
}

private class TestEntrypoint extends EntryPoint[IO] {
override def root(name: String): Resource[IO, Span[IO]] = ???
override def root(name: String, options: Span.Options): Resource[IO, Span[IO]] = ???

override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] = ???
override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[IO, Span[IO]] = ???

override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] = ???
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] = ???
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ import java.net.URI
protected[ce3] abstract class TestSpan(val name: String) extends Span[IO] {
override def put(fields: (String, TraceValue)*): IO[Unit] = ???
override def kernel: IO[Kernel] = ???
override def span(name: String): Resource[IO, Span[IO]] = ???
override def span(name: String, options: Span.Options): Resource[IO, Span[IO]] = ???
override def traceId: IO[Option[String]] = ???
override def spanId: IO[Option[String]] = ???
override def traceUri: IO[Option[URI]] = ???
override def attachError(err: Throwable, fields: (String, TraceValue)*): IO[Unit] = ???
override def log(event: String): IO[Unit] = ???
override def log(fields: (String, TraceValue)*): IO[Unit] = ???
}

protected[ce3] object TestSpan {
val childSpan = new TestSpan("child") {}
val rootSpan = new TestSpan("root") {
override def span(name: String): Resource[IO, Span[IO]] = Resource.pure(childSpan)
override def span(name: String, options: Span.Options): Resource[IO, Span[IO]] = Resource.pure(childSpan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ object Combine {
def kernel: F[Kernel] =
(s1.kernel, s2.kernel).mapN { case (k1, k2) => Kernel(k1.toHeaders ++ k2.toHeaders) }

def span(name: String): Resource[F, Span[F]] =
(s1.span(name), s2.span(name)).mapN[Span[F]](combineSpan[F])

def put(fields: (String, TraceValue)*): F[Unit] =
(s1.put(fields: _*), s2.put(fields: _*)).tupled.as(())

Expand All @@ -33,17 +30,32 @@ object Combine {

def traceUri: F[Option[URI]] =
OptionT(s1.traceUri).orElseF(s2.traceUri).value

def attachError(err: Throwable, fields: (String, TraceValue)*): F[Unit] =
(s1.attachError(err, fields: _*), s2.attachError(err, fields: _*)).tupled.as(())

def log(event: String): F[Unit] = (s1.log(event), s2.log(event)).tupled.as(())

def log(fields: (String, TraceValue)*): F[Unit] = (s1.log(fields: _*), s2.log(fields: _*)).tupled.as(())

def span(name: String, options: Span.Options): Resource[F, Span[F]] =
(s1.span(name, options), s2.span(name, options)).mapN[Span[F]](combineSpan[F])
}

def combine[F[_]: Sync](e1: EntryPoint[F], e2: EntryPoint[F]): EntryPoint[F] =
new EntryPoint[F] {
def root(name: String): Resource[F, Span[F]] =
(e1.root(name), e2.root(name)).mapN(combineSpan[F])

def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
(e1.continue(name, kernel), e2.continue(name, kernel)).mapN(combineSpan[F])

def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
(e1.continueOrElseRoot(name, kernel), e2.continueOrElseRoot(name, kernel)).mapN(combineSpan[F])
override def root(name: String, options: Span.Options): Resource[F, Span[F]] =
(e1.root(name, options), e2.root(name, options)).mapN(combineSpan[F])

override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] =
(e1.continue(name, kernel, options), e2.continue(name, kernel, options)).mapN(combineSpan[F])

override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[F, Span[F]] =
(e1.continueOrElseRoot(name, kernel, options), e2.continueOrElseRoot(name, kernel, options))
.mapN(combineSpan[F])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ object Datadog {
_ <- submitter(client, agentHost, queue)
} yield {
new EntryPoint[F] {
def root(name: String): Resource[F, Span[F]] =
def root(name: String, options: Span.Options): Resource[F, Span[F]] =
Resource
.eval(SpanIdentifiers.create.flatMap(Ref.of[F, SpanIdentifiers]))
.flatMap(DatadogSpan.create(queue, names(name)))
.widen

def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] =
DatadogSpan.fromKernel(queue, names(name), kernel).widen

def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
def continueOrElseRoot(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] =
DatadogSpan.fromKernel(queue, names(name), kernel).widen
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import natchez.TraceValue.{BooleanValue, NumberValue, StringValue}
import natchez.{Kernel, Span, TraceValue}

import java.net.URI
import natchez.Tags

/**
* Models an in-progress span we'll eventually send to Datadog.
Expand Down Expand Up @@ -42,7 +43,7 @@ case class DatadogSpan[F[_]: Async](
meta.update(m => fields.foldLeft(m) { case (m, (k, v)) => m.updated(k, v) }) >>
updateTraceToken(fields.toMap)

def span(name: String): Resource[F, Span[F]] =
def span(name: String, options: Span.Options): Resource[F, Span[F]] =
DatadogSpan.fromParent(name, parent = this).widen

def kernel: F[Kernel] =
Expand All @@ -56,6 +57,13 @@ case class DatadogSpan[F[_]: Async](

def traceUri: F[Option[URI]] =
Monad[F].pure(None)

override def attachError(err: Throwable, fields: (String, TraceValue)*): F[Unit] =
put(Tags.error(true) :: fields.toList: _*)

override def log(event: String): F[Unit] = put("event" -> TraceValue.StringValue(event))

override def log(fields: (String, TraceValue)*): F[Unit] = put(fields: _*)
}

object DatadogSpan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object SpanIdentifiers {
* partial data (i.e. just a trace token) is still useful to us
*/
def fromKernel[F[_]: Sync](rawKernel: Kernel): F[SpanIdentifiers] = {
val headers = Headers(rawKernel.toHeaders.toSeq)
val headers = Headers(rawKernel.toHeaders.map { case (k, v) => k.toString -> v }.toSeq)
(
traceId(headers),
UnsignedLong.random[F],
Expand All @@ -70,6 +70,6 @@ object SpanIdentifiers {
`X-B3-Trace-Id`(ids.traceId),
`X-B3-Span-Id`(ids.spanId),
"X-Trace-Token" -> ids.traceToken
).headers.map(r => r.name.toString -> r.value).toMap
).headers.map(r => r.name -> r.value).toMap
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import natchez.EntryPoint
import org.http4s.Request
import org.http4s.circe.CirceEntityDecoder._
import org.http4s.syntax.literals._
import org.typelevel.ci.CIStringSyntax

import scala.concurrent.duration._

Expand Down Expand Up @@ -42,7 +43,7 @@ class DatadogTest extends CatsEffectSuite {
client <- TestClient[IO]
ep = entryPoint(client.client, "a", "b", agentHost = uri"http://example.com")
kernel <- ep.use(_.root("foo").use(s => s.put("traceToken" -> "foo") >> s.kernel))
} yield kernel.toHeaders.get("X-Trace-Token")
} yield kernel.toHeaders.get(ci"X-Trace-Token")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.ovoenergy.natchez.extras.datadog.SpanIdentifiers._
import com.ovoenergy.natchez.extras.datadog.data.UnsignedLong
import munit.CatsEffectSuite
import natchez.Kernel
import org.typelevel.ci._

class SpanIdentifiersTest extends CatsEffectSuite {

Expand Down Expand Up @@ -33,18 +34,18 @@ class SpanIdentifiersTest extends CatsEffectSuite {

test("fromKernel should succeed in converting from a kernel even if info is missing") {
assertIOBoolean(fromKernel[IO](Kernel(Map.empty)).attempt.map(_.isRight))
assertIO(fromKernel[IO](Kernel(Map("X-Trace-Token" -> "foo"))).map(_.traceToken), "foo")
assertIO(fromKernel[IO](Kernel(Map(ci"X-Trace-Token" -> "foo"))).map(_.traceToken), "foo")
}

test("fromKernel should ignore header case when extracting info") {
assertIO(fromKernel[IO](Kernel(Map("x-TRACe-tokeN" -> "foo"))).map(_.traceToken), "foo")
assertIO(fromKernel[IO](Kernel(Map(ci"x-TRACe-tokeN" -> "foo"))).map(_.traceToken), "foo")
}

test("toKernel should output hex-encoded B3 Trace IDs alongside decimal encoded Datadog IDs") {
for {
ids <- SpanIdentifiers.create[IO].map(SpanIdentifiers.toKernel)
ddSpanId <- IO.fromOption(ids.toHeaders.get("X-Trace-Id"))(new Exception("Missing X-Trace-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get("X-B3-Trace-Id"))(new Exception("Missing X-B3-Trace-Id"))
ddSpanId <- IO.fromOption(ids.toHeaders.get(ci"X-Trace-Id"))(new Exception("Missing X-Trace-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get(ci"X-B3-Trace-Id"))(new Exception("Missing X-B3-Trace-Id"))
} yield {
val ddULong = UnsignedLong.fromString(ddSpanId, 10)
val b3ULong = UnsignedLong.fromString(b3SpanId, 16)
Expand All @@ -55,8 +56,8 @@ class SpanIdentifiersTest extends CatsEffectSuite {
test("toKernel should output hex-encoded B3 Span IDs alongside decimal encoded Datadog Parent IDs") {
for {
ids <- SpanIdentifiers.create[IO].map(SpanIdentifiers.toKernel)
ddSpanId <- IO.fromOption(ids.toHeaders.get("X-Parent-Id"))(new Exception("Missing X-Parent-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get("X-B3-Span-Id"))(new Exception("Missing X-B3-Span-Id"))
ddSpanId <- IO.fromOption(ids.toHeaders.get(ci"X-Parent-Id"))(new Exception("Missing X-Parent-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get(ci"X-B3-Span-Id"))(new Exception("Missing X-B3-Span-Id"))
} yield {
val ddULong = UnsignedLong.fromString(ddSpanId, 10)
val b3ULong = UnsignedLong.fromString(b3SpanId, 16)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import natchez.{Kernel, Span, TraceValue}

import java.net.URI
import scala.concurrent.ExecutionContext.global
import natchez.Tags

class TracedTransactorTest extends CatsEffectSuite {

Expand All @@ -22,7 +23,7 @@ class TracedTransactorTest extends CatsEffectSuite {
def run[A](a: Traced[IO, A]): IO[List[SpanData]] =
Ref.of[IO, List[SpanData]](List(SpanData("root", Map.empty))).flatMap { sps =>
lazy val spanMock: Span[IO] = new Span[IO] {
def span(name: String): Resource[IO, Span[IO]] =
def span(name: String, options: Span.Options): Resource[IO, Span[IO]] =
Resource.eval(sps.update(_ :+ SpanData(name, Map.empty)).as(spanMock))
def kernel: IO[Kernel] =
IO.pure(Kernel(Map.empty))
Expand All @@ -34,6 +35,10 @@ class TracedTransactorTest extends CatsEffectSuite {
IO.pure(None)
def traceUri: IO[Option[URI]] =
IO.pure(None)
def attachError(err: Throwable, fields: (String, TraceValue)*): IO[Unit] =
put(Tags.error(true) :: fields.toList: _*)
def log(event: String): IO[Unit] = put("event" -> TraceValue.StringValue(event))
def log(fields: (String, TraceValue)*): IO[Unit] = put(fields: _*)
}
a.run(spanMock).attempt.flatMap(_ => sps.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import fs2.{Pipe, Stream}
import natchez.{Kernel, Span, TraceValue}

import java.net.URI
import natchez.Tags

/**
* A Natchez span that has been pre-allocated and will stay open
Expand Down Expand Up @@ -45,8 +46,8 @@ object AllocatedSpan {
spn.kernel
def put(fields: (String, TraceValue)*): F[Unit] =
spn.put(fields: _*)
def span(name: String): Resource[F, Span[F]] =
spn.span(name)
def span(name: String, options: Span.Options): Resource[F, Span[F]] =
spn.span(name, options)
def addSubmitTask(task: F[Unit]): AllocatedSpan[F] =
createSpan(spn, F.uncancelable(_ => F.attempt(task) >> submit))
def submit: F[Unit] =
Expand All @@ -57,6 +58,10 @@ object AllocatedSpan {
spn.spanId
def traceUri: F[Option[URI]] =
spn.traceUri
def attachError(err: Throwable, fields: (String, TraceValue)*): F[Unit] =
put(Tags.error(true) :: fields.toList: _*)
def log(event: String): F[Unit] = put("event" -> TraceValue.StringValue(event))
def log(fields: (String, TraceValue)*): F[Unit] = put(fields: _*)
}

/**
Expand Down
Loading