Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch request body as ReadableStream #628

Merged
merged 13 commits into from
Nov 17, 2021
15 changes: 15 additions & 0 deletions dom/src/main/scala/org/scalajs/dom/ReadableStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,18 @@ trait ReadableStream[+T] extends js.Object {
*/
def tee(): js.Array[_ <: ReadableStream[T]] = js.native // TODO js.Tuple2[ReadableStream[T], ReadableStream[T]]
}

object ReadableStream {

def apply[T](
underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]],
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
queuingStrategy: js.UndefOr[ReadableStreamQueuingStrategy[T]] = js.undefined
): ReadableStream[T] = {
js.Dynamic
.newInstance(js.Dynamic.global.selectDynamic("ReadableStream"))(
ptrdom marked this conversation as resolved.
Show resolved Hide resolved
underlyingSource.asInstanceOf[js.Any],
queuingStrategy.asInstanceOf[js.Any]
)
.asInstanceOf[ReadableStream[T]]
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package org.scalajs.dom

import scala.scalajs.js
import scala.scalajs.js.annotation._

/** [[https://streams.spec.whatwg.org/#rs-controller-class ¶3.3 Class ReadableStreamController]] of whatwg spec
*
Expand All @@ -14,8 +13,7 @@ import scala.scalajs.js.annotation._
* Type of the Chunks to be enqueued to the Stream
*/
@js.native
@JSGlobal
class ReadableStreamController[-T](stream: ReadableStream[T] = null) extends js.Object {
trait ReadableStreamController[-T] extends js.Object {
ptrdom marked this conversation as resolved.
Show resolved Hide resolved

/** The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be
* negative, if the queue is over-full. An underlying source should use this information to determine when and how to
Expand All @@ -39,7 +37,7 @@ class ReadableStreamController[-T](stream: ReadableStream[T] = null) extends js.
* @return
* seems like its an undefOr[Int] of the size
*/
def enqueue(chunk: Chunk[T]): js.UndefOr[Int] = js.native
def enqueue(chunk: T): js.UndefOr[Int] = js.native
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** The error method will error the readable stream, making all future interactions with it fail with the given error
* e.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.scalajs.dom

import scala.scalajs.js

/** See [[https://streams.spec.whatwg.org/#qs-api ¶7.1. The queuing strategy API]]
*
* @tparam T
* Type of the Chunks returned by the Stream
*/
trait ReadableStreamQueuingStrategy[T] extends js.Object {
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** A non-negative number indicating the high water mark of the stream using this queuing strategy. */
var highWaterMark: Double
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** (non-byte streams only)
*
* The result is used to determine backpressure, manifesting via the appropriate desiredSize property. For readable
* streams, it also governs when the underlying source's [[ReadableStreamUnderlyingSource.pull]] method is called.
*
* A function that computes and returns the finite non-negative size of the given chunk value.
*/
var size: js.Function1[Chunk[T], Unit]
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,8 @@ class ReadableStreamReader[+T](stream: ReadableStream[T]) extends js.Object {
*
* If the reader is active, the cancel method behaves the same as that for the associated stream. When done, it
* automatically releases the lock.
*
* //todo determine type of reason
*/
// not actually sure what the return type is here
def cancel(reason: Any): js.Promise[Any] = js.native
def cancel[U](reason: js.UndefOr[U]): js.Promise[U] = js.native
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** See [[https://streams.spec.whatwg.org/#reader-read 3.4.4.3. read()]] of whatwg Stream spec.
*
Expand Down
11 changes: 11 additions & 0 deletions dom/src/main/scala/org/scalajs/dom/ReadableStreamType.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.scalajs.dom

import scala.scalajs.js

/** [[https://streams.spec.whatwg.org/#enumdef-readablestreamtype ReadableStreamType enum]] */
@js.native
sealed trait ReadableStreamType extends js.Any
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

object ReadableStreamType {
val bytes: ReadableStreamType = "bytes".asInstanceOf[ReadableStreamType]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.scalajs.dom

import scala.scalajs.js
import scala.scalajs.js.|

/** See [[https://streams.spec.whatwg.org/#underlying-source-api ¶4.2.3. The underlying source API]] of whatwg streams
* spec.
*
* @tparam T
* Type of the Chunks returned by the Stream
*/
trait ReadableStreamUnderlyingSource[T] extends js.Object {

/** A function that is called immediately during creation of the ReadableStream.
*
* If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise
* will error the stream. Any thrown exceptions will be re-thrown by the [[ReadableStream]] constructor.
*/
var start: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]] = js.undefined
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the
* queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high
* water mark (i.e. until the desired size becomes non-positive).
*
* This function will not be called until [[start]] successfully completes. Additionally, it will only be called
* repeatedly if it enqueues at least one chunk or fulfills a BYOB request; a no-op [[pull]] implementation will not
* be continually called.
*
* If the function returns a promise, then it will not be called again until that promise fulfills. (If the promise
* rejects, the stream will become errored.) This is mainly used in the case of pull sources, where the promise
* returned represents the process of acquiring a new chunk. Throwing an exception is treated the same as returning a
* rejected promise.
*/
var pull: js.UndefOr[js.Function1[ReadableStreamController[T], Unit | js.Promise[Unit]]] = js.undefined
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** A function that is called whenever the consumer cancels the stream, via [[ReadableStream.cancel]] or
* [[ReadableStreamReader.cancel]]. It takes as its argument the same value as was passed to those methods by the
* consumer.
*
* If the shutdown process is asynchronous, it can return a promise to signal success or failure; the result will be
* communicated via the return value of the [[cancel]] method that was called. Additionally, a rejected promise will
* error the stream, instead of letting it close. Throwing an exception is treated the same as returning a rejected
* promise.
*/
var cancel: js.UndefOr[js.Function1[js.Any, Unit | js.Promise[Unit]]] = js.undefined
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** Can be set to "bytes" to signal that the constructed [[ReadableStream]] is a readable byte stream.
*
* Setting any value other than "bytes" or undefined will cause the ReadableStream() constructor to throw an
* exception.
*/
var `type`: js.UndefOr[ReadableStreamType] = js.undefined

/** (byte streams only)
*
* Can be set to a positive integer to cause the implementation to automatically allocate buffers for the underlying
* source code to write into.
*/
var autoAllocateChunkSize: js.UndefOr[Double] = js.undefined
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
}
4 changes: 2 additions & 2 deletions dom/src/main/scala/org/scalajs/dom/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.scalajs

import scala.scalajs.js
import scala.scalajs.js.annotation._
import scala.scalajs.js.typedarray.{ArrayBuffer, ArrayBufferView}
import scala.scalajs.js.typedarray.{ArrayBuffer, ArrayBufferView, Uint8Array}
import scala.scalajs.js.|

package object dom {
Expand Down Expand Up @@ -32,7 +32,7 @@ package object dom {

/** defined at [[https://fetch.spec.whatwg.org/#body-mixin ¶6.2 Body mixin]] in whatwg Fetch spec */
type BodyInit =
Blob | BufferSource | FormData | String // todo: add URLSearchParams
Blob | BufferSource | FormData | String | ReadableStream[Uint8Array] // todo: add URLSearchParams

/** WebIDL sequence<T> is js.Array[T] | JSIterable[T]. However @mseddon knows at least Blink's IDL compiler treats
* these as simply js.Array[T] for now. We keep this type as a reminder to check in more detail
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package org.scalajs.dom.tests.shared

import org.scalajs.dom.tests.shared.AsyncTesting._
import org.junit.Assert.assertEquals
import org.junit.Test
import org.scalajs.dom.tests.shared.AsyncTesting._

import scala.concurrent.Future
import scala.scalajs.js
import scala.scalajs.js.Thenable.Implicits._

trait SharedTests {

Expand Down Expand Up @@ -36,4 +41,42 @@ trait SharedTests {

@Test final def WindowIdbTest(): AsyncResult =
IdbTest(window.indexedDB)

@Test
final def ReadableStreamTest: AsyncResult = async {
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
case class Tuna(color: String)

val expectedTunas = Seq(
Tuna("blue"),
Tuna("red")
)

val stream = ReadableStream[Tuna](
new ReadableStreamUnderlyingSource[Tuna] {
start = { (controller: ReadableStreamController[Tuna]) =>
controller.enqueue(Tuna("blue"))
controller.enqueue(Tuna("red"))
controller.close()
}: js.Function1[ReadableStreamController[Tuna], Unit]
ptrdom marked this conversation as resolved.
Show resolved Hide resolved
}
)

val reader = stream.getReader()

def read(tunas: Seq[Tuna]): Future[Seq[Tuna]] = {
reader
.read()
.flatMap { chunk =>
if (chunk.done) {
Future.successful(tunas)
} else {
read(tunas :+ chunk.value)
}
}
}
read(Seq.empty)
.map { receivedTunas =>
assertEquals(receivedTunas, expectedTunas)
}
}
}