Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch request body as ReadableStream #628

Merged
merged 13 commits into from
Nov 17, 2021
17 changes: 14 additions & 3 deletions api-reports/2_12.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15625,6 +15625,8 @@ PushSubscriptionJSON[JT] val expirationTime: java.lang.Double
PushSubscriptionJSON[JT] val keys: js.Dictionary[String]
PushSubscriptionOptions[JT] var applicationServerKey: js.UndefOr[Uint8Array]
PushSubscriptionOptions[JT] var userVisibleOnly: js.UndefOr[Boolean]
QueuingStrategy[JT] var highWaterMark: Int
QueuingStrategy[JT] var size: js.Function1[T, Int]
RTCBundlePolicy[JT]
RTCBundlePolicy[SO] val balanced: RTCBundlePolicy
RTCBundlePolicy[SO] val `max-bundle` = "max-bundle".asInstanceOf[RTCBundlePolicy]
Expand Down Expand Up @@ -15851,14 +15853,23 @@ ReadableStream[JT] def locked: Boolean
ReadableStream[JT] def pipeThrough[U](pair: Any, options: Any?): ReadableStream[U]
ReadableStream[JT] def pipeTo(dest: WriteableStream[T], options: Any?): Unit
ReadableStream[JT] def tee(): js.Array[_ <: ReadableStream[T]]
ReadableStream[SO] def apply[T](underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]]?, queuingStrategy: js.UndefOr[QueuingStrategy[T]]?): ReadableStream[T]
ReadableStreamController[JC] def close(): Unit
ReadableStreamController[JC] def desiredSize: Int
ReadableStreamController[JC] def enqueue(chunk: Chunk[T]): js.UndefOr[Int]
ReadableStreamController[JC] def enqueue(chunk: T): js.UndefOr[Int]
ReadableStreamController[JC] def error(e: Any): Unit
ReadableStreamReader[JC] def cancel(reason: Any): js.Promise[Any]
ReadableStreamReader[JC] def cancel(): js.Promise[Unit]
ReadableStreamReader[JC] def cancel[U](reason: U): js.Promise[U]
ReadableStreamReader[JC] def closed: js.Promise[ReadableStreamReader[T]]
ReadableStreamReader[JC] def read(): js.Promise[Chunk[T]]
ReadableStreamReader[JC] def releaseLock(): Unit
ReadableStreamType[JT]
ReadableStreamType[SO] val bytes: ReadableStreamType
ReadableStreamUnderlyingSource[JT] var autoAllocateChunkSize: js.UndefOr[Double]
ReadableStreamUnderlyingSource[JT] var cancel: js.UndefOr[js.Function1[js.Any, js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var pull: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var start: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var `type`: js.UndefOr[ReadableStreamType]
ReferrerPolicy[JT]
ReferrerPolicy[SO] val empty: ReferrerPolicy
ReferrerPolicy[SO] val `no-referrer` = "no-referrer".asInstanceOf[ReferrerPolicy]
Expand Down Expand Up @@ -25958,7 +25969,7 @@ intl/NumberFormatOptions[JT] var useGrouping: js.UndefOr[Boolean]
intl/NumberFormatOptions[SO] def apply(localeMatcher: js.UndefOr[String]?, style: js.UndefOr[String]?, currency: js.UndefOr[String]?, currencyDisplay: js.UndefOr[String]?, useGrouping: js.UndefOr[Boolean]?, minimumIntegerDigits: js.UndefOr[Double]?, minimumFractionDigits: js.UndefOr[Double]?, maximumFractionDigits: js.UndefOr[Double]?, minimumSignificantDigits: js.UndefOr[Double]?, maximumSignificantDigits: js.UndefOr[Double]?): NumberFormatOptions (@deprecated in 2.0.0)
package[SO] type AlgorithmIdentifier = Algorithm | String
package[SO] type BigInteger = js.typedarray.Uint8Array
package[SO] type BodyInit = Blob | BufferSource | FormData | String
package[SO] type BodyInit = Blob | BufferSource | FormData | String | ReadableStream[Uint8Array]
package[SO] type BufferSource = ArrayBufferView | ArrayBuffer
package[SO] type ByteString = String
package[SO] type ClientRect = DOMRect (@deprecated in 2.0.0)
Expand Down
17 changes: 14 additions & 3 deletions api-reports/2_13.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15625,6 +15625,8 @@ PushSubscriptionJSON[JT] val expirationTime: java.lang.Double
PushSubscriptionJSON[JT] val keys: js.Dictionary[String]
PushSubscriptionOptions[JT] var applicationServerKey: js.UndefOr[Uint8Array]
PushSubscriptionOptions[JT] var userVisibleOnly: js.UndefOr[Boolean]
QueuingStrategy[JT] var highWaterMark: Int
QueuingStrategy[JT] var size: js.Function1[T, Int]
RTCBundlePolicy[JT]
RTCBundlePolicy[SO] val balanced: RTCBundlePolicy
RTCBundlePolicy[SO] val `max-bundle` = "max-bundle".asInstanceOf[RTCBundlePolicy]
Expand Down Expand Up @@ -15851,14 +15853,23 @@ ReadableStream[JT] def locked: Boolean
ReadableStream[JT] def pipeThrough[U](pair: Any, options: Any?): ReadableStream[U]
ReadableStream[JT] def pipeTo(dest: WriteableStream[T], options: Any?): Unit
ReadableStream[JT] def tee(): js.Array[_ <: ReadableStream[T]]
ReadableStream[SO] def apply[T](underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]]?, queuingStrategy: js.UndefOr[QueuingStrategy[T]]?): ReadableStream[T]
ReadableStreamController[JC] def close(): Unit
ReadableStreamController[JC] def desiredSize: Int
ReadableStreamController[JC] def enqueue(chunk: Chunk[T]): js.UndefOr[Int]
ReadableStreamController[JC] def enqueue(chunk: T): js.UndefOr[Int]
ReadableStreamController[JC] def error(e: Any): Unit
ReadableStreamReader[JC] def cancel(reason: Any): js.Promise[Any]
ReadableStreamReader[JC] def cancel(): js.Promise[Unit]
ReadableStreamReader[JC] def cancel[U](reason: U): js.Promise[U]
ReadableStreamReader[JC] def closed: js.Promise[ReadableStreamReader[T]]
ReadableStreamReader[JC] def read(): js.Promise[Chunk[T]]
ReadableStreamReader[JC] def releaseLock(): Unit
ReadableStreamType[JT]
ReadableStreamType[SO] val bytes: ReadableStreamType
ReadableStreamUnderlyingSource[JT] var autoAllocateChunkSize: js.UndefOr[Double]
ReadableStreamUnderlyingSource[JT] var cancel: js.UndefOr[js.Function1[js.Any, js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var pull: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var start: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]]
ReadableStreamUnderlyingSource[JT] var `type`: js.UndefOr[ReadableStreamType]
ReferrerPolicy[JT]
ReferrerPolicy[SO] val empty: ReferrerPolicy
ReferrerPolicy[SO] val `no-referrer` = "no-referrer".asInstanceOf[ReferrerPolicy]
Expand Down Expand Up @@ -25958,7 +25969,7 @@ intl/NumberFormatOptions[JT] var useGrouping: js.UndefOr[Boolean]
intl/NumberFormatOptions[SO] def apply(localeMatcher: js.UndefOr[String]?, style: js.UndefOr[String]?, currency: js.UndefOr[String]?, currencyDisplay: js.UndefOr[String]?, useGrouping: js.UndefOr[Boolean]?, minimumIntegerDigits: js.UndefOr[Double]?, minimumFractionDigits: js.UndefOr[Double]?, maximumFractionDigits: js.UndefOr[Double]?, minimumSignificantDigits: js.UndefOr[Double]?, maximumSignificantDigits: js.UndefOr[Double]?): NumberFormatOptions (@deprecated in 2.0.0)
package[SO] type AlgorithmIdentifier = Algorithm | String
package[SO] type BigInteger = js.typedarray.Uint8Array
package[SO] type BodyInit = Blob | BufferSource | FormData | String
package[SO] type BodyInit = Blob | BufferSource | FormData | String | ReadableStream[Uint8Array]
package[SO] type BufferSource = ArrayBufferView | ArrayBuffer
package[SO] type ByteString = String
package[SO] type ClientRect = DOMRect (@deprecated in 2.0.0)
Expand Down
11 changes: 11 additions & 0 deletions dom/src/main/scala-2/org/scalajs/dom/ReadableStreamType.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.scalajs.dom

