From f2c78f3df15dac0e3f64903acc3aebb5f45c868c Mon Sep 17 00:00:00 2001 From: "Saai Syvendra (Github key)" Date: Thu, 24 Oct 2024 06:46:29 +0530 Subject: [PATCH 1/3] Replace use of deprecated DirectProcessor Signed-off-by: Saai Syvendra (Github key) --- .../mirror/grpc/listener/SharedTopicListener.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java index db81aa93cfd..b126b45be12 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java @@ -22,9 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.Exceptions; -import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; @RequiredArgsConstructor @@ -34,17 +33,17 @@ public abstract class SharedTopicListener implements TopicListener { protected final ListenerProperties listenerProperties; @Override - @SuppressWarnings("deprecation") public Flux listen(TopicMessageFilter filter) { - DirectProcessor overflowProcessor = DirectProcessor.create(); - FluxSink overflowSink = overflowProcessor.sink(); + Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); + Flux overflowProcessor = sink.asFlux(); // moving publishOn from after onBackpressureBuffer to after Flux.merge reduces CPU usage by up to 40% Flux topicMessageFlux = getSharedListener(filter) .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) .onBackpressureBuffer( - listenerProperties.getMaxBufferSize(), t -> overflowSink.error(Exceptions.failWithOverflow())) - .doFinally(s -> overflowSink.complete()); + listenerProperties.getMaxBufferSize(), t -> sink.tryEmitError(Exceptions.failWithOverflow())) + .doFinally(s -> sink.tryEmitComplete()); + return Flux.merge(listenerProperties.getPrefetch(), topicMessageFlux, overflowProcessor) .publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch()); } From 722cb6c370b80ea1836e77521b08bd239341bf54 Mon Sep 17 00:00:00 2001 From: "Saai Syvendra (Github key)" Date: Thu, 31 Oct 2024 09:33:47 +0530 Subject: [PATCH 2/3] Replace use of deprecated DirectProcessor Signed-off-by: Saai Syvendra (Github key) --- .../mirror/grpc/listener/SharedTopicListener.java | 11 +++-------- .../listener/AbstractSharedTopicListenerTest.java | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java index b126b45be12..387e0ee5a72 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java @@ -35,16 +35,11 @@ public abstract class SharedTopicListener implements TopicListener { @Override public Flux listen(TopicMessageFilter filter) { Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); - Flux overflowProcessor = sink.asFlux(); - // moving publishOn from after onBackpressureBuffer to after Flux.merge reduces CPU usage by up to 40% - Flux topicMessageFlux = getSharedListener(filter) + return getSharedListener(filter) .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) - .onBackpressureBuffer( - listenerProperties.getMaxBufferSize(), t -> sink.tryEmitError(Exceptions.failWithOverflow())) - .doFinally(s -> sink.tryEmitComplete()); - - return Flux.merge(listenerProperties.getPrefetch(), topicMessageFlux, overflowProcessor) + .onBackpressureBuffer(listenerProperties.getMaxBufferSize(), BufferOverflowStrategy.ERROR) + .doFinally(s -> sink.tryEmitComplete()) .publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch()); } diff --git a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java index 2f6823b94bb..499326f66fe 100644 --- a/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java +++ b/hedera-mirror-grpc/src/test/java/com/hedera/mirror/grpc/listener/AbstractSharedTopicListenerTest.java @@ -80,7 +80,7 @@ void slowSubscriberOverflowException() { .expectNext(1L, 2L) .thenAwait(Duration.ofMillis(500L)) // stall to overrun backpressure buffer .thenRequest(Long.MAX_VALUE) - .thenConsumeWhile(n -> n < numMessages) + .expectNextCount(maxBufferSize) .expectErrorMatches(Exceptions::isOverflow) .verify(Duration.ofMillis(1000L)); From aa3b9a2aaee2280fb789a9456e7c78acce35897c Mon Sep 17 00:00:00 2001 From: Saai Syvendra <157691467+saai-syvendra@users.noreply.github.com> Date: Thu, 31 Oct 2024 19:58:40 +0530 Subject: [PATCH 3/3] removed unused sink Signed-off-by: Saai Syvendra <157691467+saai-syvendra@users.noreply.github.com> --- .../com/hedera/mirror/grpc/listener/SharedTopicListener.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java index 387e0ee5a72..a1d8990743c 100644 --- a/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java +++ b/hedera-mirror-grpc/src/main/java/com/hedera/mirror/grpc/listener/SharedTopicListener.java @@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory; import reactor.core.Exceptions; import reactor.core.publisher.Flux; -import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; @RequiredArgsConstructor @@ -34,12 +33,9 @@ public abstract class SharedTopicListener implements TopicListener { @Override public Flux listen(TopicMessageFilter filter) { - Sinks.Many sink = Sinks.many().unicast().onBackpressureBuffer(); - return getSharedListener(filter) .doOnSubscribe(s -> log.info("Subscribing: {}", filter)) .onBackpressureBuffer(listenerProperties.getMaxBufferSize(), BufferOverflowStrategy.ERROR) - .doFinally(s -> sink.tryEmitComplete()) .publishOn(Schedulers.boundedElastic(), false, listenerProperties.getPrefetch()); }