From 1b7b6f0e3c903b3614f131613a541910728dee17 Mon Sep 17 00:00:00 2001 From: Ihar Abramovich Date: Wed, 19 Jul 2017 13:15:52 +0300 Subject: [PATCH] Add filterSome and filterNone extensions for Flowable, Maybe and Single. (#12) --- .../com/gojuno/koptional/rxjava2/rxjava2.kt | 90 +++++ .../rxjava2/RxJava2ExtensionsSpec.kt | 331 ++++++++++++++++-- 2 files changed, 394 insertions(+), 27 deletions(-) diff --git a/koptional-rxjava2-extensions/src/main/kotlin/com/gojuno/koptional/rxjava2/rxjava2.kt b/koptional-rxjava2-extensions/src/main/kotlin/com/gojuno/koptional/rxjava2/rxjava2.kt index 8eb7c4d..bf27fb9 100644 --- a/koptional-rxjava2-extensions/src/main/kotlin/com/gojuno/koptional/rxjava2/rxjava2.kt +++ b/koptional-rxjava2-extensions/src/main/kotlin/com/gojuno/koptional/rxjava2/rxjava2.kt @@ -3,9 +3,15 @@ package com.gojuno.koptional.rxjava2 import com.gojuno.koptional.None import com.gojuno.koptional.Optional import com.gojuno.koptional.Some +import io.reactivex.Flowable +import io.reactivex.Maybe import io.reactivex.Observable +import io.reactivex.Single private inline fun Observable<*>.ofType(): Observable = ofType(R::class.java) +private inline fun Flowable<*>.ofType(): Flowable = ofType(R::class.java) +private inline fun Maybe<*>.ofType(): Maybe = ofType(R::class.java) +private inline fun Single<*>.ofType(): Maybe = filter { it is R }.cast(R::class.java) /** * Filters items emitted by an ObservableSource by only emitting those that are `Some`. @@ -21,6 +27,48 @@ private inline fun Observable<*>.ofType(): Observable = ofT */ fun Observable>.filterSome(): Observable = ofType>().map { it.value } +/** + * Filters items emitted by a Publisher by only emitting those that are `Some`. + *

+ * + *

+ *
Scheduler:
+ *
{@code filter} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Flowable that emits `Some.value` of those items emitted by the source Publisher that are `Some`. + * @see ReactiveX operators documentation: Filter + */ +fun Flowable>.filterSome(): Flowable = ofType>().map { it.value } + +/** + * Filters items emitted by an MaybeSource by only emitting those that are `Some`. + *

+ * + *

+ *
Scheduler:
+ *
{@code filter} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Maybe that emits `Some.value` of those items emitted by the source MaybeSource that are `Some`. + * @see ReactiveX operators documentation: Filter + */ +fun Maybe>.filterSome(): Maybe = ofType>().map { it.value } + +/** + * Filters items emitted by an SingleSource by only emitting those that are `Some`. + *

+ * + *

+ *
Scheduler:
+ *
{@code filter} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Maybe that emits `Some.value` of those items emitted by the source SingleSource that are `Some`. + * @see ReactiveX operators documentation: Filter + */ +fun Single>.filterSome(): Maybe = ofType>().map { it.value } + /** * Filters items emitted by an ObservableSource by only emitting those that are `None`. *

@@ -34,3 +82,45 @@ fun Observable>.filterSome(): Observable = ofTypeReactiveX operators documentation: Filter */ fun Observable>.filterNone(): Observable = ofType().map { Unit } + +/** + * Filters items emitted by an Publisher by only emitting those that are `None`. + *

+ * + *

+ *
Scheduler:
+ *
{@code filter} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Flowable that emits `Unit` for each item emitted by the source Publisher that is `None`. + * @see ReactiveX operators documentation: Filter + */ +fun Flowable>.filterNone(): Flowable = ofType().map { Unit } + +/** + * Filters items emitted by an MaybeSource by only emitting those that are `None`. + *

+ * + *

+ *
Scheduler:
+ *
{@code filter} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Maybe that emits `Unit` for each item emitted by the source MaybeSource that is `None`. + * @see ReactiveX operators documentation: Filter + */ +fun Maybe>.filterNone(): Maybe = ofType().map { Unit } + +/** + * Filters items emitted by an SingleSource by only emitting those that are `None`. + *

+ * + *

+ *
Scheduler:
+ *
{@code filter} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return an Maybe that emits `Unit` for each item emitted by the source SingleSource that is `None`. + * @see ReactiveX operators documentation: Filter + */ +fun Single>.filterNone(): Maybe = ofType().map { Unit } \ No newline at end of file diff --git a/koptional-rxjava2-extensions/src/test/kotlin/com/gojuno/koptional/rxjava2/RxJava2ExtensionsSpec.kt b/koptional-rxjava2-extensions/src/test/kotlin/com/gojuno/koptional/rxjava2/RxJava2ExtensionsSpec.kt index 706d1c1..8f46dee 100644 --- a/koptional-rxjava2-extensions/src/test/kotlin/com/gojuno/koptional/rxjava2/RxJava2ExtensionsSpec.kt +++ b/koptional-rxjava2-extensions/src/test/kotlin/com/gojuno/koptional/rxjava2/RxJava2ExtensionsSpec.kt @@ -1,62 +1,339 @@ package com.gojuno.koptional.rxjava2 import com.gojuno.koptional.None +import com.gojuno.koptional.Optional import com.gojuno.koptional.Some +import io.reactivex.Flowable +import io.reactivex.Maybe import io.reactivex.Observable +import io.reactivex.Single import io.reactivex.observers.TestObserver +import io.reactivex.subscribers.TestSubscriber import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.context import org.jetbrains.spek.api.dsl.it class RxJava2ExtensionsSpec : Spek({ - val stream by memoized { - Observable.just(Some("a"), None, Some("b"), Some("c"), None) - } + context("Observable") { + + val stream by memoized { + Observable.just(Some("a"), None, Some("b"), Some("c"), None) + } + + context("filterSome") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterSome() + .subscribe(this) + } + } - context("filterSome") { + it("passes only Some values") { + subscriber.assertValues("a", "b", "c") + } + + it("completes stream") { + subscriber.assertComplete() + } - val subscriber by memoized { - TestObserver().apply { - stream - .filterSome() - .subscribe(this) + it("does not emit error") { + subscriber.assertNoErrors() } } - it("passes only Some values") { - subscriber.assertValues("a", "b", "c") + context("filterNone") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterNone() + .subscribe(this) + } + } + + it("passes only None values as Unit") { + subscriber.assertValues(Unit, Unit) + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } } + } + + context("Flowable") { + + val stream by memoized { + Flowable.just(Some("a"), None, Some("b"), Some("c"), None) + } + + context("filterSome") { + + val subscriber by memoized { + TestSubscriber().apply { + stream + .filterSome() + .subscribe(this) + } + } + + it("passes only Some values") { + subscriber.assertValues("a", "b", "c") + } - it("completes stream") { - subscriber.assertComplete() + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } } - it("does not emit error") { - subscriber.assertNoErrors() + context("filterNone") { + + val subscriber by memoized { + TestSubscriber().apply { + stream + .filterNone() + .subscribe(this) + } + } + + it("passes only None values as Unit") { + subscriber.assertValues(Unit, Unit) + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } } } - context("filterNone") { + context("Maybe") { + + context("Stream with Some") { + val stream by memoized { + Maybe.just(Some("a") as Optional) + } + + context("filterSome") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterSome() + .subscribe(this) + } + } + + it("passes only Some values") { + subscriber.assertValues("a") + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } + } + + context("filterNone") { - val subscriber by memoized { - TestObserver().apply { - stream - .filterNone() - .subscribe(this) + val subscriber by memoized { + TestObserver().apply { + stream + .filterNone() + .subscribe(this) + } + } + + it("do not emit values") { + subscriber.assertNoValues() + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } } + } - it("passes only None values as Unit") { - subscriber.assertValues(Unit, Unit) + context("Stream with None") { + val stream by memoized { + Maybe.just(None as Optional) + } + + context("filterSome") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterSome() + .subscribe(this) + } + } + + it("do not emit values") { + subscriber.assertNoValues() + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } + } + + context("filterNone") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterNone() + .subscribe(this) + } + } + + it("passes only None value as Unit") { + subscriber.assertValue(Unit) + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } + } } + } + + context("Single") { + + context("Stream with Some") { + val stream by memoized { + Single.just(Some("a") as Optional) + } + + context("filterSome") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterSome() + .subscribe(this) + } + } + + it("passes only Some values") { + subscriber.assertValues("a") + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } + } + + context("filterNone") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterNone() + .subscribe(this) + } + } + + it("do not emit values") { + subscriber.assertNoValues() + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } + } - it("completes stream") { - subscriber.assertComplete() } - it("does not emit error") { - subscriber.assertNoErrors() + context("Stream with None") { + val stream by memoized { + Single.just(None as Optional) + } + + context("filterSome") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterSome() + .subscribe(this) + } + } + + it("do not emit values") { + subscriber.assertNoValues() + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } + } + + context("filterNone") { + + val subscriber by memoized { + TestObserver().apply { + stream + .filterNone() + .subscribe(this) + } + } + + it("passes only None value as Unit") { + subscriber.assertValue(Unit) + } + + it("completes stream") { + subscriber.assertComplete() + } + + it("does not emit error") { + subscriber.assertNoErrors() + } + } } } })