Skip to content

Commit

Permalink
Don't close SttpBackend in SimpleInfluxClient
Browse files Browse the repository at this point in the history
It belongs to NussknackerApp
  • Loading branch information
piotrp committed Aug 31, 2023
1 parent a5faea0 commit c09f8d4
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pl.touk.nussknacker.processCounts.influxdb
import com.dimafeng.testcontainers.{ForAllTestContainer, InfluxDBContainer}
import org.influxdb.InfluxDBFactory
import org.influxdb.dto.Point
import org.scalatest.Assertion
import org.scalatest.{Assertion, BeforeAndAfterAll}
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.prop.TableDrivenPropertyChecks
Expand All @@ -16,7 +16,7 @@ import java.time.Instant
import java.util.concurrent.TimeUnit
import scala.language.implicitConversions

class InfluxCountsReporterSpec extends AnyFunSuite with ForAllTestContainer with TableDrivenPropertyChecks with VeryPatientScalaFutures with Matchers {
class InfluxCountsReporterSpec extends AnyFunSuite with BeforeAndAfterAll with ForAllTestContainer with TableDrivenPropertyChecks with VeryPatientScalaFutures with Matchers {

implicit val backend: SttpBackend[Identity, Any] = HttpURLConnectionBackend()

Expand All @@ -33,6 +33,11 @@ class InfluxCountsReporterSpec extends AnyFunSuite with ForAllTestContainer with

private val env = "testEnv"

override protected def afterAll(): Unit = {
backend.close()
super.afterAll()
}

test("invokes counts for point in time data") {

val process = "myProcess-1"
Expand Down Expand Up @@ -112,7 +117,7 @@ class InfluxCountsReporterSpec extends AnyFunSuite with ForAllTestContainer with
private val influxDB = InfluxDBFactory.connect(container.url, container.username, container.password)

def reporter(queryMode: QueryMode.Value) = new InfluxCountsReporter[Identity](env,
InfluxConfig(container.url + "/query", Option(container.username), Option(container.password), container.database, queryMode, Some(config)), identity
InfluxConfig(container.url + "/query", Option(container.username), Option(container.password), container.database, queryMode, Some(config))
)

influxDB.setDatabase(container.database)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ import sttp.monad.MonadError
import sttp.monad.syntax._

import java.time.Instant
import scala.concurrent.duration.DurationInt
import scala.concurrent.{Await, Future}
import scala.concurrent.Future
import scala.language.higherKinds

/*
Base reporter for counts
*/
class InfluxCountsReporter[F[_]](env: String, config: InfluxConfig, waitForClose: F[Unit] => Unit)(implicit backend: SttpBackend[F, Any]) extends CountsReporter[F] with LazyLogging {
class InfluxCountsReporter[F[_]](env: String, config: InfluxConfig)(implicit backend: SttpBackend[F, Any]) extends CountsReporter[F] with LazyLogging {

val influxGenerator = new InfluxGenerator(config, env)
private val influxGenerator = new InfluxGenerator(config, env)

private implicit val monadError: MonadError[F] = backend.responseMonad

Expand All @@ -28,7 +27,7 @@ class InfluxCountsReporter[F[_]](env: String, config: InfluxConfig, waitForClose
case ExecutionCount(pointInTime) => influxGenerator.queryBySingleDifference(processId, None, pointInTime, metricsConfig)
}).map(_.get)

override def close(): Unit = waitForClose(influxGenerator.close())
override def close(): Unit = {}

private def prepareRangeCounts(processId: String, fromDate: Instant, toDate: Instant): F[Map[String, Long]] = {

Expand Down Expand Up @@ -62,7 +61,7 @@ class InfluxCountsReporterCreator extends CountsReporterCreator {
override def createReporter(env: String, config: Config)
(implicit backend: SttpBackend[Future, Any]): CountsReporter[Future] = {
//TODO: logger
new InfluxCountsReporter(env, config.as[InfluxConfig](CountsReporterCreator.reporterCreatorConfigPath), Await.result(_, 10 seconds))
new InfluxCountsReporter(env, config.as[InfluxConfig](CountsReporterCreator.reporterCreatorConfigPath))
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ private[influxdb] class InfluxGenerator[F[_]](config: InfluxConfig, env: String)
private def parseInfluxDate(date:String) : Instant =
ZonedDateTime.parse(date, DateTimeFormatter.ISO_ZONED_DATE_TIME).toInstant

def close(): F[Unit] = influxClient.close()

}

object InfluxGenerator extends LazyLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ class SimpleInfluxClient[F[_]](config: InfluxConfig)(implicit backend: SttpBacke
.map(_.results.head.series)
}

def close(): F[Unit] = backend.close()

}

case class InfluxResponse(results: List[InfluxResult] = Nil)
Expand Down

0 comments on commit c09f8d4

Please sign in to comment.