diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala deleted file mode 100644 index bdd38d68cd..0000000000 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsRequestTimeoutTest.scala +++ /dev/null @@ -1,95 +0,0 @@ -package sttp.tapir.server.netty.cats - -import cats.effect.{IO, Resource} -import cats.effect.std.Dispatcher -import cats.effect.unsafe.implicits.global -import io.netty.channel.EventLoopGroup -import org.scalatest.matchers.should.Matchers._ -import sttp.capabilities.WebSockets -import sttp.capabilities.fs2.Fs2Streams -import sttp.client3._ -import sttp.model.HeaderNames -import sttp.tapir._ -import sttp.tapir.server.netty.NettyConfig -import sttp.tapir.tests.Test - -import scala.concurrent.duration.DurationInt - -class NettyCatsRequestTimeoutTest( - dispatcher: Dispatcher[IO], - eventLoopGroup: EventLoopGroup, - backend: SttpBackend[IO, Fs2Streams[IO] with WebSockets] -) { - def tests(): List[Test] = List( - Test("chunked transmission lasts longer than given timeout") { - val givenRequestTimeout = 2.seconds - val howManyChunks: Int = 2 - val chunkSize = 100 - val millisBeforeSendingSecondChunk = 1000L - - val e = - endpoint.post - .in(header[Long](HeaderNames.ContentLength)) - .in(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) - .out(header[Long](HeaderNames.ContentLength)) - .out(streamTextBody(Fs2Streams[IO])(CodecFormat.TextPlain())) - .serverLogicSuccess[IO] { case (length, stream) => - IO((length, stream)) - } - - val config = - NettyConfig.default - .eventLoopGroup(eventLoopGroup) - .randomPort - .withDontShutdownEventLoopGroupOnClose - .noGracefulShutdown - .requestTimeout(givenRequestTimeout) - - val bind = NettyCatsServer(dispatcher, config).addEndpoint(e).start() - - def iterator(howManyChunks: Int, chunkSize: Int): Iterator[Byte] = new Iterator[Iterator[Byte]] { - private var chunksToGo: Int = howManyChunks - - def hasNext: Boolean = { - if (chunksToGo == 1) - Thread.sleep(millisBeforeSendingSecondChunk) - chunksToGo > 0 - } - - def next(): Iterator[Byte] = { - chunksToGo -= 1 - List.fill('A')(chunkSize).map(_.toByte).iterator - } - }.flatMap(identity) - - val inputStream = fs2.Stream.fromIterator[IO](iterator(howManyChunks, chunkSize), chunkSize = chunkSize) - - Resource - .make(bind)(_.stop()) - .map(_.port) - .use { port => - basicRequest - .post(uri"http://localhost:$port") - .contentLength(howManyChunks * chunkSize) - .streamBody(Fs2Streams[IO])(inputStream) - .send(backend) - .map { _ => - fail("I've got a bad feeling about this.") - } - } - .attempt - .map { - case Left(ex: sttp.client3.SttpClientException.TimeoutException) => - ex.getCause.getMessage shouldBe "request timed out" - case Left(ex: sttp.client3.SttpClientException.ReadException) if ex.getCause.isInstanceOf[java.io.IOException] => - println(s"Unexpected IOException: $ex") - fail(s"Unexpected IOException: $ex") - case Left(ex) => - fail(s"Unexpected exception: $ex") - case Right(_) => - fail("Expected an exception but got success") - } - .unsafeToFuture() - } - ) -} diff --git a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala index 7f38b698d9..e8e36e90a5 100644 --- a/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala +++ b/server/netty-server/cats/src/test/scala/sttp/tapir/server/netty/cats/NettyCatsServerTest.scala @@ -49,8 +49,7 @@ class NettyCatsServerTest extends TestSuite with EitherValues { override def functionToPipe[A, B](f: A => B): streams.Pipe[A, B] = in => in.map(f) override def emptyPipe[A, B]: fs2.Pipe[IO, A, B] = _ => fs2.Stream.empty } - .tests() ++ - new NettyCatsRequestTimeoutTest(dispatcher, eventLoopGroup, backend).tests() + .tests() IO.pure((tests, eventLoopGroup)) } { case (_, eventLoopGroup) => diff --git a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala index 7ef2dc432d..b62c6da644 100644 --- a/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala +++ b/server/play-server/src/test/scala/sttp/tapir/server/play/PlayServerTest.scala @@ -1,21 +1,32 @@ package sttp.tapir.server.play +import scala.concurrent.duration.{DurationInt, FiniteDuration} import org.apache.pekko.actor.ActorSystem import org.apache.pekko.stream.scaladsl.{Flow, Sink, Source} import cats.data.NonEmptyList import cats.effect.{IO, Resource} import cats.effect.unsafe.implicits.global -import org.scalatest.matchers.should.Matchers._ +import com.typesafe.config.ConfigFactory +import org.apache.pekko.util.ByteString +import org.scalatest.matchers.should.Matchers.{fail, _} +import play.api.{Configuration, Mode} import play.api.http.ParserConfiguration +import play.api.routing.Router +import play.core.server.{DefaultPekkoHttpServerComponents, ServerConfig} +import sttp.capabilities.Streams +import sttp.capabilities.fs2.Fs2Streams import sttp.capabilities.pekko.PekkoStreams import sttp.client3._ -import sttp.model.{MediaType, Part, StatusCode} +import sttp.model.{HeaderNames, MediaType, Part, StatusCode} import sttp.monad.FutureMonad import sttp.tapir._ import sttp.tapir.server.tests._ import sttp.tapir.tests.{Test, TestSuite} +import fs2.{Chunk, Stream} +import sttp.capabilities.fs2.Fs2Streams import scala.concurrent.Future +import scala.concurrent.duration.FiniteDuration class PlayServerTest extends TestSuite { @@ -23,10 +34,10 @@ class PlayServerTest extends TestSuite { Resource.make(IO.delay(ActorSystem()))(actorSystem => IO.fromFuture(IO.delay(actorSystem.terminate())).void) override def tests: Resource[IO, List[Test]] = backendResource.flatMap { backend => - actorSystemResource.map { implicit actorSystem => - implicit val m: FutureMonad = new FutureMonad()(actorSystem.dispatcher) + actorSystemResource.map { implicit _actorSystem => + implicit val m: FutureMonad = new FutureMonad()(_actorSystem.dispatcher) - val interpreter = new PlayTestServerInterpreter()(actorSystem) + val interpreter = new PlayTestServerInterpreter()(_actorSystem) val createServerTest = new DefaultCreateServerTest(backend, interpreter) def additionalTests(): List[Test] = List( @@ -98,7 +109,61 @@ class PlayServerTest extends TestSuite { } } .unsafeToFuture() - } + }, + Test("chunked transmission lasts longer than given timeout") { + val chunkSize = 100 + val beforeSendingSecondChunk: FiniteDuration = 2.second + val requestTimeout: FiniteDuration = 1.second + + val e = + endpoint.post + .in(header[Long](HeaderNames.ContentLength)) + .in(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .out(header[Long](HeaderNames.ContentLength)) + .out(streamTextBody(PekkoStreams)(CodecFormat.TextPlain())) + .serverLogicSuccess[Future] { case (length, stream) => + Future.successful(length, stream) + } + + val components: DefaultPekkoHttpServerComponents = new DefaultPekkoHttpServerComponents { + val initialServerConfig: ServerConfig = ServerConfig(port = Some(0), address = "127.0.0.1", mode = Mode.Test) + + val customConf: Configuration = + Configuration( + ConfigFactory.parseString(s"play { server.pekko.requestTimeout=${requestTimeout.toString} }") + ) + override lazy val serverConfig: ServerConfig = + initialServerConfig.copy(configuration = customConf.withFallback(initialServerConfig.configuration)) + override lazy val actorSystem: ActorSystem = ActorSystem("tapir", defaultExecutionContext = Some(_actorSystem.dispatcher)) + override lazy val router: Router = Router.from(PlayServerInterpreter().toRoutes(e)).withPrefix("/chunks") + } + + def createStream(chunkSize: Int, beforeSendingSecondChunk: FiniteDuration): Stream[IO, Byte] = { + val chunk = Chunk.array(Array.fill(chunkSize)('A'.toByte)) + val initialChunks = Stream.chunk(chunk) + val delayedChunk = Stream.sleep[IO](beforeSendingSecondChunk) >> Stream.chunk(chunk) + initialChunks ++ delayedChunk + } + + val inputStream = createStream(chunkSize, beforeSendingSecondChunk) + + val bind = IO.blocking(components.server) + Resource.make(bind)(s => IO.blocking(s.stop())) + .map(_.mainAddress.getPort) + .use { port => + basicRequest + .post(uri"http://localhost:$port/chunks") + .contentLength(2 * chunkSize) + .streamBody(Fs2Streams[IO])(inputStream) + .send(backend) + .map{ response => + response.code shouldBe StatusCode.Ok + response.contentLength shouldBe Some(2 * chunkSize) + response.body shouldBe Right("A" * 2 * chunkSize) + } + } + .unsafeToFuture() + }, ) def drainPekko(stream: PekkoStreams.BinaryStream): Future[Unit] = @@ -135,4 +200,6 @@ class PlayServerTest extends TestSuite { additionalTests() } } + + override def testNameFilter: Option[String] = Some("chunked transmission lasts longer than given timeout") }