Skip to content

Commit

Permalink
make zquery stack safe (#378)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamgfraser authored May 10, 2020
1 parent 3cd9f9e commit 2efe7cb
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 89 deletions.
156 changes: 69 additions & 87 deletions core/src/main/scala/zquery/ZQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,7 @@ import zio.duration._
* Concise Data Access" by Simon Marlow, Louis Brandy, Jonathan Coens, and Jon
* Purdy. [[http://simonmar.github.io/bib/papers/haxl-icfp14.pdf]]
*/
sealed trait ZQuery[-R, +E, +A] { self =>

/**
* Executes one step of this query.
*/
protected def step(cache: Cache): ZIO[R, Nothing, Result[R, E, A]]
final class ZQuery[-R, +E, +A] private (private val step: ZIO[(R, Cache), Nothing, Result[R, E, A]]) { self =>

/**
* A symbolic alias for `zipParRight`.
Expand Down Expand Up @@ -108,13 +103,12 @@ sealed trait ZQuery[-R, +E, +A] { self =>
* requests will still be applied.
*/
final def flatMap[R1 <: R, E1 >: E, B](f: A => ZQuery[R1, E1, B]): ZQuery[R1, E1, B] =
new ZQuery[R1, E1, B] {
def step(cache: Cache): ZIO[R1, Nothing, Result[R1, E1, B]] =
self.step(cache).flatMap {
case Result.Blocked(br, c) => ZIO.succeed(Result.blocked(br, c.flatMap(f)))
case Result.Done(a) => f(a).step(cache)
case Result.Fail(e) => ZIO.succeed(Result.fail(e))
}
ZQuery {
step.flatMap {
case Result.Blocked(br, c) => ZIO.succeed(Result.blocked(br, c.flatMap(f)))
case Result.Done(a) => f(a).step
case Result.Fail(e) => ZIO.succeed(Result.fail(e))
}
}

/**
Expand Down Expand Up @@ -142,27 +136,21 @@ sealed trait ZQuery[-R, +E, +A] { self =>
failure: Cause[E] => ZQuery[R1, E1, B],
success: A => ZQuery[R1, E1, B]
): ZQuery[R1, E1, B] =
new ZQuery[R1, E1, B] {
def step(cache: Cache): ZIO[R1, Nothing, Result[R1, E1, B]] =
self
.step(cache)
.foldCauseM(
failure(_).step(cache), {
case Result.Blocked(br, c) => ZIO.succeed(Result.blocked(br, c.foldCauseM(failure, success)))
case Result.Done(a) => success(a).step(cache)
case Result.Fail(e) => failure(e).step(cache)
}
)
ZQuery {
step.foldCauseM(
failure(_).step, {
case Result.Blocked(br, c) => ZIO.succeed(Result.blocked(br, c.foldCauseM(failure, success)))
case Result.Done(a) => success(a).step
case Result.Fail(e) => failure(e).step
}
)
}

/**
* Maps the specified function over the successful result of this query.
*/
final def map[B](f: A => B): ZQuery[R, E, B] =
new ZQuery[R, E, B] {
def step(cache: Cache): ZIO[R, Nothing, Result[R, E, B]] =
self.step(cache).map(_.map(f))
}
ZQuery(step.map(_.map(f)))

/**
* Maps the specified function over the failed result of this query.
Expand Down Expand Up @@ -214,22 +202,18 @@ sealed trait ZQuery[-R, +E, +A] { self =>
final def provideLayer[E1 >: E, R0, R1 <: Has[_]](
layer: Described[ZLayer[R0, E1, R1]]
)(implicit ev1: R1 <:< R, ev2: NeedsEnv[R]): ZQuery[R0, E1, A] =
new ZQuery[R0, E1, A] {
def step(cache: Cache): ZIO[R0, Nothing, Result[R0, E1, A]] =
layer.value.build.run.use {
case Exit.Failure(e) => ZIO.succeed(Result.fail(e))
case Exit.Success(r) => self.step(cache).provide(r).map(_.provide(Described(r, layer.description)))
}
ZQuery {
layer.value.build.provideSome[(R0, Cache)](_._1).run.use {
case Exit.Failure(e) => ZIO.succeed(Result.fail(e))
case Exit.Success(r) => self.provide(Described(r, layer.description)).step
}
}

/**
* Provides this query with part of its required environment.
*/
final def provideSome[R0](f: Described[R0 => R])(implicit ev: NeedsEnv[R]): ZQuery[R0, E, A] =
new ZQuery[R0, E, A] {
def step(cache: Cache): ZIO[R0, Nothing, Result[R0, E, A]] =
self.step(cache).provideSome(f.value).map(_.provideSome(f))
}
ZQuery(step.map(_.provideSome(f)).provideSome(r => (f.value(r._1), r._2)))

/**
* Splits the environment into two parts, providing one part using the
Expand All @@ -250,7 +234,7 @@ sealed trait ZQuery[-R, +E, +A] { self =>
* without executing any new requests.
*/
final def runCache(cache: Cache): ZIO[R, E, A] =
step(cache).flatMap {
step.provideSome[R]((_, cache)).flatMap {
case Result.Blocked(br, c) => br.run *> c.runCache(cache)
case Result.Done(a) => ZIO.succeed(a)
case Result.Fail(e) => ZIO.halt(e)
Expand Down Expand Up @@ -342,17 +326,16 @@ sealed trait ZQuery[-R, +E, +A] { self =>
* be batched and deduplication and caching of requests will be applied.
*/
final def zipWithPar[R1 <: R, E1 >: E, B, C](that: ZQuery[R1, E1, B])(f: (A, B) => C): ZQuery[R1, E1, C] =
new ZQuery[R1, E1, C] {
def step(cache: Cache): ZIO[R1, Nothing, Result[R1, E1, C]] =
self.step(cache).zipPar(that.step(cache)).map {
case (Result.Blocked(br1, c1), Result.Blocked(br2, c2)) => Result.blocked(br1 ++ br2, c1.zipWithPar(c2)(f))
case (Result.Blocked(br, c), Result.Done(b)) => Result.blocked(br, c.map(a => f(a, b)))
case (Result.Done(a), Result.Blocked(br, c)) => Result.blocked(br, c.map(b => f(a, b)))
case (Result.Done(a), Result.Done(b)) => Result.done(f(a, b))
case (Result.Fail(e1), Result.Fail(e2)) => Result.fail(Cause.Both(e1, e2))
case (Result.Fail(e), _) => Result.fail(e)
case (_, Result.Fail(e)) => Result.fail(e)
}
ZQuery {
self.step.zipWithPar(that.step) {
case (Result.Blocked(br1, c1), Result.Blocked(br2, c2)) => Result.blocked(br1 ++ br2, c1.zipWithPar(c2)(f))
case (Result.Blocked(br, c), Result.Done(b)) => Result.blocked(br, c.map(a => f(a, b)))
case (Result.Done(a), Result.Blocked(br, c)) => Result.blocked(br, c.map(b => f(a, b)))
case (Result.Done(a), Result.Done(b)) => Result.done(f(a, b))
case (Result.Fail(e1), Result.Fail(e2)) => Result.fail(Cause.Both(e1, e2))
case (Result.Fail(e), _) => Result.fail(e)
case (_, Result.Fail(e)) => Result.fail(e)
}
}
}

Expand Down Expand Up @@ -410,7 +393,7 @@ object ZQuery {
* Constructs a query from an effect.
*/
def fromEffect[R, E, A](effect: ZIO[R, E, A]): ZQuery[R, E, A] =
ZQuery(effect.foldCause(Result.fail, Result.done))
ZQuery(effect.foldCause(Result.fail, Result.done).provideSome(_._1))

/**
* Constructs a query from a request and a data source. Queries will die with
Expand All @@ -421,39 +404,41 @@ object ZQuery {
def fromRequest[R, E, A, B](
request: A
)(dataSource: DataSource[R, A])(implicit ev: A <:< Request[E, B]): ZQuery[R, E, B] =
new ZQuery[R, E, B] {
def step(cache: Cache): ZIO[R, Nothing, Result[R, E, B]] =
cache
.lookup(request)
.foldM(
_ =>
for {
ref <- Ref.make(Option.empty[Either[E, B]])
_ <- cache.insert(request, ref)
} yield Result.blocked(
BlockedRequestMap(dataSource, BlockedRequest(request, ref)),
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
ZQuery {
ZIO.accessM[(Any, Cache)] {
case (_, cache) =>
cache
.lookup(request)
.foldM(
_ =>
for {
ref <- Ref.make(Option.empty[Either[E, B]])
_ <- cache.insert(request, ref)
} yield Result.blocked(
BlockedRequestMap(dataSource, BlockedRequest(request, ref)),
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
}
}
}
),
ref =>
ref.get.map {
case Some(b) => Result.fromEither(b)
case None =>
Result.blocked(
BlockedRequestMap.empty,
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
),
ref =>
ref.get.map {
case Some(b) => Result.fromEither(b)
case None =>
Result.blocked(
BlockedRequestMap.empty,
ZQuery {
ref.get.flatMap {
case None => ZIO.die(QueryFailure(dataSource, request))
case Some(b) => ZIO.succeed(Result.fromEither(b))
}
}
}
)
}
)
)
}
)
}
}

/**
Expand Down Expand Up @@ -516,11 +501,8 @@ object ZQuery {
/**
* Constructs a query from an effect that returns a result.
*/
private def apply[R, E, A](step0: ZIO[R, Nothing, Result[R, E, A]]): ZQuery[R, E, A] =
new ZQuery[R, E, A] {
def step(cache: Cache): ZIO[R, Nothing, Result[R, E, A]] =
step0
}
private def apply[R, E, A](step: ZIO[(R, Cache), Nothing, Result[R, E, A]]): ZQuery[R, E, A] =
new ZQuery(step)

/**
* Partitions the elements of a collection using the specified function.
Expand Down
16 changes: 14 additions & 2 deletions core/src/test/scala/zquery/ZQuerySpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package zquery

import zio.console.Console
import zio.test.Assertion._
import zio.test.TestAspect.silent
import zio.test.TestAspect.{ nonFlaky, silent }
import zio.test._
import zio.test.environment.{ TestConsole, TestEnvironment }
import zio.{ console, Promise, ZIO }
Expand Down Expand Up @@ -69,7 +69,19 @@ object ZQuerySpec extends ZIOBaseSpec {
result <- ZQuery.collectAllPar(List.fill(100)(getAllUserNames)).run
log <- TestConsole.output
} yield assert(log)(hasSize(equalTo(2)))
} @@ TestAspect.nonFlaky
} @@ nonFlaky,
testM("stack safety") {
val effect = (0 to 100000)
.map(ZQuery.succeed(_))
.foldLeft(ZQuery.succeed(0)) { (query1, query2) =>
for {
acc <- query1
i <- query2
} yield acc + i
}
.run
assertM(effect)(equalTo(705082704))
}
) @@ silent

val userIds: List[Int] = (1 to 26).toList
Expand Down

0 comments on commit 2efe7cb

Please sign in to comment.