import scala.scalajs.js

/** [[https://streams.spec.whatwg.org/#enumdef-readablestreamtype ReadableStreamType enum]] */
@js.native
sealed trait ReadableStreamType extends js.Any

object ReadableStreamType {
val bytes: ReadableStreamType = "bytes".asInstanceOf[ReadableStreamType]
}
7 changes: 7 additions & 0 deletions dom/src/main/scala-3/org/scalajs/dom/ReadableStreamType.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.scalajs.dom

opaque type ReadableStreamType <: String = String

object ReadableStreamType {
val bytes: ReadableStreamType = "bytes"
}
23 changes: 23 additions & 0 deletions dom/src/main/scala/org/scalajs/dom/QueuingStrategy.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.scalajs.dom

import scala.scalajs.js

/** See [[https://streams.spec.whatwg.org/#qs-api ¶7.1. The queuing strategy API]]
*
* @tparam T
* Type of the Chunks returned by the Stream
*/
trait QueuingStrategy[T] extends js.Object {

/** A non-negative number indicating the high water mark of the stream using this queuing strategy. */
var highWaterMark: Int

/** (non-byte streams only)
*
* The result is used to determine backpressure, manifesting via the appropriate desiredSize property. For readable
* streams, it also governs when the underlying source's [[ReadableStreamUnderlyingSource.pull]] method is called.
*
* A function that computes and returns the finite non-negative size of the given chunk value.
*/
var size: js.Function1[T, Int]
}
15 changes: 15 additions & 0 deletions dom/src/main/scala/org/scalajs/dom/ReadableStream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,18 @@ trait ReadableStream[+T] extends js.Object {
*/
def tee(): js.Array[_ <: ReadableStream[T]] = js.native // TODO js.Tuple2[ReadableStream[T], ReadableStream[T]]
}

