Skip to content

Commit

Permalink
Add DataFrame Routes (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
rafafrdz authored Jan 3, 2025
1 parent 19aa6fe commit fb1c4b1
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* MIT License
*
* Copyright (c) 2024 Rafael Fernandez
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.eff3ct.teckel.controller

import cats.effect.Async
import cats.implicits._
import com.eff3ct.teckel.interface.request._
import com.eff3ct.teckel.service.DataFrameService
import io.circe.generic.auto._
import org.apache.spark.sql.SparkSession
import org.http4s.Status.Ok
import org.http4s.circe.CirceEntityCodec._
import org.http4s.{Request, Response}

object DataFrameController {

def execute[F[_]: Async](request: Request[F]): F[Response[F]] = {
for {
jobConfigRequest <- request.as[SparkJobConfigRequest]
spark = SparkSession.builder().remote(jobConfigRequest.url).getOrCreate()
_ <- DataFrameService.impl[F](spark).execute()
} yield Response[F](status = Ok)

}

}
9 changes: 5 additions & 4 deletions src/main/scala/com/eff3ct/teckel/server/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import com.comcast.ip4s.{Host, Port}
import com.eff3ct.teckel.controller.ApiResponseEnrich
import com.eff3ct.teckel.interface.error.ApiError
import com.eff3ct.teckel.server.config.ApiConfig
import com.eff3ct.teckel.server.route.{HealthRoutes, TeckelRoutes}
import com.eff3ct.teckel.server.route.{DataFrameRoutes, HealthRoutes, TeckelRoutes}
import fs2.io.net.Network
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.implicits._
Expand All @@ -43,12 +43,13 @@ object Server {
def build[F[_]: Async: Network: Logger](apiConf: ApiConfig): Resource[F, Server] = {

// Definition of Routes layers
val healthRoutes = HealthRoutes.impl[F]
val teckelRoutes = TeckelRoutes.impl[F]
val healthRoutes = HealthRoutes.impl[F]
val dataFrameRoutes = DataFrameRoutes.impl[F]
val teckelRoutes = TeckelRoutes.impl[F]

// HTTP App
val httpApp: HttpApp[F] = (
healthRoutes <+> teckelRoutes
healthRoutes <+> dataFrameRoutes <+> teckelRoutes
).orNotFound

// Error handler in case of something wrong
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* MIT License
*
* Copyright (c) 2024 Rafael Fernandez
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.eff3ct.teckel.server.route

import cats.effect.Async
import com.eff3ct.teckel.controller.DataFrameController
import org.http4s.HttpRoutes
import org.http4s.dsl.Http4sDsl

object DataFrameRoutes {
val DataFramePath: String = "dataframe"

def impl[F[_]: Async]: HttpRoutes[F] = {

val dsl = new Http4sDsl[F] {}
import dsl._

HttpRoutes.of[F] { case data @ POST -> Root / DataFramePath =>
DataFrameController.execute[F](data)
}
}

}
47 changes: 47 additions & 0 deletions src/main/scala/com/eff3ct/teckel/service/DataFrameService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* MIT License
*
* Copyright (c) 2024 Rafael Fernandez
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.eff3ct.teckel.service

import cats.effect.Async
import org.apache.spark.sql.SparkSession

trait DataFrameService[F[_]] {

def execute(): F[Unit]

}

object DataFrameService {

def apply[F[_]: DataFrameService]: DataFrameService[F] =
implicitly[DataFrameService[F]]

def impl[F[_]: Async](S: SparkSession): DataFrameService[F] = new DataFrameService[F] {

override def execute(): F[Unit] =
Async[F].delay(S.createDataFrame(Seq(("a", 1), ("b", 2))).show(false))
}

}

0 comments on commit fb1c4b1

Please sign in to comment.