Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StreamMessageListenerContainer should wait for cancellation to complete on shutdown #3040

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
* {@link Task} that invokes a {@link BiFunction read function} to poll on a Redis Stream.
*
* @author Mark Paluch
* @author JiHongKim98
* @see 2.2
*/
class StreamPollTask<K, V extends Record<K, ?>> implements Task {
Expand All @@ -54,6 +55,8 @@ class StreamPollTask<K, V extends Record<K, ?>> implements Task {
private final PollState pollState;
private final TypeDescriptor targetType;

private final CountDownLatch cancelLatch = new CountDownLatch(1);

private volatile boolean isInEventLoop = false;

StreamPollTask(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener, ErrorHandler errorHandler,
Expand Down Expand Up @@ -83,6 +86,12 @@ private static PollState createPollState(StreamReadRequest<?> streamRequest) {
@Override
public void cancel() throws DataAccessResourceFailureException {
this.pollState.cancel();

try {
cancelLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

@Override
Expand Down Expand Up @@ -112,6 +121,7 @@ public void run() {
doLoop();
} finally {
isInEventLoop = false;
cancelLatch.countDown();
}
}

Expand All @@ -129,11 +139,13 @@ private void doLoop() {

} catch (InterruptedException ex) {

cancelLatch.countDown();
cancel();
Thread.currentThread().interrupt();
} catch (RuntimeException ex) {

if (cancelSubscriptionOnError.test(ex)) {
cancelLatch.countDown();
cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,27 @@ void containerRestartShouldRestartSubscription() throws InterruptedException {
cancelAwait(subscription);
}

@Test // GH-2261
void containerShouldStopGracefully() throws InterruptedException {
StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer
.create(connectionFactory, containerOptions);

BlockingQueue<MapRecord<String, String, String>> queue = new LinkedBlockingQueue<>();
container.start();
Subscription subscription = container.receive(StreamOffset.create("my-stream", ReadOffset.from("0-0")), r -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
// ignore
}
queue.add(r);
});
redisTemplate.opsForStream().add("my-stream", Collections.singletonMap("key", "value1"));
subscription.await(DEFAULT_TIMEOUT);
container.stop();
assertThat(queue.poll(500, TimeUnit.MILLISECONDS)).isNotNull();
}

private static void cancelAwait(Subscription subscription) {

subscription.cancel();
Expand Down