object ReadableStream {

def apply[T](
underlyingSource: js.UndefOr[ReadableStreamUnderlyingSource[T]] = js.undefined,
queuingStrategy: js.UndefOr[QueuingStrategy[T]] = js.undefined
): ReadableStream[T] = {
js.Dynamic
.newInstance(js.Dynamic.global.selectDynamic("ReadableStream"))(
ptrdom marked this conversation as resolved.
Show resolved Hide resolved
underlyingSource.asInstanceOf[js.Any],
queuingStrategy.asInstanceOf[js.Any]
)
.asInstanceOf[ReadableStream[T]]
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.scalajs.dom

import scala.scalajs.js
import scala.scalajs.js.annotation._
import scala.scalajs.js.annotation.JSGlobal

/** [[https://streams.spec.whatwg.org/#rs-controller-class ¶3.3 Class ReadableStreamController]] of whatwg spec
*
Expand All @@ -15,7 +15,7 @@ import scala.scalajs.js.annotation._
*/
@js.native
@JSGlobal
class ReadableStreamController[-T](stream: ReadableStream[T] = null) extends js.Object {
class ReadableStreamController[-T] private[this] () extends js.Object {

/** The desiredSize getter returns the desired size to fill the controlled stream’s internal queue. It can be
* negative, if the queue is over-full. An underlying source should use this information to determine when and how to
Expand All @@ -39,7 +39,7 @@ class ReadableStreamController[-T](stream: ReadableStream[T] = null) extends js.
* @return
* seems like its an undefOr[Int] of the size
*/
def enqueue(chunk: Chunk[T]): js.UndefOr[Int] = js.native
def enqueue(chunk: T): js.UndefOr[Int] = js.native
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** The error method will error the readable stream, making all future interactions with it fail with the given error
* e.
Expand Down
6 changes: 2 additions & 4 deletions dom/src/main/scala/org/scalajs/dom/ReadableStreamReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ class ReadableStreamReader[+T](stream: ReadableStream[T]) extends js.Object {
*
* If the reader is active, the cancel method behaves the same as that for the associated stream. When done, it
* automatically releases the lock.
*
* //todo determine type of reason
*/
// not actually sure what the return type is here
def cancel(reason: Any): js.Promise[Any] = js.native
def cancel[U](reason: U): js.Promise[U] = js.native
def cancel(): js.Promise[Unit] = js.native
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

/** See [[https://streams.spec.whatwg.org/#reader-read 3.4.4.3. read()]] of whatwg Stream spec.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package org.scalajs.dom

import scala.scalajs.js

/** See [[https://streams.spec.whatwg.org/#underlying-source-api ¶4.2.3. The underlying source API]] of whatwg streams
* spec.
*
* @tparam T
* Type of the Chunks returned by the Stream
*/
trait ReadableStreamUnderlyingSource[T] extends js.Object {

/** A function that is called immediately during creation of the ReadableStream.
*
* If this setup process is asynchronous, it can return a promise to signal success or failure; a rejected promise
* will error the stream. Any thrown exceptions will be re-thrown by the [[ReadableStream]] constructor.
*/
var start: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]] = js.undefined

/** A function that is called whenever the stream’s internal queue of chunks becomes not full, i.e. whenever the
* queue’s desired size becomes positive. Generally, it will be called repeatedly until the queue reaches its high
* water mark (i.e. until the desired size becomes non-positive).
*
* This function will not be called until [[start]] successfully completes. Additionally, it will only be called
* repeatedly if it enqueues at least one chunk or fulfills a BYOB request; a no-op [[pull]] implementation will not
* be continually called.
*
* If the function returns a promise, then it will not be called again until that promise fulfills. (If the promise
* rejects, the stream will become errored.) This is mainly used in the case of pull sources, where the promise
* returned represents the process of acquiring a new chunk. Throwing an exception is treated the same as returning a
* rejected promise.
*/
var pull: js.UndefOr[js.Function1[ReadableStreamController[T], js.UndefOr[js.Promise[Unit]]]] = js.undefined

/** A function that is called whenever the consumer cancels the stream, via [[ReadableStream.cancel]] or
* [[ReadableStreamReader.cancel():scala\.scalajs\.js\.Promise[Unit]*]]. It takes as its argument the same value as
* was passed to those methods by the consumer. If the shutdown process is asynchronous, it can return a promise to
* signal success or failure; the result will be communicated via the return value of the [[cancel]] method that was
* called. Additionally, a rejected promise will error the stream, instead of letting it close. Throwing an exception
* is treated the same as returning a rejected promise.
*/
var cancel: js.UndefOr[js.Function1[js.Any, js.UndefOr[js.Promise[Unit]]]] = js.undefined

/** Can be set to "bytes" to signal that the constructed [[ReadableStream]] is a readable byte stream.
*
* Setting any value other than "bytes" or undefined will cause the ReadableStream() constructor to throw an
* exception.
*/
var `type`: js.UndefOr[ReadableStreamType] = js.undefined

/** (byte streams only)
*
* Can be set to a positive integer to cause the implementation to automatically allocate buffers for the underlying
* source code to write into.
*/
var autoAllocateChunkSize: js.UndefOr[Double] = js.undefined
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
}
4 changes: 2 additions & 2 deletions dom/src/main/scala/org/scalajs/dom/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package org.scalajs

import scala.scalajs.js
import scala.scalajs.js.annotation._
import scala.scalajs.js.typedarray.{ArrayBuffer, ArrayBufferView}
import scala.scalajs.js.typedarray.{ArrayBuffer, ArrayBufferView, Uint8Array}
import scala.scalajs.js.|

package object dom {
Expand Down Expand Up @@ -32,7 +32,7 @@ package object dom {

/** defined at [[https://fetch.spec.whatwg.org/#body-mixin ¶6.2 Body mixin]] in whatwg Fetch spec */
type BodyInit =
Blob | BufferSource | FormData | String // todo: add URLSearchParams
Blob | BufferSource | FormData | String | ReadableStream[Uint8Array] // todo: add URLSearchParams

/** WebIDL sequence<T> is js.Array[T] | JSIterable[T]. However @mseddon knows at least Blink's IDL compiler treats
* these as simply js.Array[T] for now. We keep this type as a reminder to check in more detail
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package org.scalajs.dom.tests.chrome
import org.scalajs.dom.tests.shared._
import org.scalajs.dom.tests.webworker._

class ChromeTests extends SharedTests with WebWorkerTests
class ChromeTests extends SharedTests with WebWorkerTests with BrowserTests
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ package org.scalajs.dom.tests.firefox
import org.scalajs.dom.tests.shared._
import org.scalajs.dom.tests.webworker._

class FirefoxTests extends SharedTests with WebWorkerTests
class FirefoxTests extends SharedTests with WebWorkerTests with BrowserTests
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package org.scalajs.dom.tests.shared

import org.junit.Assert.assertEquals
import org.junit.Test
import org.scalajs.dom.QueuingStrategy
import org.scalajs.dom.ReadableStream
import org.scalajs.dom.ReadableStreamController
import org.scalajs.dom.ReadableStreamUnderlyingSource
import org.scalajs.dom.tests.shared.AsyncTesting.AsyncResult
import org.scalajs.dom.tests.shared.AsyncTesting._
import org.scalajs.dom.tests.shared.AsyncTesting.async

import scala.concurrent.Future
import scala.scalajs.js
import scala.scalajs.js.Thenable.Implicits._

trait BrowserTests {

@Test
final def ReadableStreamConstructionAndConsumptionTest: AsyncResult = async {
case class Tuna(color: String)

val expectedTunas = Seq(
Tuna("blue"),
Tuna("red")
)
armanbilge marked this conversation as resolved.
Show resolved Hide resolved

val stream = ReadableStream[Tuna](
new ReadableStreamUnderlyingSource[Tuna] {
start = js.defined({ (controller: ReadableStreamController[Tuna]) =>
controller.enqueue(Tuna("blue"))
controller.enqueue(Tuna("red"))
controller.close(): js.UndefOr[js.Promise[Unit]]
}): js.UndefOr[js.Function1[ReadableStreamController[Tuna], js.UndefOr[js.Promise[Unit]]]]
}
)

val reader = stream.getReader()

def read(tunas: Seq[Tuna]): Future[Seq[Tuna]] = {
reader
.read()
.flatMap { chunk =>
if (chunk.done) {
Future.successful(tunas)
} else {
read(tunas :+ chunk.value)
}
}
}
read(Seq.empty)
.map { receivedTunas =>
assertEquals(receivedTunas, expectedTunas)
}
}

@Test
final def ReadableStreamQueueingStrategyTest: AsyncResult = async {
val expectedStrings = Seq(
"short one",
"definitely a longer one"
)

val stream = ReadableStream[String](
new ReadableStreamUnderlyingSource[String] {
start = js.defined({ (controller: ReadableStreamController[String]) =>
controller.enqueue("short one")
controller.enqueue("definitely a longer one")
controller.close(): js.UndefOr[js.Promise[Unit]]
}): js.UndefOr[js.Function1[ReadableStreamController[String], js.UndefOr[js.Promise[Unit]]]]
},
new QueuingStrategy[String] {
var highWaterMark = 1
var size: js.Function1[String, Int] = { (chunk: String) =>
chunk.length
}
}
)

val reader = stream.getReader()

def read(strings: Seq[String]): Future[Seq[String]] = {
armanbilge marked this conversation as resolved.
Show resolved Hide resolved
reader
.read()
.flatMap { chunk =>
if (chunk.done) {
Future.successful(strings)
} else {
read(strings :+ chunk.value)
}
}
}
read(Seq.empty)
.map { receivedStrings =>
assertEquals(receivedStrings, expectedStrings)
}
}
}