From b060a1d940a3523aac415a4f06e35324edf3f05d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 12 Dec 2016 15:13:37 +0100 Subject: [PATCH 1/3] Reimplement split() with backpressure --- build.gradle | 2 +- .../java/rx/observables/ObservableSplit.java | 336 ++++++++++++++++++ .../java/rx/observables/StringObservable.java | 60 +--- .../rx/observables/ObservableSplitTest.java | 116 ++++++ 4 files changed, 455 insertions(+), 59 deletions(-) create mode 100644 src/main/java/rx/observables/ObservableSplit.java create mode 100644 src/test/java/rx/observables/ObservableSplitTest.java diff --git a/build.gradle b/build.gradle index f4f4a93..08b7635 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ apply plugin: 'rxjava-project' apply plugin: 'java' dependencies { - compile 'io.reactivex:rxjava:1.1.1' + compile 'io.reactivex:rxjava:1.2.3' testCompile 'junit:junit-dep:4.10' testCompile 'org.mockito:mockito-core:1.8.5' } diff --git a/src/main/java/rx/observables/ObservableSplit.java b/src/main/java/rx/observables/ObservableSplit.java new file mode 100644 index 0000000..3fea720 --- /dev/null +++ b/src/main/java/rx/observables/ObservableSplit.java @@ -0,0 +1,336 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.observables; + +import java.util.Queue; +import java.util.concurrent.atomic.*; +import java.util.regex.Pattern; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.exceptions.Exceptions; +import rx.internal.operators.BackpressureUtils; +import rx.internal.util.atomic.SpscAtomicArrayQueue; +import rx.internal.util.unsafe.*; + +/** + * Split a sequence of strings based on a Rexexp pattern spanning subsequent + * items if necessary. + */ +final class ObservableSplit implements OnSubscribe { + + final Observable source; + + final Pattern pattern; + + final int bufferSize; + + ObservableSplit(Observable source, Pattern pattern, int bufferSize) { + this.source = source; + this.pattern = pattern; + this.bufferSize = bufferSize; + } + + @Override + public void call(Subscriber t) { + SplitSubscriber parent = new SplitSubscriber(t, pattern, bufferSize); + t.add(parent.requested); + t.setProducer(parent.requested); + + source.unsafeSubscribe(parent); + } + + static final class SplitSubscriber extends Subscriber { + + final Subscriber actual; + + final Pattern pattern; + + final Requested requested; + + final AtomicInteger wip; + + final int limit; + + final Queue queue; + + String[] current; + + String leftOver; + + int index; + + int produced; + + int empty; + + volatile boolean done; + Throwable error; + + volatile boolean cancelled; + + SplitSubscriber(Subscriber actual, Pattern pattern, int bufferSize) { + this.actual = actual; + this.pattern = pattern; + this.limit = bufferSize - (bufferSize >> 2); + this.requested = new Requested(); + this.wip = new AtomicInteger(); + if (UnsafeAccess.isUnsafeAvailable()) { + this.queue = new SpscArrayQueue(bufferSize); + } else { + this.queue = new SpscAtomicArrayQueue(bufferSize); + } + request(bufferSize); + } + + @Override + public void onNext(String t) { + String lo = leftOver; + String[] a; + try { + if (lo == null || lo.isEmpty()) { + a = pattern.split(t, -1); + } else { + a = pattern.split(lo + t, -1); + } + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + unsubscribe(); + onError(ex); + return; + } + + if (a.length == 0) { + leftOver = null; + request(1); + return; + } else + if (a.length == 1) { + leftOver = a[0]; + request(1); + return; + } + leftOver = a[a.length - 1]; + queue.offer(a); + drain(); + } + + @Override + public void onError(Throwable e) { + if (done) { +// RxJavaHooks.onError(e); RxJava 1.2+ required + return; + } + String lo = leftOver; + if (lo != null && !lo.isEmpty()) { + leftOver = null; + queue.offer(new String[] { lo, null }); + } + error = e; + done = true; + drain(); + } + + @Override + public void onCompleted() { + if (!done) { + done = true; + String lo = leftOver; + if (lo != null && !lo.isEmpty()) { + leftOver = null; + queue.offer(new String[] { lo, null }); + } + drain(); + } + } + + void drain() { + if (wip.getAndIncrement() != 0) { + return; + } + + Queue q = queue; + + int missed = 1; + int consumed = produced; + String[] array = current; + int idx = index; + int emptyCount = empty; + + Subscriber a = actual; + + for (;;) { + long r = requested.get(); + long e = 0; + + while (e != r) { + if (cancelled) { + current = null; + q.clear(); + return; + } + + boolean d = done; + + if (array == null) { + array = q.poll(); + if (array != null) { + current = array; + if (++consumed == limit) { + consumed = 0; + request(limit); + } + } + } + + boolean empty = array == null; + + if (d && empty) { + current = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return; + } + + if (empty) { + break; + } + + if (array.length == idx + 1) { + array = null; + current = null; + idx = 0; + continue; + } + + String v = array[idx]; + + if (v.isEmpty()) { + emptyCount++; + idx++; + } else { + while (emptyCount != 0 && e != r) { + if (cancelled) { + current = null; + q.clear(); + return; + } + a.onNext(""); + e++; + emptyCount--; + } + + if (e != r && emptyCount == 0) { + a.onNext(v); + + e++; + idx++; + } + } + } + + if (e == r) { + if (cancelled) { + current = null; + q.clear(); + return; + } + + boolean d = done; + + if (array == null) { + array = q.poll(); + if (array != null) { + current = array; + if (++consumed == limit) { + consumed = 0; + request(limit); + } + } + } + + boolean empty = array == null; + + if (d && empty) { + current = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return; + } + } + + if (e != 0L) { + BackpressureUtils.produced(requested, e); + } + + empty = emptyCount; + produced = consumed; + missed = wip.addAndGet(-missed); + if (missed == 0) { + break; + } + } + } + + void cancel() { + cancelled = true; + unsubscribe(); + if (wip.getAndIncrement() == 0) { + current = null; + queue.clear(); + } + } + + boolean isCancelled() { + return isUnsubscribed(); + } + + final class Requested extends AtomicLong implements Producer, Subscription { + + private static final long serialVersionUID = 3399839515828647345L; + + @Override + public void request(long n) { + if (n > 0) { + BackpressureUtils.getAndAddRequest(this, n); + drain(); + } + else if (n < 0) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + } + + @Override + public boolean isUnsubscribed() { + return isCancelled(); + } + + @Override + public void unsubscribe() { + cancel(); + } + } + } +} diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index 0af7817..d938206 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -25,6 +25,7 @@ import rx.functions.Func2; import rx.internal.operators.OnSubscribeInputStream; import rx.internal.operators.OnSubscribeReader; +import rx.internal.util.RxRingBuffer; import java.io.Closeable; import java.io.IOException; @@ -398,64 +399,7 @@ public static Observable split(final Observable src, String rege * @return the Observable streaming the split values */ public static Observable split(final Observable src, final Pattern pattern) { - - return src.lift(new Operator() { - @Override - public Subscriber call(final Subscriber o) { - return new Subscriber(o) { - private String leftOver = null; - - @Override - public void onCompleted() { - if (leftOver!=null) - output(leftOver); - if (!o.isUnsubscribed()) - o.onCompleted(); - } - - @Override - public void onError(Throwable e) { - if (leftOver!=null) - output(leftOver); - if (!o.isUnsubscribed()) - o.onError(e); - } - - @Override - public void onNext(String segment) { - if (leftOver != null) - segment = leftOver + segment; - String[] parts = pattern.split(segment, -1); - - for (int i = 0; i < parts.length - 1; i++) { - String part = parts[i]; - output(part); - } - leftOver = parts[parts.length - 1]; - } - - private int emptyPartCount = 0; - - /** - * when limit == 0 trailing empty parts are not emitted. - * - * @param part - */ - private void output(String part) { - if (part.isEmpty()) { - emptyPartCount++; - } - else { - for (; emptyPartCount > 0; emptyPartCount--) - if (!o.isUnsubscribed()) - o.onNext(""); - if (!o.isUnsubscribed()) - o.onNext(part); - } - } - }; - } - }); + return Observable.create(new ObservableSplit(src, pattern, RxRingBuffer.SIZE)); } /** diff --git a/src/test/java/rx/observables/ObservableSplitTest.java b/src/test/java/rx/observables/ObservableSplitTest.java new file mode 100644 index 0000000..5d7aa63 --- /dev/null +++ b/src/test/java/rx/observables/ObservableSplitTest.java @@ -0,0 +1,116 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.observables; + +import java.io.IOException; + +import org.junit.Test; + +import rx.Observable; +import rx.observers.TestSubscriber; + +public class ObservableSplitTest { + + void test(String[] input, String pattern, String[] output, boolean rebatch) { + TestSubscriber ts = new TestSubscriber(); + Observable o = StringObservable.split(Observable.from(input), pattern); + if (rebatch) { + o = o.rebatchRequests(1); + } + o.subscribe(ts); + + ts.assertValues(output); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void split1() { + test(new String[] {"ab", ":cd", "e:fgh" }, ":", new String[] { "ab", "cde", "fgh" }, false); + } + + @Test + public void split1Request1() { + test(new String[] {"ab", ":cd", "e:fgh" }, ":", new String[] { "ab", "cde", "fgh" }, true); + } + + @Test + public void split2() { + test(new String[] { "abcdefgh" }, ":", new String[] { "abcdefgh" }, false); + } + + @Test + public void split2Request1() { + test(new String[] { "abcdefgh" }, ":", new String[] { "abcdefgh" }, true); + } + + @Test + public void splitEmpty() { + test(new String[] { }, ":", new String[] { }, false); + } + + @Test + public void splitError() { + TestSubscriber ts = new TestSubscriber(); + Observable o = StringObservable.split(Observable.empty().concatWith(Observable.error(new IOException())), ":"); + o.subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(IOException.class); + } + + @Test + public void splitExample1() { + test(new String[] { "boo:and:foo" }, ":", new String[] { "boo", "and", "foo" }, false); + } + + @Test + public void splitExample1Request1() { + test(new String[] { "boo:and:foo" }, ":", new String[] { "boo", "and", "foo" }, true); + } + + @Test + public void splitExample2() { + test(new String[] { "boo:and:foo" }, "o", new String[] { "b", "", ":and:f" }, false); + } + + @Test + public void splitExample2Request1() { + test(new String[] { "boo:and:foo" }, "o", new String[] { "b", "", ":and:f" }, true); + } + + @Test + public void split3() { + test(new String[] { "abqw", "ercdqw", "eref" }, "qwer", new String[] { "ab", "cd", "ef" }, false); + } + + @Test + public void split3Buffer1Request1() { + test(new String[] { "abqw", "ercdqw", "eref" }, "qwer", new String[] { "ab", "cd", "ef" }, true); + } + + @Test + public void split4() { + test(new String[] { "ab", ":", "", "", "c:d", "", "e:" }, ":", new String[] { "ab", "c", "de" }, false); + } + + @Test + public void split4Request1() { + test(new String[] { "ab", ":", "", "", "c:d", "", "e:" }, ":", new String[] { "ab", "c", "de" }, true); + } + +} From 67de5c6e41936788da6f572cb200f1cfbe114584 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 12 Dec 2016 16:09:39 +0100 Subject: [PATCH 2/3] Uncomment RxJavaHooks call --- src/main/java/rx/observables/ObservableSplit.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/observables/ObservableSplit.java b/src/main/java/rx/observables/ObservableSplit.java index 3fea720..cfbce6c 100644 --- a/src/main/java/rx/observables/ObservableSplit.java +++ b/src/main/java/rx/observables/ObservableSplit.java @@ -26,6 +26,7 @@ import rx.internal.operators.BackpressureUtils; import rx.internal.util.atomic.SpscAtomicArrayQueue; import rx.internal.util.unsafe.*; +import rx.plugins.RxJavaHooks; /** * Split a sequence of strings based on a Rexexp pattern spanning subsequent @@ -132,7 +133,7 @@ public void onNext(String t) { @Override public void onError(Throwable e) { if (done) { -// RxJavaHooks.onError(e); RxJava 1.2+ required + RxJavaHooks.onError(e); return; } String lo = leftOver; From 47e0c487afed35cfff3f68285526e22bca63d04d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 12 Dec 2016 19:07:33 +0100 Subject: [PATCH 3/3] Move ObservableSplit into internals --- .../operators}/ObservableSplit.java | 6 ++-- .../java/rx/observables/StringObservable.java | 33 +++++-------------- 2 files changed, 12 insertions(+), 27 deletions(-) rename src/main/java/rx/{observables => internal/operators}/ObservableSplit.java (98%) diff --git a/src/main/java/rx/observables/ObservableSplit.java b/src/main/java/rx/internal/operators/ObservableSplit.java similarity index 98% rename from src/main/java/rx/observables/ObservableSplit.java rename to src/main/java/rx/internal/operators/ObservableSplit.java index cfbce6c..56a47c3 100644 --- a/src/main/java/rx/observables/ObservableSplit.java +++ b/src/main/java/rx/internal/operators/ObservableSplit.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package rx.observables; +package rx.internal.operators; import java.util.Queue; import java.util.concurrent.atomic.*; @@ -32,7 +32,7 @@ * Split a sequence of strings based on a Rexexp pattern spanning subsequent * items if necessary. */ -final class ObservableSplit implements OnSubscribe { +public final class ObservableSplit implements OnSubscribe { final Observable source; @@ -40,7 +40,7 @@ final class ObservableSplit implements OnSubscribe { final int bufferSize; - ObservableSplit(Observable source, Pattern pattern, int bufferSize) { + public ObservableSplit(Observable source, Pattern pattern, int bufferSize) { this.source = source; this.pattern = pattern; this.bufferSize = bufferSize; diff --git a/src/main/java/rx/observables/StringObservable.java b/src/main/java/rx/observables/StringObservable.java index d938206..f78ed8b 100644 --- a/src/main/java/rx/observables/StringObservable.java +++ b/src/main/java/rx/observables/StringObservable.java @@ -15,34 +15,19 @@ */ package rx.observables; -import rx.Observable; -import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; -import rx.functions.Action1; -import rx.functions.Func0; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.internal.operators.OnSubscribeInputStream; -import rx.internal.operators.OnSubscribeReader; -import rx.internal.util.RxRingBuffer; - -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.nio.charset.CharsetEncoder; -import java.nio.charset.CoderResult; -import java.nio.charset.CodingErrorAction; +import java.io.*; +import java.nio.*; +import java.nio.charset.*; import java.util.Arrays; import java.util.concurrent.Callable; import java.util.regex.Pattern; +import rx.*; +import rx.Observable.Operator; +import rx.functions.*; +import rx.internal.operators.*; +import rx.internal.util.RxRingBuffer; + public class StringObservable { /** * Reads bytes from a source {@link InputStream} and outputs {@link Observable} of