Skip to content

Commit

Permalink
Merge pull request #101 from massimosiani/natchez-0.3.x
Browse files Browse the repository at this point in the history
upgrade to natchez 0.3.x
  • Loading branch information
voidcontext authored Aug 17, 2023
2 parents 63db567 + 4ca1374 commit bf11bb1
Show file tree
Hide file tree
Showing 25 changed files with 166 additions and 71 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
.idea/
.vscode/
dist/*
target/
lib_managed/
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import microsites.MicrositesPlugin.autoImport.micrositeDescription

val scala213Version = "2.13.8"
val scala3Version = "3.2.2"
val scala3Version = "3.3.0"

val scalaVersions = Seq(scala213Version, scala3Version)

Expand Down Expand Up @@ -63,7 +63,7 @@ lazy val metricsCommon = projectMatrix
.settings(common :+ (name := "natchez-extras-metrics"))

val log4catsVersion = "2.2.0"
val natchezVersion = "0.1.6"
val natchezVersion = "0.3.3"
val http4sMilestoneVersion = "1.0.0-M38"
val http4sStableVersion = "0.23.14"
val circeVersion = "0.14.1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@ class IOLocalEntrypoint(private val ep: EntryPoint[IO], private val local: IOLoc
private def localise(span: Span[IO]): Resource[IO, Span[IO]] =
Resource.make(local.getAndSet(span))(parentSpan => local.set(parentSpan))

override def root(name: String): Resource[IO, Span[IO]] = ep.root(name).flatTap(localise)
override def root(name: String, options: Span.Options): Resource[IO, Span[IO]] =
ep.root(name, options).flatTap(localise)

override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
ep.continue(name, kernel).flatTap(localise)
override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[IO, Span[IO]] =
ep.continue(name, kernel, options).flatTap(localise)

override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
ep.continueOrElseRoot(name, kernel).flatTap(localise)
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] =
ep.continueOrElseRoot(name, kernel, options).flatTap(localise)
}

object IOLocalEntrypoint {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import natchez.{Kernel, Span, Trace, TraceValue}
import cats.effect.{IO, IOLocal, MonadCancel}

import java.net.URI
import cats.effect.Resource
import cats.~>

class IOLocalTrace(private val local: IOLocal[Span[IO]]) extends Trace[IO] {

Expand All @@ -16,9 +18,9 @@ class IOLocalTrace(private val local: IOLocal[Span[IO]]) extends Trace[IO] {
private def scope[G](t: Span[IO])(f: IO[G]): IO[G] =
MonadCancel[IO, Throwable].bracket(local.getAndSet(t))(_ => f)(local.set)

override def span[A](name: String)(k: IO[A]): IO[A] =
override def span[A](name: String, options: Span.Options)(k: IO[A]): IO[A] =
local.get.flatMap {
_.span(name).use { span =>
_.span(name, options).use { span =>
scope(span)(k)
}
}
Expand All @@ -28,4 +30,22 @@ class IOLocalTrace(private val local: IOLocal[Span[IO]]) extends Trace[IO] {

override def traceUri: IO[Option[URI]] =
local.get.flatMap(_.traceUri)

override def attachError(err: Throwable, fields: (String, TraceValue)*): IO[Unit] =
local.get.flatMap(_.attachError(err, fields: _*))
override def log(event: String): IO[Unit] = local.get.flatMap(_.log(event))
override def log(fields: (String, TraceValue)*): IO[Unit] = local.get.flatMap(_.log(fields: _*))
override def spanR(name: String, options: Span.Options): Resource[IO, IO ~> IO] =
for {
parent <- Resource.eval(local.get)
child <- parent.span(name, options)
} yield new (IO ~> IO) {
def apply[A](fa: IO[A]): IO[A] =
local.get.flatMap { old =>
local
.set(child)
.bracket(_ => fa.onError(child.attachError(_)))(_ => local.set(old))
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
for {
local <- IOLocal[Span[IO]](rootSpan)
ep = new TestEntrypoint {
override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] = Resource.pure(childSpan)
override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[IO, Span[IO]] =
Resource.pure(childSpan)
}
epUnderTest = new IOLocalEntrypoint(ep, local)
spanR = epUnderTest.continue("", Kernel(Map.empty))
Expand All @@ -26,7 +27,8 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
for {
local <- IOLocal[Span[IO]](rootSpan)
ep = new TestEntrypoint {
override def root(name: String): Resource[IO, Span[IO]] = Resource.pure(childSpan)
override def root(name: String, options: Span.Options): Resource[IO, Span[IO]] =
Resource.pure(childSpan)
}
epUnderTest = new IOLocalEntrypoint(ep, local)
spanR = epUnderTest.root("")
Expand All @@ -40,7 +42,11 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
for {
local <- IOLocal[Span[IO]](rootSpan)
ep = new TestEntrypoint {
override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] =
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] =
Resource.pure(childSpan)
}
epUnderTest = new IOLocalEntrypoint(ep, local)
Expand All @@ -59,10 +65,14 @@ class IOLocalEntrypointTest extends CatsEffectSuite {
}

private class TestEntrypoint extends EntryPoint[IO] {
override def root(name: String): Resource[IO, Span[IO]] = ???
override def root(name: String, options: Span.Options): Resource[IO, Span[IO]] = ???

override def continue(name: String, kernel: Kernel): Resource[IO, Span[IO]] = ???
override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[IO, Span[IO]] = ???

override def continueOrElseRoot(name: String, kernel: Kernel): Resource[IO, Span[IO]] = ???
override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[IO, Span[IO]] = ???
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ import java.net.URI
protected[ce3] abstract class TestSpan(val name: String) extends Span[IO] {
override def put(fields: (String, TraceValue)*): IO[Unit] = ???
override def kernel: IO[Kernel] = ???
override def span(name: String): Resource[IO, Span[IO]] = ???
override def span(name: String, options: Span.Options): Resource[IO, Span[IO]] = ???
override def traceId: IO[Option[String]] = ???
override def spanId: IO[Option[String]] = ???
override def traceUri: IO[Option[URI]] = ???
override def attachError(err: Throwable, fields: (String, TraceValue)*): IO[Unit] = ???
override def log(event: String): IO[Unit] = ???
override def log(fields: (String, TraceValue)*): IO[Unit] = ???
}

protected[ce3] object TestSpan {
val childSpan = new TestSpan("child") {}
val rootSpan = new TestSpan("root") {
override def span(name: String): Resource[IO, Span[IO]] = Resource.pure(childSpan)
override def span(name: String, options: Span.Options): Resource[IO, Span[IO]] = Resource.pure(childSpan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ object Combine {
def kernel: F[Kernel] =
(s1.kernel, s2.kernel).mapN { case (k1, k2) => Kernel(k1.toHeaders ++ k2.toHeaders) }

def span(name: String): Resource[F, Span[F]] =
(s1.span(name), s2.span(name)).mapN[Span[F]](combineSpan[F])

def put(fields: (String, TraceValue)*): F[Unit] =
(s1.put(fields: _*), s2.put(fields: _*)).tupled.as(())

Expand All @@ -33,17 +30,32 @@ object Combine {

def traceUri: F[Option[URI]] =
OptionT(s1.traceUri).orElseF(s2.traceUri).value

def attachError(err: Throwable, fields: (String, TraceValue)*): F[Unit] =
(s1.attachError(err, fields: _*), s2.attachError(err, fields: _*)).tupled.as(())

def log(event: String): F[Unit] = (s1.log(event), s2.log(event)).tupled.as(())

def log(fields: (String, TraceValue)*): F[Unit] = (s1.log(fields: _*), s2.log(fields: _*)).tupled.as(())

def span(name: String, options: Span.Options): Resource[F, Span[F]] =
(s1.span(name, options), s2.span(name, options)).mapN[Span[F]](combineSpan[F])
}

def combine[F[_]: Sync](e1: EntryPoint[F], e2: EntryPoint[F]): EntryPoint[F] =
new EntryPoint[F] {
def root(name: String): Resource[F, Span[F]] =
(e1.root(name), e2.root(name)).mapN(combineSpan[F])

def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
(e1.continue(name, kernel), e2.continue(name, kernel)).mapN(combineSpan[F])

def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
(e1.continueOrElseRoot(name, kernel), e2.continueOrElseRoot(name, kernel)).mapN(combineSpan[F])
override def root(name: String, options: Span.Options): Resource[F, Span[F]] =
(e1.root(name, options), e2.root(name, options)).mapN(combineSpan[F])

override def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] =
(e1.continue(name, kernel, options), e2.continue(name, kernel, options)).mapN(combineSpan[F])

override def continueOrElseRoot(
name: String,
kernel: Kernel,
options: Span.Options
): Resource[F, Span[F]] =
(e1.continueOrElseRoot(name, kernel, options), e2.continueOrElseRoot(name, kernel, options))
.mapN(combineSpan[F])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,16 +127,16 @@ object Datadog {
_ <- submitter(client, agentHost, queue)
} yield {
new EntryPoint[F] {
def root(name: String): Resource[F, Span[F]] =
def root(name: String, options: Span.Options): Resource[F, Span[F]] =
Resource
.eval(SpanIdentifiers.create.flatMap(Ref.of[F, SpanIdentifiers]))
.flatMap(DatadogSpan.create(queue, names(name)))
.widen

def continue(name: String, kernel: Kernel): Resource[F, Span[F]] =
def continue(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] =
DatadogSpan.fromKernel(queue, names(name), kernel).widen

def continueOrElseRoot(name: String, kernel: Kernel): Resource[F, Span[F]] =
def continueOrElseRoot(name: String, kernel: Kernel, options: Span.Options): Resource[F, Span[F]] =
DatadogSpan.fromKernel(queue, names(name), kernel).widen
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import natchez.TraceValue.{BooleanValue, NumberValue, StringValue}
import natchez.{Kernel, Span, TraceValue}

import java.net.URI
import natchez.Tags

/**
* Models an in-progress span we'll eventually send to Datadog.
Expand Down Expand Up @@ -42,7 +43,7 @@ case class DatadogSpan[F[_]: Async](
meta.update(m => fields.foldLeft(m) { case (m, (k, v)) => m.updated(k, v) }) >>
updateTraceToken(fields.toMap)

def span(name: String): Resource[F, Span[F]] =
def span(name: String, options: Span.Options): Resource[F, Span[F]] =
DatadogSpan.fromParent(name, parent = this).widen

def kernel: F[Kernel] =
Expand All @@ -56,6 +57,13 @@ case class DatadogSpan[F[_]: Async](

def traceUri: F[Option[URI]] =
Monad[F].pure(None)

override def attachError(err: Throwable, fields: (String, TraceValue)*): F[Unit] =
put(Tags.error(true) :: fields.toList: _*)

override def log(event: String): F[Unit] = put("event" -> TraceValue.StringValue(event))

override def log(fields: (String, TraceValue)*): F[Unit] = put(fields: _*)
}

object DatadogSpan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ object SpanIdentifiers {
* partial data (i.e. just a trace token) is still useful to us
*/
def fromKernel[F[_]: Sync](rawKernel: Kernel): F[SpanIdentifiers] = {
val headers = Headers(rawKernel.toHeaders.toSeq)
val headers = Headers(rawKernel.toHeaders.map { case (k, v) => k.toString -> v }.toSeq)
(
traceId(headers),
UnsignedLong.random[F],
Expand All @@ -70,6 +70,6 @@ object SpanIdentifiers {
`X-B3-Trace-Id`(ids.traceId),
`X-B3-Span-Id`(ids.spanId),
"X-Trace-Token" -> ids.traceToken
).headers.map(r => r.name.toString -> r.value).toMap
).headers.map(r => r.name -> r.value).toMap
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import natchez.EntryPoint
import org.http4s.Request
import org.http4s.circe.CirceEntityDecoder._
import org.http4s.syntax.literals._
import org.typelevel.ci.CIStringSyntax

import scala.concurrent.duration._

Expand Down Expand Up @@ -42,7 +43,7 @@ class DatadogTest extends CatsEffectSuite {
client <- TestClient[IO]
ep = entryPoint(client.client, "a", "b", agentHost = uri"http://example.com")
kernel <- ep.use(_.root("foo").use(s => s.put("traceToken" -> "foo") >> s.kernel))
} yield kernel.toHeaders.get("X-Trace-Token")
} yield kernel.toHeaders.get(ci"X-Trace-Token")
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import com.ovoenergy.natchez.extras.datadog.SpanIdentifiers._
import com.ovoenergy.natchez.extras.datadog.data.UnsignedLong
import munit.CatsEffectSuite
import natchez.Kernel
import org.typelevel.ci._

