-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathReactiveSum3.java
134 lines (121 loc) · 3.2 KB
/
ReactiveSum3.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package java_rxjava134;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.functions.Func1;
import rx.observables.ConnectableObservable;
import rx.subjects.BehaviorSubject;
/**
* Reactive sum
*
* @author ÌÆÁú
*
*/
public class ReactiveSum3 {
public static void main(String[] args) {
ConnectableObservable<String> input = from(System.in);
Observable<Double> a = varStream("a", input);
Observable<Double> b = varStream("b", input);
ReactiveDoSum3 sum = new ReactiveDoSum3(a, b);
System.out.println(sum.getC());
input.connect();
}
/**
* from1
* @param stream
* @return
*/
static ConnectableObservable<String> from(final InputStream stream) {
return from(new BufferedReader(new InputStreamReader(stream)));
}
/**
* from2
* @param reader
* @return
*/
@SuppressWarnings("deprecation")
static ConnectableObservable<String> from(final BufferedReader reader) {
return Observable.create(new OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
if (subscriber.isUnsubscribed()) {
return;
}
try {
String line;
while(!subscriber.isUnsubscribed() && (line = reader.readLine()) != null) {
if (line == null || line.equals("exit")) {
break;
}
subscriber.onNext(line);
}
} catch (IOException e) {
subscriber.onError(e);
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
}).publish();
}
/**
* varStream
* @param varName
* @param input
* @return
*/
public static Observable<Double> varStream(final String varName, Observable<String> input) {
final Pattern pattern = Pattern.compile("\\^s*" + varName + "\\s*[:|=]\\s*(-?\\d+\\.?\\d*)$");
return input
.map(new Func1<String, Matcher>() {
@Override
public Matcher call(String str) {
return pattern.matcher(str);
}
})
.filter(new Func1<Matcher, Boolean>() {
@Override
public Boolean call(Matcher matcher) {
return matcher.matches() && matcher.group(1) != null;
}
})
.map(new Func1<Matcher, Double>() {
@Override
public Double call(Matcher matcher) {
return Double.parseDouble(matcher.group(1));
}
});
}
}
/** ReactiveDoSum class */
final class ReactiveDoSum3 implements Observer<Double> {
private BehaviorSubject<Double> c = BehaviorSubject.create(0.0);
public ReactiveDoSum3(Observable<Double> a, Observable<Double> b) {
Observable.combineLatest(a, b, (x, y) -> x + y).subscribe(c);
}
public double getC() {
return c.getValue();
}
public Observable<Double> obsC() {
return c.asObservable();
}
@Override
public void onCompleted() {
System.out.println("Completed...");
}
@Override
public void onError(Throwable arg0) {
System.out.println("Error...");
}
@Override
public void onNext(Double arg0) {
System.out.println("Next...");
}
}