-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathObservableRetry.java
87 lines (77 loc) · 2.06 KB
/
ObservableRetry.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package java_rxjava134;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
/**
* Observable retry
*
* when 'retry' is invoked:
* if Obserable encounter an Error
* something will be done repeatly
*
* @author ÌÆÁú
*
*/
public class ObservableRetry {
@SuppressWarnings("deprecation")
public static void main(String[] args) {
// retry
Commons.subscribePrint(Observable.create(new ErrorEmitter()).retry(), "Retry");
// retryWhen
testRetryWhen();
}
/** test for retryWhen */
private static void testRetryWhen() {
@SuppressWarnings("deprecation")
Observable<Integer> when = Observable.create(new ErrorEmitter())
.retryWhen(attempts -> {
return attempts.flatMap(error -> {
if (error instanceof FException) {
System.err.println("Delaying...");
return Observable.timer(100L, TimeUnit.MICROSECONDS);
}
return Observable.error(error);
});
})
.retry((attempts, error) -> {
return (error instanceof SException) && attempts < 0;
});
Commons.subscribePrint(when, "retryWhen");
}
}
/**
 * First demo exception: an unchecked exception with a fixed message and no cause.
 */
class FException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /** Message carried by every instance. */
    private static final String MESSAGE = "FirstException!";

    /** Creates the exception with its fixed message. */
    public FException() {
        super(MESSAGE);
    }
}
/**
 * Second demo exception: an unchecked exception with a fixed message and no cause.
 */
class SException extends RuntimeException {

    private static final long serialVersionUID = 1L;

    /** Message carried by every instance. */
    private static final String MESSAGE = "Second Exception!";

    /** Creates the exception with its fixed message. */
    public SException() {
        super(MESSAGE);
    }
}
/**
 * Subscription logic that fails a fixed number of times before it succeeds.
 *
 * <p>Every subscription first emits {@code -1}. While failures remain, the subscription then
 * ends with an error: an {@link FException} for all but the last remaining failure, and an
 * {@link SException} for the last one. Once no failures remain, it emits {@code 1} and
 * completes. The remaining-failure count lives in the instance, so it carries over between
 * subscriptions to the same emitter.
 */
class ErrorEmitter implements OnSubscribe<Integer> {

    /** Number of subscriptions that will still end with an error. */
    private int remainingFailures = 3;

    /**
     * Emits to the given subscriber as described on the class.
     *
     * @param subscriber the subscriber receiving the items and the terminal event
     */
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        // Runs once per subscription, so a retry operator re-executes this method.
        subscriber.onNext(-1);

        if (remainingFailures <= 0) {
            // All failures used up: finish normally.
            subscriber.onNext(1);
            subscriber.onCompleted();
            return;
        }

        // The last remaining failure is reported as SException, earlier ones as FException.
        final boolean lastFailure = remainingFailures == 1;
        remainingFailures--;
        if (lastFailure) {
            subscriber.onError(new SException());
        } else {
            subscriber.onError(new FException());
        }
    }
}