Skip to content

Commit

Permalink
Merge pull request #1 from bplommer/pr/listen-to-updates
Browse files Browse the repository at this point in the history
Expose flag value updates
  • Loading branch information
bplommer authored Oct 4, 2022
2 parents ef235e0 + a9f3d4f commit e5b1520
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 7 deletions.
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ lazy val core = crossProject(JVMPlatform)
libraryDependencies ++= Seq(
"org.typelevel" %%% "cats-core" % "2.8.0",
"org.typelevel" %%% "cats-effect" % "3.3.14",
"com.launchdarkly" % "launchdarkly-java-server-sdk" % "5.10.2",
"co.fs2" %%% "fs2-core" % "3.3.0",
("com.launchdarkly" % "launchdarkly-java-server-sdk" % "5.10.2")
.exclude("com.launchdarkly", "launchdarkly-logging"),
),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package io.github.bplommer.launchcatsly

import cats.effect.{Resource, Sync}
import cats.effect.std.{Dispatcher, Queue}
import cats.effect.{Async, Resource}
import cats.~>
import com.launchdarkly.sdk.server.interfaces.{FlagValueChangeEvent, FlagValueChangeListener}
import com.launchdarkly.sdk.server.{LDClient, LDConfig}
import com.launchdarkly.sdk.{LDUser, LDValue}
import fs2._

trait LaunchDarklyClient[F[_]] { self =>

trait LaunchDarklyClient[F[_]] {
def boolVariation(featureKey: String, user: LDUser, defaultValue: Boolean): F[Boolean]

def stringVariation(featureKey: String, user: LDUser, defaultValue: String): F[String]
Expand All @@ -33,21 +35,41 @@ trait LaunchDarklyClient[F[_]] { self =>

def jsonVariation(featureKey: String, user: LDUser, defaultValue: LDValue): F[LDValue]

def listen(featureKey: String, user: LDUser): Stream[F, FlagValueChangeEvent]

def flush: F[Unit]

def mapK[G[_]](fk: F ~> G): LaunchDarklyClient[G]
}

object LaunchDarklyClient {
def resource[F[_]](sdkKey: String, config: LDConfig)(implicit
F: Sync[F]
F: Async[F]
): Resource[F, LaunchDarklyClient[F]] =
Resource
.fromAutoCloseable(F.blocking(new LDClient(sdkKey, config)))
.map { ldClient =>
new LaunchDarklyClient.Default[F] {

override def unsafeWithJavaClient[A](f: LDClient => A): F[A] =
F.blocking(f(ldClient))

F.delay(f(ldClient))

override def listen(featureKey: String, user: LDUser): Stream[F, FlagValueChangeEvent] =
Stream.eval(F.delay(ldClient.getFlagTracker)).flatMap { tracker =>
Stream.resource(Dispatcher[F]).flatMap { dispatcher =>
Stream.eval(Queue.unbounded[F, FlagValueChangeEvent]).flatMap { q =>
val listener = new FlagValueChangeListener {
override def onFlagValueChange(event: FlagValueChangeEvent): Unit =
dispatcher.unsafeRunSync(q.offer(event))
}

Stream.bracket(
F.delay(tracker.addFlagValueChangeListener(featureKey, user, listener))
)(listener => F.delay(tracker.removeFlagChangeListener(listener))) >>
Stream.fromQueueUnterminated(q)
}
}
}
}
}

Expand All @@ -70,10 +92,17 @@ object LaunchDarklyClient {
override def jsonVariation(featureKey: String, user: LDUser, default: LDValue): F[LDValue] =
unsafeWithJavaClient(_.jsonValueVariation(featureKey, user, default))

override def flush: F[Unit] = unsafeWithJavaClient(_.flush())

override def mapK[G[_]](fk: F ~> G): LaunchDarklyClient[G] = new LaunchDarklyClient.Default[G] {
override def unsafeWithJavaClient[A](f: LDClient => A): G[A] = fk(
self.unsafeWithJavaClient(f)
)

override def listen(featureKey: String, user: LDUser): Stream[G, FlagValueChangeEvent] =
self.listen(featureKey, user).translate(fk)

override def flush: G[Unit] = fk(self.flush)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@

package com.github.bplommer.launchcatsly

import cats.data.{Chain, NonEmptyChain}
import cats.effect._
import cats.effect.std.Supervisor
import com.launchdarkly.sdk.server.interfaces.FlagValueChangeEvent
import com.launchdarkly.sdk.{LDUser, LDValue}
import io.github.bplommer.launchcatsly.testkit._
import weaver.SimpleIOSuite

import scala.concurrent.duration.DurationInt

object VariationTests extends SimpleIOSuite {
test("serve value of boolean variations")(
testClient.use { case (td, client) =>
Expand All @@ -37,4 +42,39 @@ object VariationTests extends SimpleIOSuite {

}
)

test("listen to change events")(
testClient.use { case (td, client) =>
def setFooFlag(value: String) = IO(td.update(td.flag("foo").valueForAll(LDValue.of(value))))

Supervisor[IO].use { sup =>
for {
received <- IO.ref[Chain[FlagValueChangeEvent]](Chain.empty)
_ <- sup.supervise(
client
.listen("foo", new LDUser.Builder("derek").build())
.evalTap(event => received.update(_.append(event)))
.compile
.drain
)
_ <- IO.sleep(500.millis)
_ <- setFooFlag("value1")
_ <- setFooFlag("value2")
_ <- client.flush
_ <- IO.sleep(1000.millis)
result <- received.get
unchained = NonEmptyChain
.fromChain(
result.map(event =>
(event.getOldValue.stringValue(), event.getNewValue.stringValue())
)
)
.map(_.reduceLeft { case ((old1, new1), (old2, new2)) =>
if (old2 == new1) (old1, new2) else throw new Exception("")
})

} yield expect(unchained == Some((null, "value2")))
}
}
)
}

0 comments on commit e5b1520

Please sign in to comment.