Skip to content

Commit

Permalink
introduce reduce with inout accumulator parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
kjaklinovic committed Jul 13, 2024
1 parent 6491a16 commit 4a387e2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 13 deletions.
58 changes: 54 additions & 4 deletions RxSwift/Observables/Reduce.swift
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,40 @@


extension ObservableType {

/**
Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value.
For aggregation behavior with incremental intermediate results, see `scan`.
- seealso: [reduce operator on reactivex.io](http://reactivex.io/documentation/operators/reduce.html)
- parameter seed: The initial accumulator value.
- parameter accumulator: A accumulator function to be invoked on each element.
- parameter mapResult: A function to transform the final accumulator value into the result value.
- returns: An observable sequence containing a single element with the final accumulator value.
*/
public func reduce<A, Result>(into seed: A, accumulator: @escaping (inout A, Element) throws -> Void, mapResult: @escaping (A) throws -> Result)
-> Observable<Result> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: mapResult)
}

/**
Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value.
For aggregation behavior with incremental intermediate results, see `scan`.
- seealso: [reduce operator on reactivex.io](http://reactivex.io/documentation/operators/reduce.html)
- parameter seed: The initial accumulator value.
- parameter accumulator: A accumulator function to be invoked on each element.
- returns: An observable sequence containing a single element with the final accumulator value.
*/
public func reduce<A>(into seed: A, accumulator: @escaping (inout A, Element) throws -> Void)
-> Observable<A> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: { $0 })
}

/**
Applies an `accumulator` function over an observable sequence, returning the result of the aggregation as a single element in the result sequence. The specified `seed` value is used as the initial accumulator value.
Expand All @@ -22,7 +56,15 @@ extension ObservableType {
*/
public func reduce<A, Result>(_ seed: A, accumulator: @escaping (A, Element) throws -> A, mapResult: @escaping (A) throws -> Result)
-> Observable<Result> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: mapResult)
Reduce(
source: self.asObservable(),
seed: seed,
accumulator: { acc, element in
let currentAcc = acc
acc = try accumulator(currentAcc, element)
},
mapResult: mapResult
)
}

/**
Expand All @@ -38,7 +80,15 @@ extension ObservableType {
*/
public func reduce<A>(_ seed: A, accumulator: @escaping (A, Element) throws -> A)
-> Observable<A> {
Reduce(source: self.asObservable(), seed: seed, accumulator: accumulator, mapResult: { $0 })
Reduce(
source: self.asObservable(),
seed: seed,
accumulator: { acc, element in
let currenctAcc = acc
acc = try accumulator(currenctAcc, element)
},
mapResult: { $0 }
)
}
}

Expand All @@ -60,7 +110,7 @@ final private class ReduceSink<SourceType, AccumulateType, Observer: ObserverTyp
switch event {
case .next(let value):
do {
self.accumulation = try self.parent.accumulator(self.accumulation, value)
try self.parent.accumulator(&self.accumulation, value)
}
catch let e {
self.forwardOn(.error(e))
Expand All @@ -85,7 +135,7 @@ final private class ReduceSink<SourceType, AccumulateType, Observer: ObserverTyp
}

final private class Reduce<SourceType, AccumulateType, ResultType>: Producer<ResultType> {
typealias AccumulatorType = (AccumulateType, SourceType) throws -> AccumulateType
typealias AccumulatorType = (inout AccumulateType, SourceType) throws -> Void
typealias ResultSelectorType = (AccumulateType) throws -> ResultType

private let source: Observable<SourceType>
Expand Down
17 changes: 8 additions & 9 deletions Tests/RxSwiftTests/Observable+ReduceTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ extension ObservableReduceTest {
.completed(250)
])


let res = scheduler.start { xs.reduce(42, accumulator: +) }

let correctMessages = Recorded.events(
Expand All @@ -47,7 +46,7 @@ extension ObservableReduceTest {
.completed(250)
])

let res = scheduler.start { xs.reduce(42, accumulator: +) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=) }

let correctMessages = Recorded.events(
.next(250, 42 + 24),
Expand Down Expand Up @@ -91,7 +90,7 @@ extension ObservableReduceTest {
.next(150, 1),
])

let res = scheduler.start { xs.reduce(42, accumulator: +) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=) }

let correctMessages: [Recorded<Event<Int>>] = [
]
Expand Down Expand Up @@ -146,9 +145,9 @@ extension ObservableReduceTest {
])

let res = scheduler.start {
xs.reduce(42) { (a: Int, x: Int) throws -> Int in
xs.reduce(into: 42) { (a: inout Int, x: Int) throws -> Void in
if x < 3 {
return a + x
a += x
}
else {
throw testError
Expand Down Expand Up @@ -200,7 +199,7 @@ extension ObservableReduceTest {
.completed(250)
])

let res = scheduler.start { xs.reduce(42, accumulator: +, mapResult: { $0 * 5 }) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=, mapResult: { $0 * 5 }) }

let correctMessages = Recorded.events(
.next(250, (42 + 24) * 5),
Expand Down Expand Up @@ -244,7 +243,7 @@ extension ObservableReduceTest {
.next(150, 1),
])

let res = scheduler.start { xs.reduce(42, accumulator: +, mapResult: { $0 * 5 }) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: +=, mapResult: { $0 * 5 }) }

let correctMessages: [Recorded<Event<Int>>] = [
]
Expand Down Expand Up @@ -298,7 +297,7 @@ extension ObservableReduceTest {
.completed(260)
])

let res = scheduler.start { xs.reduce(42, accumulator: { a, x in if x < 3 { return a + x } else { throw testError } }, mapResult: { $0 * 5 }) }
let res = scheduler.start { xs.reduce(into: 42, accumulator: { a, x in if x < 3 { a += x } else { throw testError } }, mapResult: { $0 * 5 }) }

let correctMessages = [
Recorded.error(240, testError, Int.self)
Expand Down Expand Up @@ -345,7 +344,7 @@ extension ObservableReduceTest {
}

func testReduceReleasesResourcesOnError() {
_ = Observable<Int>.just(1).reduce(0, accumulator: +).subscribe()
_ = Observable<Int>.just(1).reduce(into: 0, accumulator: +=).subscribe()
}
#endif
}

0 comments on commit 4a387e2

Please sign in to comment.