class SpanIdentifiersTest extends CatsEffectSuite {

Expand Down Expand Up @@ -33,18 +34,18 @@ class SpanIdentifiersTest extends CatsEffectSuite {

test("fromKernel should succeed in converting from a kernel even if info is missing") {
assertIOBoolean(fromKernel[IO](Kernel(Map.empty)).attempt.map(_.isRight))
assertIO(fromKernel[IO](Kernel(Map("X-Trace-Token" -> "foo"))).map(_.traceToken), "foo")
assertIO(fromKernel[IO](Kernel(Map(ci"X-Trace-Token" -> "foo"))).map(_.traceToken), "foo")
}

test("fromKernel should ignore header case when extracting info") {
assertIO(fromKernel[IO](Kernel(Map("x-TRACe-tokeN" -> "foo"))).map(_.traceToken), "foo")
assertIO(fromKernel[IO](Kernel(Map(ci"x-TRACe-tokeN" -> "foo"))).map(_.traceToken), "foo")
}

test("toKernel should output hex-encoded B3 Trace IDs alongside decimal encoded Datadog IDs") {
for {
ids <- SpanIdentifiers.create[IO].map(SpanIdentifiers.toKernel)
ddSpanId <- IO.fromOption(ids.toHeaders.get("X-Trace-Id"))(new Exception("Missing X-Trace-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get("X-B3-Trace-Id"))(new Exception("Missing X-B3-Trace-Id"))
ddSpanId <- IO.fromOption(ids.toHeaders.get(ci"X-Trace-Id"))(new Exception("Missing X-Trace-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get(ci"X-B3-Trace-Id"))(new Exception("Missing X-B3-Trace-Id"))
} yield {
val ddULong = UnsignedLong.fromString(ddSpanId, 10)
val b3ULong = UnsignedLong.fromString(b3SpanId, 16)
Expand All @@ -55,8 +56,8 @@ class SpanIdentifiersTest extends CatsEffectSuite {
test("toKernel should output hex-encoded B3 Span IDs alongside decimal encoded Datadog Parent IDs") {
for {
ids <- SpanIdentifiers.create[IO].map(SpanIdentifiers.toKernel)
ddSpanId <- IO.fromOption(ids.toHeaders.get("X-Parent-Id"))(new Exception("Missing X-Parent-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get("X-B3-Span-Id"))(new Exception("Missing X-B3-Span-Id"))
ddSpanId <- IO.fromOption(ids.toHeaders.get(ci"X-Parent-Id"))(new Exception("Missing X-Parent-Id"))
b3SpanId <- IO.fromOption(ids.toHeaders.get(ci"X-B3-Span-Id"))(new Exception("Missing X-B3-Span-Id"))
} yield {
val ddULong = UnsignedLong.fromString(ddSpanId, 10)
val b3ULong = UnsignedLong.fromString(b3SpanId, 16)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import natchez.{Kernel, Span, TraceValue}

import java.net.URI
import scala.concurrent.ExecutionContext.global
import natchez.Tags

class TracedTransactorTest extends CatsEffectSuite {

Expand All @@ -22,7 +23,7 @@ class TracedTransactorTest extends CatsEffectSuite {
def run[A](a: Traced[IO, A]): IO[List[SpanData]] =
Ref.of[IO, List[SpanData]](List(SpanData("root", Map.empty))).flatMap { sps =>
lazy val spanMock: Span[IO] = new Span[IO] {
def span(name: String): Resource[IO, Span[IO]] =
def span(name: String, options: Span.Options): Resource[IO, Span[IO]] =
Resource.eval(sps.update(_ :+ SpanData(name, Map.empty)).as(spanMock))
def kernel: IO[Kernel] =
IO.pure(Kernel(Map.empty))
Expand All @@ -34,6 +35,10 @@ class TracedTransactorTest extends CatsEffectSuite {
IO.pure(None)
def traceUri: IO[Option[URI]] =
IO.pure(None)
def attachError(err: Throwable, fields: (String, TraceValue)*): IO[Unit] =
put(Tags.error(true) :: fields.toList: _*)
def log(event: String): IO[Unit] = put("event" -> TraceValue.StringValue(event))
def log(fields: (String, TraceValue)*): IO[Unit] = put(fields: _*)
}
a.run(spanMock).attempt.flatMap(_ => sps.get)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import fs2.{Pipe, Stream}
import natchez.{Kernel, Span, TraceValue}

import java.net.URI
import natchez.Tags

/**
* A Natchez span that has been pre-allocated and will stay open
Expand Down Expand Up @@ -45,8 +46,8 @@ object AllocatedSpan {
spn.kernel
def put(fields: (String, TraceValue)*): F[Unit] =
spn.put(fields: _*)
def span(name: String): Resource[F, Span[F]] =
spn.span(name)
def span(name: String, options: Span.Options): Resource[F, Span[F]] =
spn.span(name, options)
def addSubmitTask(task: F[Unit]): AllocatedSpan[F] =
createSpan(spn, F.uncancelable(_ => F.attempt(task) >> submit))
def submit: F[Unit] =
Expand All @@ -57,6 +58,10 @@ object AllocatedSpan {
spn.spanId
def traceUri: F[Option[URI]] =
spn.traceUri
def attachError(err: Throwable, fields: (String, TraceValue)*): F[Unit] =
put(Tags.error(true) :: fields.toList: _*)
def log(event: String): F[Unit] = put("event" -> TraceValue.StringValue(event))
def log(fields: (String, TraceValue)*): F[Unit] = put(fields: _*)
}

/**
Expand Down
Loading

0 comments on commit bf11bb1

Please sign in to comment.