Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added websocket event sources #60

Open
wants to merge 43 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
694e53d
New: Var.updater and Var.tryUpdater. Fixes #41
raquo Dec 13, 2020
bf18b86
New: Make `sample` and `withCurrentValueOf` available on Signal. Fixe…
raquo Dec 13, 2020
eaca9ca
Fix: Delay operator now clears the pending queue when stopped
raquo Dec 26, 2020
66d3c95
Fix: Debounce operator now clears timeout on stop
raquo Dec 26, 2020
31493bf
New: AjaxEventStream
ajaychandran Dec 27, 2020
d703b2f
API: Clear pending ajax request when stream is stopped
raquo Dec 27, 2020
42adddb
New: DomEventStream
raquo Dec 27, 2020
167e2db
Docs: Add new web streams
raquo Dec 27, 2020
02aeaac
Added Websocket event sources for #49
ajaychandran Dec 27, 2020
5bcde0b
Rename websocketPath to websocketUrl
ajaychandran Dec 27, 2020
bfa3243
Separate constructors for unidirectional and bidirectional usecases
ajaychandran Dec 27, 2020
5e72c72
Fix unidirectional constructor
ajaychandran Dec 27, 2020
71b0903
Defer callback registration
ajaychandran Dec 27, 2020
5cafb9f
Revised error handling
ajaychandran Dec 27, 2020
dae60cd
Added project parameter
ajaychandran Dec 28, 2020
7a19cdd
Fix names
ajaychandran Dec 28, 2020
040352e
Redesigned error type
ajaychandran Dec 28, 2020
8883fdb
Remove unused alias
ajaychandran Dec 28, 2020
181a487
Build: Use sbt-tpolcat for scalac warnings; bump sbt and Scala versio…
ngbinh Dec 30, 2020
874a2f9
New: AjaxEventStream
ajaychandran Dec 27, 2020
86a57d0
API: Clear pending ajax request when stream is stopped
raquo Dec 27, 2020
d4c9e0b
New: DomEventStream
raquo Dec 27, 2020
05ba3a8
Docs: Add new web streams
raquo Dec 27, 2020
26a75b8
New: EventStream.{withCallback, withObserver}
raquo Dec 28, 2020
c05abf9
New: Custom event sources, rework AjaxEventStream, and more
raquo Dec 29, 2020
bb1730a
2.12: Fix SetCurrentValue signature
raquo Dec 29, 2020
9a1378a
New: Observer.contramapSome, Var.someWriter
raquo Dec 30, 2020
f018b41
New: requestObserver for Ajax requests; Also some fixes:
raquo Dec 30, 2020
764da32
Define error ADT
ajaychandran Dec 30, 2020
167c44b
Revise builders
ajaychandran Dec 30, 2020
37d7600
Fix build errors
ajaychandran Dec 30, 2020
0f98b00
Added optional observers
ajaychandran Dec 30, 2020
2271c45
Added binary data extractors
ajaychandran Dec 30, 2020
578d62c
Better names
ajaychandran Dec 31, 2020
b1bef32
Add/remove final modifier
ajaychandran Dec 31, 2020
aa86998
Merge branch 'next-0.12' into wip-websockets
ajaychandran Jan 1, 2021
0fd099a
Updated docs
ajaychandran Jan 1, 2021
fb7d8ed
Added protocol parameter
ajaychandran Jan 4, 2021
270e05a
Redesign
ajaychandran Jan 6, 2021
1d7e9bb
Connect if subscribed
ajaychandran Jan 6, 2021
97b1067
Format
ajaychandran Jan 6, 2021
54edb62
Make protocol parameter optional
ajaychandran Jan 11, 2021
219d1d9
Update local socket reference in callbacks
ajaychandran Jan 11, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,5 @@ target
.DS_Store

yarn.lock

/.bsp
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ jdk:
- oraclejdk11

scala:
- 2.12.10
- 2.13.1
- 2.12.12
- 2.13.4

