Skip to content

Commit

Permalink
Run loops check the interrupted flag (#1215)
Browse files Browse the repository at this point in the history
* Run loops check the interrupted flag

* Run loops check the interrupted flag
  • Loading branch information
scottf authored Aug 21, 2024
1 parent 4cf29d4 commit 0e4f39c
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 28 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void shutdown() {

@Override
public void run() {
if (alive.get()) {
if (alive.get() && !Thread.interrupted()) {
long sinceLast = System.currentTimeMillis() - lastMsgReceived.get();
if (alive.get() && sinceLast > alarmPeriodSetting) {
handleHeartbeatError();
Expand Down
18 changes: 6 additions & 12 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -2178,20 +2178,17 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
// Wait for the timeout or the pending count to go to 0
executor.submit(() -> {
try {
Instant now = Instant.now();

while (timeout == null || timeout.equals(Duration.ZERO)
|| Duration.between(start, now).compareTo(timeout) < 0) {
long stop = (timeout == null || timeout.equals(Duration.ZERO))
? Long.MAX_VALUE
: System.nanoTime() + timeout.toNanos();
while (System.nanoTime() < stop && !Thread.interrupted())
{
consumers.removeIf(NatsConsumer::isDrained);

if (consumers.isEmpty()) {
break;
}

//noinspection BusyWait
Thread.sleep(1); // Sleep 1 milli

now = Instant.now();
}

// Stop publishing
Expand All @@ -2201,16 +2198,13 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws TimeoutExceptio
if (timeout == null || timeout.equals(Duration.ZERO)) {
this.flush(Duration.ZERO);
} else {
now = Instant.now();

Instant now = Instant.now();
Duration passed = Duration.between(start, now);
Duration newTimeout = timeout.minus(passed);

if (newTimeout.toNanos() > 0) {
this.flush(newTimeout);
}
}

this.close(false, false); // close the connection after the last flush
tracker.complete(consumers.isEmpty());
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void run() {
this.gotCR = false;
this.opPos = 0;

while (this.running.get()) {
while (running.get() && !Thread.interrupted()) {
this.bufferPosition = 0;
int bytesRead = dataPort.read(this.buffer, 0, this.buffer.length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void run() {
dataPort = this.dataPortFuture.get(); // Will wait for the future to complete
StatisticsCollector stats = this.connection.getNatsStatistics();

while (this.running.get()) {
while (running.get() && !Thread.interrupted()) {
NatsMessage msg;
if (this.reconnectMode.get()) {
msg = this.reconnectOutgoing.accumulate(sendBufferLength.get(), Options.MAX_MESSAGES_IN_NETWORK_BUFFER, reconnectTimeout);
Expand Down
12 changes: 5 additions & 7 deletions src/main/java/io/nats/client/impl/NatsConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,15 @@ public CompletableFuture<Boolean> drain(Duration timeout) throws InterruptedExce
// draining
connection.getExecutor().submit(() -> {
try {
Instant now = Instant.now();

while (timeout == null || timeout.equals(Duration.ZERO)
|| Duration.between(start, now).compareTo(timeout) < 0) {
long stop = (timeout == null || timeout.equals(Duration.ZERO))
? Long.MAX_VALUE
: System.nanoTime() + timeout.toNanos();
while (System.nanoTime() < stop && !Thread.interrupted()) {
if (this.isDrained()) {
break;
}

//noinspection BusyWait
Thread.sleep(1); // Sleep 1 milli

now = Instant.now();
}

this.cleanUpAfterDrain();
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/io/nats/client/impl/NatsDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,8 @@ boolean breakRunLoop() {

public void run() {
try {
while (this.running.get()) { // start

while (running.get() && !Thread.interrupted()) {
NatsMessage msg = this.incoming.pop(this.waitForMessage);

if (msg != null) {
NatsSubscription sub = msg.getNatsSubscription();
if (sub != null && sub.isActive()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@ class NatsDispatcherWithExecutor extends NatsDispatcher {
@Override
public void run() {
try {
while (this.running.get()) { // start

while (running.get() && !Thread.interrupted()) {
NatsMessage msg = this.incoming.pop(this.waitForMessage);

if (msg != null) {
NatsSubscription sub = msg.getNatsSubscription();
if (sub != null && sub.isActive()) {
Expand Down

0 comments on commit 0e4f39c

Please sign in to comment.