From 3f4d3dfe10f3919b7a6ff773f489b265d81da3c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Thu, 20 Sep 2018 14:05:08 +0200 Subject: [PATCH 01/11] Make DataSourceCache an abstract class --- shared/src/main/scala/cache.scala | 31 +++---- shared/src/main/scala/fetch.scala | 63 ++++++++++---- shared/src/test/scala/FetchTests.scala | 112 ++++++++++++------------- 3 files changed, 114 insertions(+), 92 deletions(-) diff --git a/shared/src/main/scala/cache.scala b/shared/src/main/scala/cache.scala index 95c524f0..87b9617c 100644 --- a/shared/src/main/scala/cache.scala +++ b/shared/src/main/scala/cache.scala @@ -28,10 +28,10 @@ final class DataSourceResult(val result: Any) extends AnyVal /** * A `Cache` trait so the users of the library can provide their own cache. */ -trait DataSourceCache { - def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] - def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] - def insertMany[F[_]: ConcurrentEffect, I, A](vs: Map[I, A], ds: DataSource[I, A]): F[DataSourceCache] = +abstract class DataSourceCache[F[_] : ConcurrentEffect] { + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] + def insertMany[I, A](vs: Map[I, A], ds: DataSource[I, A]): F[DataSourceCache[F]] = vs.toList.foldLeftM(this)({ case (c, (i, v)) => c.insert(i, v, ds) }) @@ -40,30 +40,19 @@ trait DataSourceCache { /** * A cache that stores its elements in memory. */ -case class InMemoryCache(state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache { - def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = +case class InMemoryCache[F[_] : ConcurrentEffect](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] { + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = Applicative[F].pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A])) - def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] = + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = Applicative[F].pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v)))) } object InMemoryCache { - def empty: InMemoryCache = InMemoryCache(Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) + def empty[F[_] : ConcurrentEffect]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) - def from[I, A](results: ((String, I), A)*): InMemoryCache = - InMemoryCache(results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({ + def from[F[_]: ConcurrentEffect, I, A](results: ((String, I), A)*): InMemoryCache[F] = + InMemoryCache[F](results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({ case (acc, ((s, i), v)) => acc.updated((new DataSourceName(s), new DataSourceId(i)), new DataSourceResult(v)) })) - - implicit val inMemoryCacheMonoid: Monoid[InMemoryCache] = { - implicit val anySemigroup = new Semigroup[Any] { - def combine(a: Any, b: Any): Any = b - } - new Monoid[InMemoryCache] { - def empty: InMemoryCache = InMemoryCache.empty - def combine(c1: InMemoryCache, c2: InMemoryCache): InMemoryCache = - InMemoryCache(c1.state ++ c2.state) - } - } } diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala index 875be580..adc0994e 100644 --- a/shared/src/main/scala/fetch.scala +++ b/shared/src/main/scala/fetch.scala @@ -269,16 +269,27 @@ object `package` { private[fetch] class FetchRunner[F[_]](private val dummy: Boolean = true) extends AnyVal { def apply[A]( - fa: Fetch[F, A], - cache: DataSourceCache = InMemoryCache.empty + fa: Fetch[F, A] )( implicit P: Par[F], C: ConcurrentEffect[F], CS: ContextShift[F], T: Timer[F] + ): F[A] = + apply(fa, InMemoryCache.empty[F]) + + def apply[A]( + fa: Fetch[F, A], + cache: DataSourceCache[F] + )( + implicit + P: Par[F], + C: ConcurrentEffect[F], + CS: ContextShift[F], + T: Timer[F] ): F[A] = for { - cache <- Ref.of[F, DataSourceCache](cache) + cache <- Ref.of[F, DataSourceCache[F]](cache) result <- performRun(fa, cache, None) } yield result } @@ -289,18 +300,29 @@ object `package` { def runEnv[F[_]]: FetchRunnerEnv[F] = new FetchRunnerEnv[F] private[fetch] class FetchRunnerEnv[F[_]](private val dummy: Boolean = true) extends AnyVal { + def apply[A]( + fa: Fetch[F, A] + )( + implicit + P: Par[F], + C: ConcurrentEffect[F], + CS: ContextShift[F], + T: Timer[F] + ): F[(Env, A)] = + apply(fa, InMemoryCache.empty[F]) + def apply[A]( fa: Fetch[F, A], - cache: DataSourceCache = InMemoryCache.empty + cache: DataSourceCache[F] )( implicit P: Par[F], - C: ConcurrentEffect[F], + C: ConcurrentEffect[F], CS: ContextShift[F], T: Timer[F] ): F[(Env, A)] = for { env <- Ref.of[F, Env](FetchEnv()) - cache <- Ref.of[F, DataSourceCache](cache) + cache <- Ref.of[F, DataSourceCache[F]](cache) result <- performRun(fa, cache, Some(env)) e <- env.get } yield (e, result) @@ -312,17 +334,28 @@ object `package` { def runCache[F[_]]: FetchRunnerCache[F] = new FetchRunnerCache[F] private[fetch] class FetchRunnerCache[F[_]](private val dummy: Boolean = true) extends AnyVal { + def apply[A]( + fa: Fetch[F, A] + )( + implicit + P: Par[F], + C: ConcurrentEffect[F], + CS: ContextShift[F], + T: Timer[F] + ): F[(DataSourceCache[F], A)] = + apply(fa, InMemoryCache.empty[F]) + def apply[A]( fa: Fetch[F, A], - cache: DataSourceCache = InMemoryCache.empty + cache: DataSourceCache[F] )( implicit P: Par[F], - C: ConcurrentEffect[F], + C: ConcurrentEffect[F], CS: ContextShift[F], T: Timer[F] - ): F[(DataSourceCache, A)] = for { - cache <- Ref.of[F, DataSourceCache](cache) + ): F[(DataSourceCache[F], A)] = for { + cache <- Ref.of[F, DataSourceCache[F]](cache) result <- performRun(fa, cache, None) c <- cache.get } yield (c, result) @@ -332,7 +365,7 @@ object `package` { private def performRun[F[_], A]( fa: Fetch[F, A], - cache: Ref[F, DataSourceCache], + cache: Ref[F, DataSourceCache[F]], env: Option[Ref[F, Env]] )( implicit @@ -360,7 +393,7 @@ object `package` { private def fetchRound[F[_], A]( rs: RequestMap[F], - cache: Ref[F, DataSourceCache], + cache: Ref[F, DataSourceCache[F]], env: Option[Ref[F, Env]] )( implicit @@ -387,7 +420,7 @@ object `package` { private def runBlockedRequest[F[_], A]( blocked: BlockedRequest[F], - cache: Ref[F, DataSourceCache], + cache: Ref[F, DataSourceCache[F]], env: Option[Ref[F, Env]] )( implicit @@ -405,7 +438,7 @@ object `package` { private def runFetchOne[F[_]]( q: FetchOne[Any, Any], putResult: FetchStatus => F[Unit], - cache: Ref[F, DataSourceCache], + cache: Ref[F, DataSourceCache[F]], env: Option[Ref[F, Env]] )( implicit @@ -450,7 +483,7 @@ object `package` { private def runBatch[F[_]]( q: Batch[Any, Any], putResult: FetchStatus => F[Unit], - cache: Ref[F, DataSourceCache], + cache: Ref[F, DataSourceCache[F]], env: Option[Ref[F, Env]] )( implicit diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index b1c6cb00..20616be3 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -50,14 +50,14 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can map over Fetch values" in { - def fetch[F[_] : ConcurrentEffect : ContextShift]: Fetch[F, (Int)] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int)] = one(1).map(_ + 1) Fetch.run[IO](fetch).map(_ shouldEqual 2).unsafeToFuture } "We can use fetch inside a for comprehension" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = for { + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = for { o <- one(1) t <- one(2) } yield (o, t) @@ -66,7 +66,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can mix data sources" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = for { + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = for { o <- one(1) m <- many(3) } yield (o, m) @@ -75,7 +75,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use Fetch as a cartesian" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.run[IO](fetch) @@ -83,7 +83,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use Fetch as an applicative" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, Int] = (one(1), one(2), one(3)).mapN(_ + _ + _) + def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = (one(1), one(2), one(3)).mapN(_ + _ + _) val io = Fetch.run[IO](fetch) @@ -91,7 +91,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can traverse over a list with a Fetch for each element" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = for { + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = for { manies <- many(3) ones <- manies.traverse(one[F]) } yield ones @@ -102,7 +102,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can depend on previous computations of Fetch values" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, Int] = for { + def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { o <- one(1) t <- one(o + 1) } yield o + t @@ -113,7 +113,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect a list of Fetch into one" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(one(1), one(2), one(3)).sequence val io = Fetch.run[IO](fetch) @@ -122,7 +122,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect a list of Fetches with heterogeneous sources" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(one(1), one(2), one(3), anotherOne(4), anotherOne(5)).sequence val io = Fetch.run[IO](fetch) @@ -131,7 +131,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect the results of a traversal" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(1, 2, 3).traverse(one[F]) val io = Fetch.run[IO](fetch) @@ -142,7 +142,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Execution model "Monadic bind implies sequential execution" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = for { o <- one(1) t <- one(2) @@ -159,7 +159,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Traversals are implicitly batched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = for { manies <- many(3) ones <- manies.traverse(one[F]) @@ -176,7 +176,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequencing is implicitly batched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(one(1), one(2), one(3)).sequence val io = Fetch.runEnv[IO](fetch) @@ -193,7 +193,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { "Identities are deduped when batched" in { val sources = List(1, 1, 2) - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = sources.traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -211,7 +211,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of two fetches implies parallel fetching" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -226,7 +226,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Concurrent fetching calls batches only when it can" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -241,7 +241,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Concurrent fetching performs requests to multiple data sources in parallel" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, ((Int, List[Int]), Int)] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, ((Int, List[Int]), Int)] = ((one(1), many(2)).tupled, anotherOne(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -256,7 +256,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches implies everything fetched concurrently" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect] = ( ( one(1), (one(2), one(3)).tupled @@ -277,18 +277,18 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches of the same type implies everything fetched in a single batch" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def aFetch[F[_] : ConcurrentEffect] = for { a <- one(1) // round 1 b <- many(1) // round 2 c <- one(1) } yield c - def anotherFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def anotherFetch[F[_] : ConcurrentEffect] = for { a <- one(2) // round 1 m <- many(2) // round 2 c <- one(2) } yield c - def fetch[F[_] : ContextShift : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect] = ( (aFetch[F], anotherFetch[F]).tupled, one(3) // round 1 ).tupled @@ -306,18 +306,18 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Every level of joined concurrent fetches is combined and batched" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def aFetch[F[_] : ConcurrentEffect] = for { a <- one(1) // round 1 b <- many(1) // round 2 c <- one(1) } yield c - def anotherFetch[F[_] : ContextShift : ConcurrentEffect] = for { + def anotherFetch[F[_] : ConcurrentEffect] = for { a <- one(2) // round 1 m <- many(2) // round 2 c <- one(2) } yield c - def fetch[F[_] : ContextShift : ConcurrentEffect] = (aFetch[F], anotherFetch[F]).tupled + def fetch[F[_] : ConcurrentEffect] = (aFetch[F], anotherFetch[F]).tupled val io = Fetch.runEnv[IO](fetch) @@ -332,21 +332,21 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Every level of sequenced concurrent fetches is batched" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = + def aFetch[F[_] : ConcurrentEffect] = for { a <- List(2, 3, 4).traverse(one[F]) // round 1 b <- List(0, 1).traverse(many[F]) // round 2 c <- List(9, 10, 11).traverse(one[F]) // round 3 } yield c - def anotherFetch[F[_] : ContextShift : ConcurrentEffect] = + def anotherFetch[F[_] : ConcurrentEffect] = for { a <- List(5, 6, 7).traverse(one[F]) // round 1 b <- List(2, 3).traverse(many[F]) // round 2 c <- List(12, 13, 14).traverse(one[F]) // round 3 } yield c - def fetch[F[_] : ContextShift : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect] = ( (aFetch[F], anotherFetch[F]).tupled, List(15, 16, 17).traverse(one[F]) // round 1 ).tupled @@ -364,7 +364,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of two fetches from the same data source implies batching" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = (one(1), one(3)).tupled + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = (one(1), one(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -379,7 +379,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequenced fetches are run concurrently" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(one(1), one(2), one(3), anotherOne(4), anotherOne(5)).sequence val io = Fetch.runEnv[IO](fetch) @@ -394,7 +394,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequenced fetches are deduped" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(one(1), one(2), one(1)).sequence val io = Fetch.runEnv[IO](fetch) @@ -410,7 +410,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Traversals are batched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(1, 2, 3).traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -425,7 +425,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Duplicated sources are only fetched once" in { - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = List(1, 2, 1).traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -440,7 +440,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sources that can be fetched concurrently inside a for comprehension will be" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect] = for { v <- Fetch.pure[F, List[Int]](List(1, 2, 1)) result <- v.traverse(one[F]) @@ -458,13 +458,13 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Pure Fetches allow to explore further in the Fetch" in { - def aFetch[F[_] : ContextShift : ConcurrentEffect] = + def aFetch[F[_] : ConcurrentEffect] = for { a <- Fetch.pure[F, Int](2) b <- one[F](3) } yield a + b - def fetch[F[_] : ContextShift : ConcurrentEffect]: Fetch[F, (Int, Int)] = + def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = (one(1), aFetch[F]).tupled val io = Fetch.runEnv[IO](fetch) @@ -481,7 +481,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Caching "Elements are cached and thus not fetched more than once" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -503,7 +503,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Batched elements are cached and thus not fetched more than once" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect] = for { _ <- List(1, 2, 3).traverse(one[F]) aOne <- one(1) anotherOne <- one(1) @@ -514,7 +514,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val io = Fetch.runEnv(fetch) + val io = Fetch.runEnv[IO](fetch) io.map({ case (env, result) => { @@ -526,7 +526,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Elements that are cached won't be fetched" in { - def fetch[F[_] : ContextShift : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -537,7 +537,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val cache = InMemoryCache.from( + def cache[F[_] : ConcurrentEffect] = InMemoryCache.from[F, One, Int]( (OneSource.name, One(1)) -> 1, (OneSource.name, One(2)) -> 2, (OneSource.name, One(3)) -> 3 @@ -554,16 +554,18 @@ class FetchTests extends AsyncFreeSpec with Matchers { }).unsafeToFuture } - case class ForgetfulCache() extends DataSourceCache { - def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] = + case class ForgetfulCache[F[_] : ConcurrentEffect]() extends DataSourceCache[F] { + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = Applicative[F].pure(this) - def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = Applicative[F].pure(None) } + def forgetfulCache[F[_] : ConcurrentEffect] = ForgetfulCache[F]() + "We can use a custom cache that discards elements" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = for { + def fetch[F[_] : ConcurrentEffect] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -573,8 +575,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val cache = ForgetfulCache() - val io = Fetch.runEnv[IO](fetch, cache) + val io = Fetch.runEnv[IO](fetch, forgetfulCache) io.map({ case (env, result) => { @@ -586,7 +587,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use a custom cache that discards elements together with concurrent fetches" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = for { + def fetch[F[_] : ConcurrentEffect] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -597,8 +598,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - val cache = ForgetfulCache() - val io = Fetch.runEnv[IO](fetch, cache) + val io = Fetch.runEnv[IO](fetch, forgetfulCache) io.map({ case (env, result) => { @@ -629,7 +629,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Data sources with errors won't fail if they're cached" in { - val cache = InMemoryCache.from( + def cache[F[_] : ConcurrentEffect] = InMemoryCache.from[F, Never, Int]( (NeverSource.name, Never()) -> 1 ) val io = Fetch.run[IO](never, cache) @@ -637,7 +637,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { io.map(_ shouldEqual 1).unsafeToFuture } - def fetchError[F[_] : ConcurrentEffect : ContextShift]: Fetch[F, Int] = + def fetchError[F[_] : ConcurrentEffect]: Fetch[F, Int] = Fetch.error(AnException()) "We can lift errors to Fetch" in { @@ -658,7 +658,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If a fetch fails in the left hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect] = (fetchError, many(3)).tupled val io = Fetch.run[IO](fetch) @@ -670,7 +670,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If a fetch fails in the right hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect] = (many(3), fetchError).tupled val io = Fetch.run[IO](fetch) @@ -682,7 +682,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there is a missing identity in the left hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect] = (never, many(3)).tupled val io = Fetch.run[IO](fetch) @@ -694,7 +694,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there is a missing identity in the right hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect] = (many(3), never).tupled val io = Fetch.run[IO](fetch) @@ -706,7 +706,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there are multiple failing identities the fetch will fail" in { - def fetch[F[_] : ConcurrentEffect : ContextShift] = + def fetch[F[_] : ConcurrentEffect] = (never, never).tupled val io = Fetch.run[IO](fetch) From 022098d5bf404a029ce757ff2a31486a635b0f0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Thu, 20 Sep 2018 17:22:45 +0200 Subject: [PATCH 02/11] Clean tests --- .../src/test/scala/FetchAsyncQueryTests.scala | 12 +- .../src/test/scala/FetchReportingTests.scala | 21 ++-- shared/src/test/scala/FetchTests.scala | 114 +++++++++--------- shared/src/test/scala/TestHelper.scala | 8 +- 4 files changed, 78 insertions(+), 77 deletions(-) diff --git a/shared/src/test/scala/FetchAsyncQueryTests.scala b/shared/src/test/scala/FetchAsyncQueryTests.scala index b8d6a866..5678c312 100644 --- a/shared/src/test/scala/FetchAsyncQueryTests.scala +++ b/shared/src/test/scala/FetchAsyncQueryTests.scala @@ -33,7 +33,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { implicit val cs: ContextShift[IO] = IO.contextShift(executionContext) "We can interpret an async fetch into an IO" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Article] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Article] = article(1) val io = Fetch.run[IO](fetch) @@ -42,7 +42,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { } "We can combine several async data sources and interpret a fetch into an IO" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Article, Author)] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Article, Author)] = for { art <- article(1) author <- author(art) } yield (art, author) @@ -53,7 +53,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { } "We can use combinators in a for comprehension and interpret a fetch from async sources into an IO" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Article]] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Article]] = for { articles <- List(1, 1, 2).traverse(article[F]) } yield articles @@ -67,7 +67,7 @@ class FetchAsyncQueryTests extends AsyncFreeSpec with Matchers { } "We can use combinators and multiple sources in a for comprehension and interpret a fetch from async sources into an IO" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { articles <- List(1, 1, 2).traverse(article[F]) authors <- articles.traverse(author[F]) } yield (articles, authors) @@ -108,7 +108,7 @@ object DataSources { }) } - def article[F[_] : ConcurrentEffect](id: Int): Fetch[F, Article] = + def article[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Article] = Fetch(ArticleId(id), ArticleAsync) case class AuthorId(id: Int) @@ -127,6 +127,6 @@ object DataSources { })) } - def author[F[_] : ConcurrentEffect](a: Article): Fetch[F, Author] = + def author[F[_] : ConcurrentEffect : Par](a: Article): Fetch[F, Author] = Fetch(AuthorId(a.author), AuthorAsync) } diff --git a/shared/src/test/scala/FetchReportingTests.scala b/shared/src/test/scala/FetchReportingTests.scala index 8a593e34..0d1997f4 100644 --- a/shared/src/test/scala/FetchReportingTests.scala +++ b/shared/src/test/scala/FetchReportingTests.scala @@ -22,6 +22,7 @@ import org.scalatest.{AsyncFreeSpec, Matchers} import fetch._ +import cats.temp.par._ import cats.effect._ import cats.instances.list._ import cats.syntax.all._ @@ -34,7 +35,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { implicit val cs: ContextShift[IO] = IO.contextShift(executionContext) "Plain values have no rounds of execution" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = Fetch.pure[F, Int](42) val io = Fetch.runEnv[IO](fetch) @@ -45,7 +46,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches are executed in one round" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = one(1) val io = Fetch.runEnv[IO](fetch) @@ -56,7 +57,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches are executed in one round per binding in a for comprehension" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { o <- one(1) t <- one(2) } yield (o, t) @@ -69,7 +70,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches for different data sources are executed in multiple rounds if they are in a for comprehension" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { o <- one(1) m <- many(3) } yield (o, m) @@ -82,7 +83,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches combined with cartesian are run in one round" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -93,7 +94,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "Single fetches combined with traverse are run in one round" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { manies <- many(3) // round 1 ones <- manies.traverse(one[F]) // round 2 } yield ones @@ -106,7 +107,7 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "The product of two fetches from the same data source implies batching" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (one(1), one(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -117,19 +118,19 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches of the same type implies everything fetched in batches" in { - def aFetch[F[_] : ConcurrentEffect] = for { + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(1) // round 1 b <- one(2) // round 2 c <- one(3) } yield c - def anotherFetch[F[_] : ConcurrentEffect] = for { + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(2) // round 1 m <- many(4) // round 2 c <- one(3) } yield c - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = ((aFetch, anotherFetch).tupled, one(3)).tupled val io = Fetch.runEnv[IO](fetch) diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index 20616be3..64f98458 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -39,7 +39,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Fetch ops "We can lift plain values to Fetch" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = Fetch.pure[F, Int](42) Fetch.run[IO](fetch).map(_ shouldEqual 42).unsafeToFuture @@ -50,14 +50,14 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can map over Fetch values" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int)] = one(1).map(_ + 1) Fetch.run[IO](fetch).map(_ shouldEqual 2).unsafeToFuture } "We can use fetch inside a for comprehension" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = for { o <- one(1) t <- one(2) } yield (o, t) @@ -66,7 +66,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can mix data sources" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = for { o <- one(1) m <- many(3) } yield (o, m) @@ -75,7 +75,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use Fetch as a cartesian" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.run[IO](fetch) @@ -83,7 +83,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use Fetch as an applicative" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = (one(1), one(2), one(3)).mapN(_ + _ + _) + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = (one(1), one(2), one(3)).mapN(_ + _ + _) val io = Fetch.run[IO](fetch) @@ -91,7 +91,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can traverse over a list with a Fetch for each element" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = for { manies <- many(3) ones <- manies.traverse(one[F]) } yield ones @@ -102,7 +102,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can depend on previous computations of Fetch values" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = for { o <- one(1) t <- one(o + 1) } yield o + t @@ -113,7 +113,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect a list of Fetch into one" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3)).sequence val io = Fetch.run[IO](fetch) @@ -122,7 +122,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect a list of Fetches with heterogeneous sources" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3), anotherOne(4), anotherOne(5)).sequence val io = Fetch.run[IO](fetch) @@ -131,7 +131,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can collect the results of a traversal" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(1, 2, 3).traverse(one[F]) val io = Fetch.run[IO](fetch) @@ -142,7 +142,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Execution model "Monadic bind implies sequential execution" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = for { o <- one(1) t <- one(2) @@ -159,7 +159,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Traversals are implicitly batched" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = for { manies <- many(3) ones <- manies.traverse(one[F]) @@ -176,7 +176,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequencing is implicitly batched" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3)).sequence val io = Fetch.runEnv[IO](fetch) @@ -193,7 +193,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { "Identities are deduped when batched" in { val sources = List(1, 1, 2) - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = sources.traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -211,7 +211,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of two fetches implies parallel fetching" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -226,7 +226,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Concurrent fetching calls batches only when it can" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, List[Int])] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, List[Int])] = (one(1), many(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -241,7 +241,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Concurrent fetching performs requests to multiple data sources in parallel" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, ((Int, List[Int]), Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, ((Int, List[Int]), Int)] = ((one(1), many(2)).tupled, anotherOne(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -256,7 +256,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches implies everything fetched concurrently" in { - def fetch[F[_] : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect : Par] = ( ( one(1), (one(2), one(3)).tupled @@ -277,18 +277,18 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of concurrent fetches of the same type implies everything fetched in a single batch" in { - def aFetch[F[_] : ConcurrentEffect] = for { + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(1) // round 1 b <- many(1) // round 2 c <- one(1) } yield c - def anotherFetch[F[_] : ConcurrentEffect] = for { + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(2) // round 1 m <- many(2) // round 2 c <- one(2) } yield c - def fetch[F[_] : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect : Par] = ( (aFetch[F], anotherFetch[F]).tupled, one(3) // round 1 ).tupled @@ -306,18 +306,18 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Every level of joined concurrent fetches is combined and batched" in { - def aFetch[F[_] : ConcurrentEffect] = for { + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(1) // round 1 b <- many(1) // round 2 c <- one(1) } yield c - def anotherFetch[F[_] : ConcurrentEffect] = for { + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { a <- one(2) // round 1 m <- many(2) // round 2 c <- one(2) } yield c - def fetch[F[_] : ConcurrentEffect] = (aFetch[F], anotherFetch[F]).tupled + def fetch[F[_] : ConcurrentEffect : Par] = (aFetch[F], anotherFetch[F]).tupled val io = Fetch.runEnv[IO](fetch) @@ -332,21 +332,21 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Every level of sequenced concurrent fetches is batched" in { - def aFetch[F[_] : ConcurrentEffect] = + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- List(2, 3, 4).traverse(one[F]) // round 1 b <- List(0, 1).traverse(many[F]) // round 2 c <- List(9, 10, 11).traverse(one[F]) // round 3 } yield c - def anotherFetch[F[_] : ConcurrentEffect] = + def anotherFetch[F[_] : ConcurrentEffect : Par] = for { a <- List(5, 6, 7).traverse(one[F]) // round 1 b <- List(2, 3).traverse(many[F]) // round 2 c <- List(12, 13, 14).traverse(one[F]) // round 3 } yield c - def fetch[F[_] : ConcurrentEffect] = ( + def fetch[F[_] : ConcurrentEffect : Par] = ( (aFetch[F], anotherFetch[F]).tupled, List(15, 16, 17).traverse(one[F]) // round 1 ).tupled @@ -364,7 +364,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "The product of two fetches from the same data source implies batching" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = (one(1), one(3)).tupled + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = (one(1), one(3)).tupled val io = Fetch.runEnv[IO](fetch) @@ -379,7 +379,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequenced fetches are run concurrently" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(3), anotherOne(4), anotherOne(5)).sequence val io = Fetch.runEnv[IO](fetch) @@ -394,7 +394,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sequenced fetches are deduped" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(one(1), one(2), one(1)).sequence val io = Fetch.runEnv[IO](fetch) @@ -410,7 +410,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Traversals are batched" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(1, 2, 3).traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -425,7 +425,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Duplicated sources are only fetched once" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(1, 2, 1).traverse(one[F]) val io = Fetch.runEnv[IO](fetch) @@ -440,7 +440,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Sources that can be fetched concurrently inside a for comprehension will be" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = for { v <- Fetch.pure[F, List[Int]](List(1, 2, 1)) result <- v.traverse(one[F]) @@ -458,13 +458,13 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Pure Fetches allow to explore further in the Fetch" in { - def aFetch[F[_] : ConcurrentEffect] = + def aFetch[F[_] : ConcurrentEffect : Par] = for { a <- Fetch.pure[F, Int](2) b <- one[F](3) } yield a + b - def fetch[F[_] : ConcurrentEffect]: Fetch[F, (Int, Int)] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, (Int, Int)] = (one(1), aFetch[F]).tupled val io = Fetch.runEnv[IO](fetch) @@ -481,7 +481,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { // Caching "Elements are cached and thus not fetched more than once" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -503,7 +503,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Batched elements are cached and thus not fetched more than once" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { _ <- List(1, 2, 3).traverse(one[F]) aOne <- one(1) anotherOne <- one(1) @@ -526,7 +526,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Elements that are cached won't be fetched" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -537,7 +537,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { _ <- one(1) } yield aOne + anotherOne - def cache[F[_] : ConcurrentEffect] = InMemoryCache.from[F, One, Int]( + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, One, Int]( (OneSource.name, One(1)) -> 1, (OneSource.name, One(2)) -> 2, (OneSource.name, One(3)) -> 3 @@ -554,7 +554,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { }).unsafeToFuture } - case class ForgetfulCache[F[_] : ConcurrentEffect]() extends DataSourceCache[F] { + case class ForgetfulCache[F[_] : ConcurrentEffect : Par]() extends DataSourceCache[F] { def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = Applicative[F].pure(this) @@ -562,10 +562,10 @@ class FetchTests extends AsyncFreeSpec with Matchers { Applicative[F].pure(None) } - def forgetfulCache[F[_] : ConcurrentEffect] = ForgetfulCache[F]() + def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]() "We can use a custom cache that discards elements" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -587,7 +587,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can use a custom cache that discards elements together with concurrent fetches" in { - def fetch[F[_] : ConcurrentEffect] = for { + def fetch[F[_] : ConcurrentEffect : Par] = for { aOne <- one(1) anotherOne <- one(1) _ <- one(1) @@ -629,7 +629,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "Data sources with errors won't fail if they're cached" in { - def cache[F[_] : ConcurrentEffect] = InMemoryCache.from[F, Never, Int]( + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, Never, Int]( (NeverSource.name, Never()) -> 1 ) val io = Fetch.run[IO](never, cache) @@ -637,7 +637,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { io.map(_ shouldEqual 1).unsafeToFuture } - def fetchError[F[_] : ConcurrentEffect]: Fetch[F, Int] = + def fetchError[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = Fetch.error(AnException()) "We can lift errors to Fetch" in { @@ -658,7 +658,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If a fetch fails in the left hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (fetchError, many(3)).tupled val io = Fetch.run[IO](fetch) @@ -670,7 +670,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If a fetch fails in the right hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (many(3), fetchError).tupled val io = Fetch.run[IO](fetch) @@ -682,7 +682,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there is a missing identity in the left hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (never, many(3)).tupled val io = Fetch.run[IO](fetch) @@ -694,7 +694,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there is a missing identity in the right hand of a product the product will fail" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (many(3), never).tupled val io = Fetch.run[IO](fetch) @@ -706,7 +706,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "If there are multiple failing identities the fetch will fail" in { - def fetch[F[_] : ConcurrentEffect] = + def fetch[F[_] : ConcurrentEffect : Par] = (never, never).tupled val io = Fetch.run[IO](fetch) @@ -733,25 +733,25 @@ class FetchTests extends AsyncFreeSpec with Matchers { Applicative[F].pure(Option(id.id)) } - def maybeOpt[F[_] : ConcurrentEffect](id: Int): Fetch[F, Option[Int]] = + def maybeOpt[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Option[Int]] = Fetch.optional(MaybeMissing(id), MaybeMissingSource) "We can run optional fetches" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Option[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Option[Int]] = maybeOpt(1) Fetch.run[IO](fetch).map(_ shouldEqual Some(1)).unsafeToFuture } "We can run optional fetches with traverse" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = List(1, 2, 3).traverse(maybeOpt[F]).map(_.flatten) Fetch.run[IO](fetch).map(_ shouldEqual List(1, 3)).unsafeToFuture } "We can run optional fetches with other data sources" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, List[Int]] = { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, List[Int]] = { val ones = List(1, 2, 3).traverse(one[F]) val maybes = List(1, 2, 3).traverse(maybeOpt[F]) (ones, maybes).mapN { case (os, ms) => os ++ ms.flatten } @@ -761,7 +761,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can make fetches that depend on optional fetch results when they aren't defined" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = for { maybe <- maybeOpt(2) result <- maybe.fold(Fetch.pure(42))(i => one(i)) } yield result @@ -770,7 +770,7 @@ class FetchTests extends AsyncFreeSpec with Matchers { } "We can make fetches that depend on optional fetch results when they are defined" in { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { + def fetch[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = for { maybe <- maybeOpt(1) result <- maybe.fold(Fetch.pure(42))(i => one(i)) } yield result diff --git a/shared/src/test/scala/TestHelper.scala b/shared/src/test/scala/TestHelper.scala index c59d0ade..8aeb321e 100644 --- a/shared/src/test/scala/TestHelper.scala +++ b/shared/src/test/scala/TestHelper.scala @@ -45,7 +45,7 @@ object TestHelper { ) } - def one[F[_] : ConcurrentEffect](id: Int): Fetch[F, Int] = + def one[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Int] = Fetch(One(id), OneSource) case class Many(n: Int) @@ -57,7 +57,7 @@ object TestHelper { Applicative[F].pure(Option(0 until id.n toList)) } - def many[F[_] : ConcurrentEffect](id: Int): Fetch[F, List[Int]] = + def many[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, List[Int]] = Fetch(Many(id), ManySource) case class AnotherOne(id: Int) @@ -74,7 +74,7 @@ object TestHelper { ) } - def anotherOne[F[_] : ConcurrentEffect](id: Int): Fetch[F, Int] = + def anotherOne[F[_] : ConcurrentEffect : Par](id: Int): Fetch[F, Int] = Fetch(AnotherOne(id), AnotheroneSource) case class Never() @@ -86,7 +86,7 @@ object TestHelper { Applicative[F].pure(None : Option[Int]) } - def never[F[_] : ConcurrentEffect]: Fetch[F, Int] = + def never[F[_] : ConcurrentEffect : Par]: Fetch[F, Int] = Fetch(Never(), NeverSource) // Check Env From 15bf01894118954223663d61386c7c1c6cf6ce6b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Thu, 20 Sep 2018 17:50:11 +0200 Subject: [PATCH 03/11] Update docs --- docs/src/main/tut/docs.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/docs/src/main/tut/docs.md b/docs/src/main/tut/docs.md index d4f9de20..9fde0567 100644 --- a/docs/src/main/tut/docs.md +++ b/docs/src/main/tut/docs.md @@ -481,7 +481,7 @@ We'll be using the default in-memory cache, prepopulated with some data. The cac is calculated with the `DataSource`'s `name` method and the request identity. ```tut:silent -val cache = InMemoryCache.from( +def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, Int, User]( (UserSource.name, 1) -> User(1, "@dialelo") ) ``` @@ -537,9 +537,12 @@ Let's implement a cache that forgets everything we store in it. ```tut:silent import cats.Applicative -final case class ForgetfulCache() extends DataSourceCache { - def insert[F[_] : ConcurrentEffect, I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache] = Applicative[F].pure(this) - def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = Applicative[F].pure(None) +final case class ForgetfulCache[F[_] : ConcurrentEffect : Par]() extends DataSourceCache[F] { + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = + Applicative[F].pure(this) + + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = + Applicative[F].pure(None) } ``` From 79a933a1c643cbf38a2f22fbda7741c316a04455 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Thu, 20 Sep 2018 17:50:19 +0200 Subject: [PATCH 04/11] Whitespace --- shared/src/main/scala/fetch.scala | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala index adc0994e..f2b272ca 100644 --- a/shared/src/main/scala/fetch.scala +++ b/shared/src/main/scala/fetch.scala @@ -285,7 +285,7 @@ object `package` { )( implicit P: Par[F], - C: ConcurrentEffect[F], + C: ConcurrentEffect[F], CS: ContextShift[F], T: Timer[F] ): F[A] = for { @@ -305,9 +305,9 @@ object `package` { )( implicit P: Par[F], - C: ConcurrentEffect[F], - CS: ContextShift[F], - T: Timer[F] + C: ConcurrentEffect[F], + CS: ContextShift[F], + T: Timer[F] ): F[(Env, A)] = apply(fa, InMemoryCache.empty[F]) @@ -317,7 +317,7 @@ object `package` { )( implicit P: Par[F], - C: ConcurrentEffect[F], + C: ConcurrentEffect[F], CS: ContextShift[F], T: Timer[F] ): F[(Env, A)] = for { @@ -339,19 +339,19 @@ object `package` { )( implicit P: Par[F], - C: ConcurrentEffect[F], - CS: ContextShift[F], - T: Timer[F] + C: ConcurrentEffect[F], + CS: ContextShift[F], + T: Timer[F] ): F[(DataSourceCache[F], A)] = apply(fa, InMemoryCache.empty[F]) - + def apply[A]( fa: Fetch[F, A], cache: DataSourceCache[F] )( implicit P: Par[F], - C: ConcurrentEffect[F], + C: ConcurrentEffect[F], CS: ContextShift[F], T: Timer[F] ): F[(DataSourceCache[F], A)] = for { From 62e30a208a79d07f33ff0c604357bad7724ca8bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Fri, 21 Sep 2018 00:52:18 +0200 Subject: [PATCH 05/11] Trait instead of abstract class --- shared/src/main/scala/cache.scala | 50 +++++++++++++------ shared/src/main/scala/fetch.scala | 2 +- .../src/test/scala/FetchReportingTests.scala | 15 +++--- shared/src/test/scala/FetchTests.scala | 16 +++--- 4 files changed, 55 insertions(+), 28 deletions(-) diff --git a/shared/src/main/scala/cache.scala b/shared/src/main/scala/cache.scala index 87b9617c..dbd596ba 100644 --- a/shared/src/main/scala/cache.scala +++ b/shared/src/main/scala/cache.scala @@ -17,9 +17,11 @@ package fetch import cats._ +import cats.effect._ +import cats.data.NonEmptyList import cats.instances.list._ import cats.syntax.all._ -import cats.effect._ +import cats.temp.par._ final class DataSourceName(val name: String) extends AnyVal final class DataSourceId(val id: Any) extends AnyVal @@ -28,30 +30,48 @@ final class DataSourceResult(val result: Any) extends AnyVal /** * A `Cache` trait so the users of the library can provide their own cache. */ -abstract class DataSourceCache[F[_] : ConcurrentEffect] { - def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] - def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] - def insertMany[I, A](vs: Map[I, A], ds: DataSource[I, A]): F[DataSourceCache[F]] = - vs.toList.foldLeftM(this)({ - case (c, (i, v)) => c.insert(i, v, ds) - }) +trait DataSourceCache[F[_]] { + def lookup[I, A](i: I, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[Option[A]] + + def insert[I, A](i: I, v: A, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[DataSourceCache[F]] + + // def delete[I, A](i: I, v: A, ds: DataSource[I, A])( + // implicit C: ConcurrentEffect[F], P: Par[F] + // ): F[Unit] + + def bulkInsert[I, A](vs: List[(I, A)], ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[DataSourceCache[F]] = { + vs.foldLeftM(this){ + case (acc, (i, v)) => + acc.insert(i, v, ds) + } + } } /** * A cache that stores its elements in memory. */ -case class InMemoryCache[F[_] : ConcurrentEffect](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] { - def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = - Applicative[F].pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A])) +case class InMemoryCache[F[_]](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] { + def lookup[I, A](i: I, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[Option[A]] = + C.pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A])) - def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = - Applicative[F].pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v)))) + def insert[I, A](i: I, v: A, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[DataSourceCache[F]] = + C.pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v)))) } object InMemoryCache { - def empty[F[_] : ConcurrentEffect]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) + def empty[F[_] : ConcurrentEffect : Par]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) - def from[F[_]: ConcurrentEffect, I, A](results: ((String, I), A)*): InMemoryCache[F] = + def from[F[_]: ConcurrentEffect : Par, I, A](results: ((String, I), A)*): InMemoryCache[F] = InMemoryCache[F](results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({ case (acc, ((s, i), v)) => acc.updated((new DataSourceName(s), new DataSourceId(i)), new DataSourceResult(v)) })) diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala index f2b272ca..39a5afed 100644 --- a/shared/src/main/scala/fetch.scala +++ b/shared/src/main/scala/fetch.scala @@ -526,7 +526,7 @@ object `package` { endTime <- T.clock.monotonic(MILLISECONDS) resultMap = combineBatchResults(batchedRequest.results, cachedResults) - updatedCache <- c.insertMany(batchedRequest.results, request.ds) + updatedCache <- c.bulkInsert(batchedRequest.results.toList, request.ds) _ <- cache.set(updatedCache) result <- putResult(FetchDone[Map[Any, Any]](resultMap)) diff --git a/shared/src/test/scala/FetchReportingTests.scala b/shared/src/test/scala/FetchReportingTests.scala index 0d1997f4..1a829d15 100644 --- a/shared/src/test/scala/FetchReportingTests.scala +++ b/shared/src/test/scala/FetchReportingTests.scala @@ -119,15 +119,15 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { "The product of concurrent fetches of the same type implies everything fetched in batches" in { def aFetch[F[_] : ConcurrentEffect : Par] = for { - a <- one(1) // round 1 - b <- one(2) // round 2 - c <- one(3) + a <- one(1) // round 1 (batched) + b <- one(2) // round 2 (cached) + c <- one(3) // round 3 (deduplicated) } yield c def anotherFetch[F[_] : ConcurrentEffect : Par] = for { - a <- one(2) // round 1 + a <- one(2) // round 1 (batched) m <- many(4) // round 2 - c <- one(3) + c <- one(3) // round 3 (deduplicated) } yield c def fetch[F[_] : ConcurrentEffect : Par] = @@ -136,7 +136,10 @@ class FetchReportingTests extends AsyncFreeSpec with Matchers { val io = Fetch.runEnv[IO](fetch) io.map({ - case (env, result) => env.rounds.size shouldEqual 2 + case (env, result) => + env.rounds.size shouldEqual 2 + totalBatches(env.rounds) shouldEqual 1 + totalFetched(env.rounds) shouldEqual 3 + 1 }).unsafeToFuture } } diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index 64f98458..29b14410 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -554,12 +554,16 @@ class FetchTests extends AsyncFreeSpec with Matchers { }).unsafeToFuture } - case class ForgetfulCache[F[_] : ConcurrentEffect : Par]() extends DataSourceCache[F] { - def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = - Applicative[F].pure(this) - - def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = - Applicative[F].pure(None) + case class ForgetfulCache[F[_]]() extends DataSourceCache[F] { + def insert[I, A](i: I, v: A, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[DataSourceCache[F]] = + C.pure(this) + + def lookup[I, A](i: I, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[Option[A]] = + C.pure(None) } def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]() From 9e08d604a79e9c80b0abfbe009ba64cb0d1632bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Fri, 21 Sep 2018 01:07:58 +0200 Subject: [PATCH 06/11] Update docs --- docs/src/main/tut/docs.md | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/docs/src/main/tut/docs.md b/docs/src/main/tut/docs.md index 9fde0567..a4a59de9 100644 --- a/docs/src/main/tut/docs.md +++ b/docs/src/main/tut/docs.md @@ -526,7 +526,7 @@ The default cache is implemented as an immutable in-memory map, but users are fr There is no need for the cache to be mutable since fetch executions run in an interpreter that uses the state monad. Note that the `update` method in the `DataSourceCache` trait yields a new, updated cache. ```scala -trait DataSourceCache { +trait DataSourceCache[F[_]] { def insert[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A], v: A): DataSourceIdentity, v: A): F[DataSourceCache] def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] } @@ -535,15 +535,19 @@ trait DataSourceCache { Let's implement a cache that forgets everything we store in it. ```tut:silent -import cats.Applicative +case class ForgetfulCache[F[_]]() extends DataSourceCache[F] { + def insert[I, A](i: I, v: A, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[DataSourceCache[F]] = + C.pure(this) -final case class ForgetfulCache[F[_] : ConcurrentEffect : Par]() extends DataSourceCache[F] { - def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = - Applicative[F].pure(this) - - def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = - Applicative[F].pure(None) + def lookup[I, A](i: I, ds: DataSource[I, A])( + implicit C: ConcurrentEffect[F], P: Par[F] + ): F[Option[A]] = + C.pure(None) } + +def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]() ``` We can now use our implementation of the cache when running a fetch. @@ -554,7 +558,7 @@ def fetchSameTwice[F[_] : ConcurrentEffect]: Fetch[F, (User, User)] = for { another <- getUser(1) } yield (one, another) -Fetch.run[IO](fetchSameTwice, ForgetfulCache()).unsafeRunTimed(5.seconds) +Fetch.run[IO](fetchSameTwice, forgetfulCache).unsafeRunTimed(5.seconds) ``` # Batching From a5cf2c58f55117c23d5ccfeabf5f17d90689f927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Tue, 25 Sep 2018 12:13:32 +0200 Subject: [PATCH 07/11] Add a few more tests for Fetch#run* methods --- shared/src/test/scala/FetchTests.scala | 75 +++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 2 deletions(-) diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index 29b14410..524cabd2 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -554,16 +554,87 @@ class FetchTests extends AsyncFreeSpec with Matchers { }).unsafeToFuture } + "Fetch#run accepts a cache as the second (optional) parameter" in { + def fetch[F[_] : ConcurrentEffect : Par] = for { + aOne <- one(1) + anotherOne <- one(1) + _ <- one(1) + _ <- one(2) + _ <- one(3) + _ <- one(1) + _ <- List(1, 2, 3).traverse(one[F]) + _ <- one(1) + } yield aOne + anotherOne + + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, One, Int]( + (OneSource.name, One(1)) -> 1, + (OneSource.name, One(2)) -> 2, + (OneSource.name, One(3)) -> 3 + ) + + val io = Fetch.run[IO](fetch, cache) + + io.map(_ shouldEqual 2).unsafeToFuture + } + + "Fetch#runCache accepts a cache as the second (optional) parameter" in { + def fetch[F[_] : ConcurrentEffect : Par] = for { + aOne <- one(1) + anotherOne <- one(1) + _ <- one(1) + _ <- one(2) + _ <- one(3) + _ <- one(1) + _ <- List(1, 2, 3).traverse(one[F]) + _ <- one(1) + } yield aOne + anotherOne + + def cache[F[_] : ConcurrentEffect : Par] = InMemoryCache.from[F, One, Int]( + (OneSource.name, One(1)) -> 1, + (OneSource.name, One(2)) -> 2, + (OneSource.name, One(3)) -> 3 + ) + + val io = Fetch.runCache[IO](fetch, cache) + + io.map({ + case (c, result) => { + result shouldEqual 2 + } + }).unsafeToFuture + } + + "Fetch#runCache works without the optional cache parameter" in { + def fetch[F[_] : ConcurrentEffect : Par] = for { + aOne <- one(1) + anotherOne <- one(1) + _ <- one(1) + _ <- one(2) + _ <- one(3) + _ <- one(1) + _ <- List(1, 2, 3).traverse(one[F]) + _ <- one(1) + } yield aOne + anotherOne + + val io = Fetch.runCache[IO](fetch) + + io.map({ + case (c, result) => { + result shouldEqual 2 + } + }).unsafeToFuture + } + case class ForgetfulCache[F[_]]() extends DataSourceCache[F] { def insert[I, A](i: I, v: A, ds: DataSource[I, A])( implicit C: ConcurrentEffect[F], P: Par[F] ): F[DataSourceCache[F]] = - C.pure(this) + Applicative[F].pure(this) def lookup[I, A](i: I, ds: DataSource[I, A])( implicit C: ConcurrentEffect[F], P: Par[F] ): F[Option[A]] = - C.pure(None) + Applicative[F].pure(None) } def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]() From 455e005a1ed9375adb5395de11c6c659de96e55e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Tue, 25 Sep 2018 13:41:10 +0200 Subject: [PATCH 08/11] Remove contraints from DataSourceCache trait methods --- shared/src/main/scala/cache.scala | 32 +++++++++----------------- shared/src/test/scala/FetchTests.scala | 10 +++----- 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/shared/src/main/scala/cache.scala b/shared/src/main/scala/cache.scala index dbd596ba..ae1916c2 100644 --- a/shared/src/main/scala/cache.scala +++ b/shared/src/main/scala/cache.scala @@ -31,20 +31,14 @@ final class DataSourceResult(val result: Any) extends AnyVal * A `Cache` trait so the users of the library can provide their own cache. */ trait DataSourceCache[F[_]] { - def lookup[I, A](i: I, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[Option[A]] + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] - def insert[I, A](i: I, v: A, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[DataSourceCache[F]] + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] - // def delete[I, A](i: I, v: A, ds: DataSource[I, A])( - // implicit C: ConcurrentEffect[F], P: Par[F] - // ): F[Unit] + // def delete[I, A](i: I, v: A, ds: DataSource[I, A]): F[Unit] def bulkInsert[I, A](vs: List[(I, A)], ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] + implicit M: Monad[F] ): F[DataSourceCache[F]] = { vs.foldLeftM(this){ case (acc, (i, v)) => @@ -56,22 +50,18 @@ trait DataSourceCache[F[_]] { /** * A cache that stores its elements in memory. */ -case class InMemoryCache[F[_]](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] { - def lookup[I, A](i: I, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[Option[A]] = - C.pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A])) +case class InMemoryCache[F[_] : Monad](state: Map[(DataSourceName, DataSourceId), DataSourceResult]) extends DataSourceCache[F] { + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = + Applicative[F].pure(state.get((new DataSourceName(ds.name), new DataSourceId(i))).map(_.result.asInstanceOf[A])) - def insert[I, A](i: I, v: A, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[DataSourceCache[F]] = - C.pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v)))) + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = + Applicative[F].pure(copy(state = state.updated((new DataSourceName(ds.name), new DataSourceId(i)), new DataSourceResult(v)))) } object InMemoryCache { - def empty[F[_] : ConcurrentEffect : Par]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) + def empty[F[_] : Monad]: InMemoryCache[F] = InMemoryCache[F](Map.empty[(DataSourceName, DataSourceId), DataSourceResult]) - def from[F[_]: ConcurrentEffect : Par, I, A](results: ((String, I), A)*): InMemoryCache[F] = + def from[F[_]: Monad, I, A](results: ((String, I), A)*): InMemoryCache[F] = InMemoryCache[F](results.foldLeft(Map.empty[(DataSourceName, DataSourceId), DataSourceResult])({ case (acc, ((s, i), v)) => acc.updated((new DataSourceName(s), new DataSourceId(i)), new DataSourceResult(v)) })) diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index 524cabd2..095e1030 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -625,15 +625,11 @@ class FetchTests extends AsyncFreeSpec with Matchers { }).unsafeToFuture } - case class ForgetfulCache[F[_]]() extends DataSourceCache[F] { - def insert[I, A](i: I, v: A, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[DataSourceCache[F]] = + case class ForgetfulCache[F[_] : Monad]() extends DataSourceCache[F] { + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = Applicative[F].pure(this) - def lookup[I, A](i: I, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[Option[A]] = + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = Applicative[F].pure(None) } From 11de8d3030749cde8f9cc6706448f615c28ec5a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Tue, 25 Sep 2018 13:46:42 +0200 Subject: [PATCH 09/11] Update docs --- docs/src/main/tut/docs.md | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/docs/src/main/tut/docs.md b/docs/src/main/tut/docs.md index a4a59de9..c569f57e 100644 --- a/docs/src/main/tut/docs.md +++ b/docs/src/main/tut/docs.md @@ -535,16 +535,12 @@ trait DataSourceCache[F[_]] { Let's implement a cache that forgets everything we store in it. ```tut:silent -case class ForgetfulCache[F[_]]() extends DataSourceCache[F] { - def insert[I, A](i: I, v: A, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[DataSourceCache[F]] = - C.pure(this) - - def lookup[I, A](i: I, ds: DataSource[I, A])( - implicit C: ConcurrentEffect[F], P: Par[F] - ): F[Option[A]] = - C.pure(None) +case class ForgetfulCache[F[_] : Monad]() extends DataSourceCache[F] { + def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = + Applicative[F].pure(this) + + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] = + Applicative[F].pure(None) } def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]() @@ -553,7 +549,7 @@ def forgetfulCache[F[_] : ConcurrentEffect : Par] = ForgetfulCache[F]() We can now use our implementation of the cache when running a fetch. ```tut:book -def fetchSameTwice[F[_] : ConcurrentEffect]: Fetch[F, (User, User)] = for { +def fetchSameTwice[F[_] : ConcurrentEffect : Par]: Fetch[F, (User, User)] = for { one <- getUser(1) another <- getUser(1) } yield (one, another) From 060c3d5a3d5f26e350ba0c5e49f5373b9b3f8ce1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Tue, 25 Sep 2018 15:16:08 +0200 Subject: [PATCH 10/11] Add missing import --- docs/src/main/tut/docs.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/src/main/tut/docs.md b/docs/src/main/tut/docs.md index c569f57e..65efe92a 100644 --- a/docs/src/main/tut/docs.md +++ b/docs/src/main/tut/docs.md @@ -535,6 +535,8 @@ trait DataSourceCache[F[_]] { Let's implement a cache that forgets everything we store in it. ```tut:silent +import cats.{Applicative, Monad} + case class ForgetfulCache[F[_] : Monad]() extends DataSourceCache[F] { def insert[I, A](i: I, v: A, ds: DataSource[I, A]): F[DataSourceCache[F]] = Applicative[F].pure(this) From 3c19563992f1daac676ca9fa1f780425977f8fe3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alejandro=20G=C3=B3mez?= Date: Tue, 25 Sep 2018 16:25:27 +0200 Subject: [PATCH 11/11] Fix DataSourceCache trait in docs --- docs/src/main/tut/docs.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/src/main/tut/docs.md b/docs/src/main/tut/docs.md index 65efe92a..b679945a 100644 --- a/docs/src/main/tut/docs.md +++ b/docs/src/main/tut/docs.md @@ -527,8 +527,8 @@ There is no need for the cache to be mutable since fetch executions run in an in ```scala trait DataSourceCache[F[_]] { - def insert[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A], v: A): DataSourceIdentity, v: A): F[DataSourceCache] - def lookup[F[_] : ConcurrentEffect, I, A](i: I, ds: DataSource[I, A]): F[Option[A]] + def insert[I, A](i: I, ds: DataSource[I, A], v: A): DataSourceIdentity, v: A): F[DataSourceCache[F]] + def lookup[I, A](i: I, ds: DataSource[I, A]): F[Option[A]] } ```