script:
- sbt ++$TRAVIS_SCALA_VERSION test
Expand Down
103 changes: 101 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ I created Airstream because I found existing solutions were not suitable for bui
* [EventStream.fromSeq](#eventstreamfromseq)
* [EventStream.periodic](#eventstreamperiodic)
* [EventStream.empty](#eventstreamempty)
* [EventStream.withCallback and withObserver](#eventstreamwithcallback-and-withobserver)
* [EventBus](#eventbus)
* [Var](#var)
* [Val](#val)
* [Ajax](#ajax)
* [Websockets](#websockets)
* [DOM Events](#dom-events)
* [Custom Observables](#custom-observables)
* [FRP Glitches](#frp-glitches)
* [Other Libraries](#other-libraries)
Expand Down Expand Up @@ -207,6 +211,8 @@ You can use `stream.withCurrentValueOf(signal).map((lastStreamEvent, signalCurre

If you don't need lastStreamEvent, use `stream.sample(signal).map(signalCurrentValue => ???)` instead. Note: both of these output streams will emit only when `stream` emits, as documented in the code. If you want updates from signal to also trigger an event, look into the `combineWith` operator.

`withCurrentValueOf` and `sample` operators are also available on signals, not just streams.

If you want to get a Signal's current value without the complications of sampling, or even if you just want to make sure that a Signal is started, just call `observe` on it. That will add a noop observer to the signal, and return a `SignalViewer` instance which being a `StrictSignal`, does expose `now()` and `tryNow()` methods that safely provide you with its current value.


Expand Down Expand Up @@ -434,6 +440,22 @@ The underlying `PeriodicEventStream` class offers more functionality, including
A stream that never emits any events.


#### `EventStream.withCallback` and `withObserver`

`EventStream.withCallback[A]` Creates and returns a stream and an `A => Unit` callback that, when called, passes the input value to that stream. Of course, as streams are lazy, the stream will only emit if it has observers.

```scala
val (stream, callback) = EventStream.withCallback[Int]
callback(1) // nothing happens because stream has no observers
stream.foreach(println)
callback(2) // `2` will be printed
```

`EventStream.withJsCallback[A]` works similarly except it returns a js.Function for easier integration with Javascript libraries.

`EventStream.withObserver[A]` works similarly but creates an observer, which among other conveniences passes the errors that it receives into the stream.


#### EventBus

`new EventBus[MyEvent]` is the general-purpose way to create a stream on which you can manually trigger events. The resulting EventBus exposes two properties:
Expand Down Expand Up @@ -496,7 +518,26 @@ Creating a Var is straightforward: `Var(initialValue)`, `Var.fromTry(tryValue)`.

You can update a Var using one of its methods: `set(value)`, `setTry(Try(value))`, `update(currentValue => nextValue)`, `tryUpdate(currentValueTry => Try(nextValue))`. Note that `update` will throw if the Var's current value is an error (thus `tryUpdate`).

Every Var also provides an Observer (`.writer`) that you can use where an Observer is expected, or if you want to provide your code with write-only access to a Var.
##### Observers Feeding into Var

Every Var provides a `writer` which is an Observer that writes input values into the Var. It may be useful to provide your code with write-only access to a Var, or to a subset of the data in the Var by means of the Observer's `contramap` method.

In addition to `writer`, Var also offers `updater`s, making it easy to create an Observer that updates the Var based on both the Observer's input value and the Var's current value:

```scala
val v = Var(List(1, 2, 3))
val adder = v.updater[Int]((currValue, nextInput) => currValue :+ nextInput)

adder.onNext(4)
v.now() // List(1, 2, 3, 4)

val inputStream: EventStream[Int] = ???

inputStream.foreach(adder)
inputStream --> adder // Laminar syntax
```

`updater` will fail to update if the Var is in a failed state, for those cases we have `tryUpdater`.

##### Reading Values from a Var

Expand Down Expand Up @@ -542,12 +583,70 @@ Remember that this atomicity guarantee only applies to failures which would have
Val is useful when a component wants to accept either a Signal or a constant value as input. You can just wrap your constant in a Val, and make the component accept a `Signal` (or a `StrictSignal`) instead.



#### Ajax

Airstream now has a built-in way to perform Ajax requests:

```scala
AjaxEventStream
.get("/api/kittens") // EventStream[dom.XMLHttpRequest]
.map(req => req.responseText) // EventStream[String]
```

Methods for POST, PUT, PATCH, and DELETE are also available.

The request is made every time the stream is started. If the stream is stopped while the request is pending, the old request will not be cancelled, but its result will be discarded.

If the request times out, is aborted, returns an HTTP status code that isn't 2xx or 304, or fails in any other way, the stream will emit an `AjaxStreamError`.

If you want a stream that never fails, a stream that emits an event regardless of all those errors, call `.completeEvents` on your ajax stream.

You can listen for `progress` or `readyStateChange` events by passing in the corresponding observers to `AjaxEventStream.get` et al, for example:

```scala
val (progressObserver, $progress) = EventStream.withObserver[(dom.XMLHttpRequest, dom.ProgressEvent)]

val $request = AjaxEventStream.get(
url = "/api/kittens",
progressObserver = progressObserver
)

val $bytesLoaded = $progress.map2((xhr, ev) => ev.loaded)
```

In a similar manner, you can pass a `requestObserver` that will be called with the newly created `dom.XMLHttpRequest` just before the request is sent. This way you can save the pending request into a Var and e.g. `abort()` it if needed.

Warning: dom.XmlHttpRequest is an ugly, imperative JS construct. We set event callbacks for onload, onerror, onabort, ontimeout, and if requested, also for onprogress and onreadystatechange. Make sure you don't override Airstream's listeners, or this stream will not work properly.


### Websockets

TODO.

### DOM Events

```scala
val element: dom.Element = ???
DomEventStream[dom.MouseEvent](element, "click") // EventStream[dom.MouseEvent]
```

This stream, when started, registers a `click` event listener on `element`, and emits all events the listener receives until the stream is stopped, at which point the listener is removed.

Airstream does not know the names & types of DOM events, so you need to manually specify both. You can get those manually from MDN or programmatically from event props such as `onClick` available in Laminar.

`DomEventStream` works not just on elements but on any `dom.raw.EventTarget`. However, make sure to check browser compatibility for fancy EventTarget-s such as XMLHttpRequest.



#### Custom Observables

EventBus is a very generic solution that should suit most needs, even if perhaps not very elegantly sometimes.

You can create your own observables that emit events in their own unique way by wrapping or extending EventBus (easier) or extending Observable (more work and knowledge required, but rewarded with better behavior)).

If extending Observable, you will need to make the `topoRank` field public to be able to override it. See [#37](https://github.com/raquo/Airstream/issues/37).

Unfortunately I don't have enough time to describe how to create custom observables in detail right now. You will need to read the rest of the documentation and the source code – you will see how other observables such as MapEventStream or FilterEventStream are implemented. Airstream's source code should be easy to comprehend. It is clean, small (a bit more than 1K LoC with all the operators), and does not use complicated implicits or hardcore functional stuff.


Expand Down Expand Up @@ -1114,7 +1213,7 @@ stream.recoverToTry.collect { case Failure(err) => err } // EventStream[Throwabl
## Limitations

* Airstream only runs on Scala.js because its primary intended use case is unidirectional dataflow architecture on the frontend. I have no plans to make it run on the JVM. It would require too much of my time and too much compromise, complicating the API to support a completely different environment and use cases.
* Airstream has no concept of observables "completing". Personally I don't think this is a limitation, but I can see it being viewed as such. See [Issue #23](https://github.com/raquo/Airstream/issues/23).
* Airstream has no concept of observables "completing". Personally I don't think this is much of a limitation, but I can see it being viewed as such. See [Issue #23](https://github.com/raquo/Airstream/issues/23).


## My Related Projects
Expand Down
18 changes: 12 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,18 @@ libraryDependencies ++= Seq(
"org.scalatest" %%% "scalatest" % "3.2.0" % Test
)

scalacOptions ++= Seq(
// "-deprecation",
"-feature",
"-language:higherKinds",
"-language:implicitConversions"
)
val filterScalacOptions = { options: Seq[String] =>
options.filterNot(Set(
"-Ywarn-value-discard",
"-Wvalue-discard"
))
}

scalaVersion := "2.13.4"

crossScalaVersions := Seq("2.12.12", "2.13.4")

scalacOptions ~= filterScalacOptions

// @TODO[Build] Why does this need " in (Compile, doc)" while other options don't?
scalacOptions in (Compile, doc) ++= Seq(
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version = 1.3.4
sbt.version = 1.4.6
2 changes: 2 additions & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.1")
addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.8")

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.8.1")

addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.16")
4 changes: 0 additions & 4 deletions release.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ normalizedName := "airstream"

organization := "com.raquo"

scalaVersion := "2.13.3"

crossScalaVersions := Seq("2.12.11", "2.13.3")

homepage := Some(url("https://github.com/raquo/Airstream"))

licenses += ("MIT", url("https://github.com/raquo/Airstream/blob/master/LICENSE.md"))
Expand Down
3 changes: 3 additions & 0 deletions src/main/scala-2.12/scala/annotation/unused.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package scala.annotation

final class unused extends deprecated("unused", "unused")
7 changes: 4 additions & 3 deletions src/main/scala/com/raquo/airstream/core/Observable.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.raquo.airstream.core

import com.raquo.airstream.eventstream.EventStream
import com.raquo.airstream.features.{FlattenStrategy, Splittable}
import com.raquo.airstream.features.FlattenStrategy
import com.raquo.airstream.features.FlattenStrategy.{SwitchSignalStrategy, SwitchStreamStrategy}
import com.raquo.airstream.ownership.{Owner, Subscription}
import com.raquo.airstream.signal.Signal
import org.scalajs.dom

import scala.annotation.unused
import scala.scalajs.js
import scala.util.Try

Expand Down Expand Up @@ -90,7 +91,7 @@ trait Observable[+A] {
def debugLog(prefix: String = "event", when: A => Boolean = _ => true): Self[A] = {
map(value => {
if (when(value)) {
println(prefix + ": ", value.asInstanceOf[js.Any])
println(prefix + ": " + value.asInstanceOf[js.Any])
}
value
})
Expand Down Expand Up @@ -150,7 +151,7 @@ trait Observable[+A] {
subscription
}

@inline protected def onAddedExternalObserver(observer: Observer[A]): Unit = ()
@inline protected def onAddedExternalObserver(@unused observer: Observer[A]): Unit = ()

/** Child observable should call this method on its parents when it is started.
* This observable calls [[onStart]] if this action has given it its first observer (internal or external).
Expand Down
11 changes: 9 additions & 2 deletions src/main/scala/com/raquo/airstream/core/Observer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package com.raquo.airstream.core

import com.raquo.airstream.core.AirstreamError.{ObserverError, ObserverErrorHandlingError}

import scala.scalajs.js
import scala.util.{Failure, Success, Try}

trait Observer[-A] {

lazy val toJsFn1: js.Function1[A, Unit] = onNext

/** Note: must not throw! */
def onNext(nextValue: A): Unit

Expand All @@ -31,6 +34,11 @@ trait Observer[-A] {
)
}

/** Available only on Observers of Option, this is a shortcut for contramap[B](Some(_)) */
def contramapSome[V](implicit evidence: Option[V] <:< A): Observer[V] = {
contramap[V](value => evidence(Some(value)))
}

/** Like `contramap` but with `collect` semantics: not calling the original observer when `pf` is not defined */
def contracollect[B](pf: PartialFunction[B, A]): Observer[B] = {
Observer.withRecover(
Expand All @@ -53,7 +61,6 @@ trait Observer[-A] {

object Observer {


/** An observer that does nothing. Use it to ensure that an Observable is started
*
* Used by SignalView and EventStreamView
Expand Down Expand Up @@ -126,7 +133,7 @@ object Observer {
if (onTryParam.isDefinedAt(nextValue)) {
onTryParam(nextValue)
} else {
nextValue.fold(err => AirstreamError.sendUnhandledError(err), identity)
nextValue.fold(err => AirstreamError.sendUnhandledError(err), _ => ())
}
} catch {
case err: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.raquo.airstream.custom

import com.raquo.airstream.custom.CustomSource._
import com.raquo.airstream.signal.Signal

import scala.util.{Success, Try}

// @TODO[Test] needs testing

/** Use this to easily create a custom signal from an external source
*
* See docs on custom sources, and [[CustomSource.Config]]
*/
class CustomSignalSource[A] (
getInitialValue: => Try[A],
makeConfig: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => CustomSource.Config,
) extends Signal[A] with CustomSource[A] {

override protected[this] def initialValue: Try[A] = getInitialValue

override protected[this] val config: Config = makeConfig(_fireTry, tryNow, getStartIndex, getIsStarted)
}

object CustomSignalSource {

def apply[A](
initial: => A
)(
config: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => Config
): Signal[A] = {
new CustomSignalSource[A](Success(initial), config)
}

def fromTry[A](
initial: => Try[A]
)(
config: (SetCurrentValue[A], GetCurrentValue[A], GetStartIndex, GetIsStarted) => Config
): Signal[A] = {
new CustomSignalSource[A](initial, config)
}
}
Loading