diff --git a/examples/src/test/scala/DoobieExample.scala b/examples/src/test/scala/DoobieExample.scala index c89a8a59..8119897a 100644 --- a/examples/src/test/scala/DoobieExample.scala +++ b/examples/src/test/scala/DoobieExample.scala @@ -42,7 +42,7 @@ object DatabaseExample { def fetchByIds(ids: NonEmptyList[AuthorId]): ConnectionIO[List[Author]] = { val q = fr"SELECT * FROM author WHERE" ++ Fragments.in(fr"id", ids) - q.query[Author].list + q.query[Author].to[List] } } @@ -64,12 +64,12 @@ object DatabaseExample { case (name, id) => Author(id + 1, name) } - def createTransactor[F[_]: ConcurrentEffect] = - H2Transactor[F]("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1", "sa", "") + def createTransactor[F[_]: Async] = + H2Transactor.newH2Transactor[F]("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1", "sa", "") - def transactor[F[_]: ConcurrentEffect]: F[Transactor[F]] = + def transactor[F[_]: Async]: F[Transactor[F]] = for { - xa <- createTransactor + xa <- createTransactor[F] _ <- (dropTable *> createTable *> authors.traverse(addAuthor)).transact(xa) } yield xa } @@ -77,25 +77,27 @@ object DatabaseExample { object Authors extends Data[AuthorId, Author] { def name = "Authors" - def db[F[_]: ConcurrentEffect]: DataSource[F, AuthorId, Author] = + def db[F[_]: Concurrent]: DataSource[F, AuthorId, Author] = new DataSource[F, AuthorId, Author] { def data = Authors - override def CF = ConcurrentEffect[F] + override def CF = Concurrent[F] override def fetch(id: AuthorId): F[Option[Author]] = - Database.transactor + Database + .transactor[F] .flatMap(Queries.fetchById(id).transact(_)) override def batch(ids: NonEmptyList[AuthorId]): F[Map[AuthorId, Author]] = - Database.transactor + Database + .transactor[F] .flatMap(Queries.fetchByIds(ids).transact(_)) .map { authors => authors.map(a => AuthorId(a.id) -> a).toMap } } - def fetchAuthor[F[_]: ConcurrentEffect](id: Int): Fetch[F, Author] = + def fetchAuthor[F[_]: Concurrent](id: Int): Fetch[F, Author] = Fetch(AuthorId(id), Authors.db) } } diff --git a/examples/src/test/scala/GithubExample.scala b/examples/src/test/scala/GithubExample.scala index ff030015..28837e0c 100644 --- a/examples/src/test/scala/GithubExample.scala +++ b/examples/src/test/scala/GithubExample.scala @@ -48,7 +48,7 @@ class GithubExample extends WordSpec with Matchers { // http4s client which is used by the datasources def client[F[_]: ConcurrentEffect] = - Http1Client[F](BlazeClientConfig.defaultConfig) + BlazeClientBuilder[F](executionContext).resource // -- repos @@ -78,7 +78,7 @@ class GithubExample extends WordSpec with Matchers { def data = Repos def fetch(id: (String, String)): F[Option[Repo]] = { - client[F] >>= ((c) => { + client[F].use((c) => { val (owner, repo) = id val url = GITHUB / "repos" / owner / repo +? ("access_token", ACCESS_TOKEN) val req = Request[F](Method.GET, url) @@ -109,12 +109,12 @@ class GithubExample extends WordSpec with Matchers { implicit val repoED: EntityDecoder[F, Repo] = jsonOf implicit val reposED: EntityDecoder[F, List[Repo]] = jsonOf - def CF = ConcurrentEffect[F] + def CF = Concurrent[F] def data = OrgRepos def fetch(org: Org): F[Option[List[Repo]]] = { - client[F] >>= ((c) => { + client[F].use((c) => { val url = GITHUB / "orgs" / org / "repos" +? ("access_token", ACCESS_TOKEN) +? ("type", "public") +? ("per_page", 100) val req = Request[F](Method.GET, url) fetchCollectionRecursively[F, Repo](c, req).map(Option(_)) @@ -145,7 +145,7 @@ class GithubExample extends WordSpec with Matchers { def data = Languages def fetch(repo: Repo): F[Option[List[Language]]] = { - client[F] >>= ((c) => { + client[F].use((c) => { val url = Uri.unsafeFromString(repo.languages_url) +? ("access_token", ACCESS_TOKEN) val req = Request[F](Method.GET, url) fetchCollectionRecursively[F, Language](c, req).map(Option(_)) @@ -175,8 +175,9 @@ class GithubExample extends WordSpec with Matchers { def data = Contributors def fetch(repo: Repo): F[Option[List[Contributor]]] = { - client[F] >>= ((c) => { - val url = Uri.unsafeFromString(repo.contributors_url) +? ("access_token", ACCESS_TOKEN) +? ("type", "public") +? ("per_page", 100) + client[F].use((c) => { + val url = Uri + .unsafeFromString(repo.contributors_url) +? ("access_token", ACCESS_TOKEN) +? ("type", "public") +? ("per_page", 100) val req = Request[F](Method.GET, url) fetchCollectionRecursively[F, Contributor](c, req).map(Option(_)) }) @@ -211,7 +212,7 @@ class GithubExample extends WordSpec with Matchers { fetchOrg(org).map(projects => projects.map(_.languages.toSet).fold(Set())(_ ++ _).size) "We can fetch org repos" in { - val io = Fetch.runLog[IO](fetchOrg("47deg")) + val io = Fetch.runLog[IO](fetchOrg[IO]("47deg")) val (log, result) = io.unsafeRunSync @@ -223,7 +224,7 @@ class GithubExample extends WordSpec with Matchers { val GITHUB: Uri = Uri.unsafeFromString("https://api.github.com") private def fetchCollectionRecursively[F[_], A](c: Client[F], req: Request[F])( - implicit CF: ConcurrentEffect[F], + implicit CF: MonadError[F, Throwable], E: EntityDecoder[F, List[A]] ): F[List[A]] = { val REL_NEXT = "rel=\"next\"".r @@ -239,17 +240,16 @@ class GithubExample extends WordSpec with Matchers { REL_NEXT .findFirstMatchIn(raw) .fold( - Sync[F].raiseError[String](new Exception("Couldn't find next link")) + CF.raiseError[String](new Exception("Couldn't find next link")) )(m => { - Sync[F].pure( - m.before.toString.split(",").last.trim.dropWhile(_ == '<').takeWhile(_ != '>')) + CF.pure(m.before.toString.split(",").last.trim.dropWhile(_ == '<').takeWhile(_ != '>')) }) } def getNext(res: Response[F]): F[Uri] = res.headers .get(CaseInsensitiveString("Link")) - .fold(Sync[F].raiseError[Uri](new Exception("next not found")))( + .fold(CF.raiseError[Uri](new Exception("next not found")))( raw => getNextLink(raw.value).map(Uri.unsafeFromString(_)) ) diff --git a/examples/src/test/scala/MonixExample.scala b/examples/src/test/scala/MonixExample.scala index 740ef24e..39a74bbc 100644 --- a/examples/src/test/scala/MonixExample.scala +++ b/examples/src/test/scala/MonixExample.scala @@ -30,7 +30,6 @@ class MonixExample extends AsyncFreeSpec with Matchers { implicit val scheduler: Scheduler = Scheduler.io(name = "test-scheduler") override val executionContext: ExecutionContext = scheduler implicit val t: Timer[Task] = scheduler.timer - implicit val cs: ContextShift[Task] = scheduler.contextShift import DatabaseExample._ diff --git a/shared/src/main/scala/datasource.scala b/shared/src/main/scala/datasource.scala index a20127de..8c2aa6db 100644 --- a/shared/src/main/scala/datasource.scala +++ b/shared/src/main/scala/datasource.scala @@ -18,11 +18,11 @@ package fetch import cats.{Functor, Monad} import cats.effect._ -import cats.data.NonEmptyList +import cats.data.{NonEmptyList, NonEmptyMap} import cats.instances.list._ import cats.instances.option._ import cats.syntax.all._ -import cats.kernel.{ Hash => H } +import cats.kernel.{Hash => H} /** * `Data` is a trait used to identify and optimize access to a `DataSource`. @@ -45,7 +45,7 @@ object Data { trait DataSource[F[_], I, A] { def data: Data[I, A] - implicit def CF: ConcurrentEffect[F] + implicit def CF: Concurrent[F] /** Fetch one identity, returning a None if it wasn't found. */ @@ -56,8 +56,8 @@ trait DataSource[F[_], I, A] { */ def batch(ids: NonEmptyList[I]): F[Map[I, A]] = FetchExecution.parallel( - ids.map(id => fetch(id).map((v) => id -> v)) - ).map(_.collect({ case (id, Some(x)) => id -> x }).toMap) + ids.map(id => fetch(id).tupleLeft(id)) + ).map(_.collect { case (id, Some(x)) => id -> x }.toMap) def maxBatchSize: Option[Int] = None diff --git a/shared/src/main/scala/execution.scala b/shared/src/main/scala/execution.scala index 4eba0424..1f65640f 100644 --- a/shared/src/main/scala/execution.scala +++ b/shared/src/main/scala/execution.scala @@ -22,7 +22,7 @@ import cats.syntax.all._ private object FetchExecution { def parallel[F[_], A](effects: NonEmptyList[F[A]])( - implicit CF: ConcurrentEffect[F] + implicit CF: Concurrent[F] ): F[NonEmptyList[A]] = effects.traverse(CF.start(_)).flatMap(fibers => fibers.traverse(_.join).onError({ case _ => fibers.traverse_(_.cancel) }) diff --git a/shared/src/main/scala/fetch.scala b/shared/src/main/scala/fetch.scala index c9f11ba7..82a30bcf 100644 --- a/shared/src/main/scala/fetch.scala +++ b/shared/src/main/scala/fetch.scala @@ -254,16 +254,16 @@ object `package` { /** * Lift a plain value to the Fetch monad. */ - def pure[F[_]: ConcurrentEffect, A](a: A): Fetch[F, A] = + def pure[F[_]: Applicative, A](a: A): Fetch[F, A] = Unfetch(Applicative[F].pure(Done(a))) - def exception[F[_]: ConcurrentEffect, A](e: Log => FetchException): Fetch[F, A] = + def exception[F[_]: Applicative, A](e: Log => FetchException): Fetch[F, A] = Unfetch(Applicative[F].pure(Throw[F, A](e))) - def error[F[_]: ConcurrentEffect, A](e: Throwable): Fetch[F, A] = - exception((log) => UnhandledException(e, log)) + def error[F[_]: Applicative, A](e: Throwable): Fetch[F, A] = + exception(log => UnhandledException(e, log)) - def apply[F[_] : ConcurrentEffect, I, A]( + def apply[F[_]: Concurrent, I, A]( id: I, ds: DataSource[F, I, A] ): Fetch[F, A] = @@ -278,14 +278,14 @@ object `package` { } yield Blocked(blockedRequest, Unfetch[F, A]( deferred.get.map { case FetchDone(a) => - Done(a).asInstanceOf[FetchResult[F, A]] + Done(a.asInstanceOf[A]) case FetchMissing() => - Throw((log) => MissingIdentity(id, request.asInstanceOf[FetchQuery[I, A]], log)) + Throw(log => MissingIdentity(id, request, log)) } )) ) - def optional[F[_] : ConcurrentEffect, I, A]( + def optional[F[_] : Concurrent, I, A]( id: I, ds: DataSource[F, I, A] ): Fetch[F, Option[A]] = @@ -300,7 +300,7 @@ object `package` { } yield Blocked(blockedRequest, Unfetch[F, Option[A]]( deferred.get.map { case FetchDone(a) => - Done(Some(a)).asInstanceOf[FetchResult[F, Option[A]]] + Done(Some(a.asInstanceOf[A])) case FetchMissing() => Done(Option.empty[A]) } @@ -319,8 +319,7 @@ object `package` { fa: Fetch[F, A] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[A] = apply(fa, InMemoryCache.empty[F]) @@ -330,8 +329,7 @@ object `package` { cache: DataCache[F] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[A] = for { cache <- Ref.of[F, DataCache[F]](cache) @@ -349,8 +347,7 @@ object `package` { fa: Fetch[F, A] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[(Log, A)] = apply(fa, InMemoryCache.empty[F]) @@ -360,8 +357,7 @@ object `package` { cache: DataCache[F] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[(Log, A)] = for { log <- Ref.of[F, Log](FetchLog()) @@ -381,8 +377,7 @@ object `package` { fa: Fetch[F, A] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[(DataCache[F], A)] = apply(fa, InMemoryCache.empty[F]) @@ -392,8 +387,7 @@ object `package` { cache: DataCache[F] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[(DataCache[F], A)] = for { cache <- Ref.of[F, DataCache[F]](cache) @@ -410,8 +404,7 @@ object `package` { log: Option[Ref[F, Log]] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[A] = for { result <- fa.run @@ -437,8 +430,7 @@ object `package` { log: Option[Ref[F, Log]] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[Unit] = { val blocked = rs.m.toList.map(_._2) @@ -464,13 +456,12 @@ object `package` { log: Option[Ref[F, Log]] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[List[Request]] = blocked.request match { - case q @ FetchOne(id, _) => runFetchOne[F](q, ds, blocked.result, cache, log) - case q @ Batch(ids, _) => runBatch[F](q, ds, blocked.result, cache, log) + case q @ FetchOne(_, _) => runFetchOne[F](q, ds, blocked.result, cache, log) + case q @ Batch(_, _) => runBatch[F](q, ds, blocked.result, cache, log) } } @@ -482,8 +473,7 @@ object `package` { log: Option[Ref[F, Log]] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[List[Request]] = for { @@ -527,8 +517,7 @@ object `package` { log: Option[Ref[F, Log]] )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[List[Request]] = for { @@ -581,8 +570,7 @@ object `package` { e: BatchExecution )( implicit - C: ConcurrentEffect[F], - CS: ContextShift[F], + C: Concurrent[F], T: Timer[F] ): F[BatchedRequest] = { val batches = NonEmptyList.fromListUnsafe( diff --git a/shared/src/main/scala/syntax.scala b/shared/src/main/scala/syntax.scala index 5bd9c0cd..9ad23f74 100644 --- a/shared/src/main/scala/syntax.scala +++ b/shared/src/main/scala/syntax.scala @@ -24,14 +24,14 @@ object syntax { /** Implicit syntax to lift any value to the context of Fetch via pure */ implicit class FetchIdSyntax[A](val a: A) extends AnyVal { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, A] = + def fetch[F[_] : Concurrent]: Fetch[F, A] = Fetch.pure[F, A](a) } /** Implicit syntax to lift exception to Fetch errors */ implicit class FetchExceptionSyntax[B](val a: Throwable) extends AnyVal { - def fetch[F[_] : ConcurrentEffect]: Fetch[F, B] = + def fetch[F[_] : Concurrent]: Fetch[F, B] = Fetch.error[F, B](a) } } diff --git a/shared/src/test/scala/FetchTests.scala b/shared/src/test/scala/FetchTests.scala index 61a695f2..7c52c477 100644 --- a/shared/src/test/scala/FetchTests.scala +++ b/shared/src/test/scala/FetchTests.scala @@ -837,7 +837,7 @@ class FetchTests extends FetchSpec { "We can make fetches that depend on optional fetch results when they aren't defined" in { def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { maybe <- maybeOpt(2) - result <- maybe.fold(Fetch.pure(42))(i => one(i)) + result <- maybe.fold(Fetch.pure[F, Int](42))(i => one(i)) } yield result Fetch.run[IO](fetch).map(_ shouldEqual 42).unsafeToFuture @@ -846,7 +846,7 @@ class FetchTests extends FetchSpec { "We can make fetches that depend on optional fetch results when they are defined" in { def fetch[F[_] : ConcurrentEffect]: Fetch[F, Int] = for { maybe <- maybeOpt(1) - result <- maybe.fold(Fetch.pure(42))(i => one(i)) + result <- maybe.fold(Fetch.pure[F, Int](42))(i => one(i)) } yield result Fetch.run[IO](fetch).map(_ shouldEqual 1).unsafeToFuture