diff --git a/src/org/jgroups/protocols/pbcast/GMS.java b/src/org/jgroups/protocols/pbcast/GMS.java index 27240934c5..f57f78b2bb 100644 --- a/src/org/jgroups/protocols/pbcast/GMS.java +++ b/src/org/jgroups/protocols/pbcast/GMS.java @@ -360,6 +360,7 @@ public void stop() { leaver.reset(); if(prev_members != null) prev_members.clear(); + view_handler.processing(false); } diff --git a/src/org/jgroups/protocols/pbcast/ViewHandler.java b/src/org/jgroups/protocols/pbcast/ViewHandler.java index 7bb95f6692..86047b226b 100644 --- a/src/org/jgroups/protocols/pbcast/ViewHandler.java +++ b/src/org/jgroups/protocols/pbcast/ViewHandler.java @@ -9,7 +9,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -27,10 +26,9 @@ public class ViewHandler { protected final Collection requests=new ConcurrentLinkedQueue<>(); protected final Lock lock=new ReentrantLock(); - protected final AtomicInteger count=new AtomicInteger(); // #threads adding to (and removing from) queue protected final AtomicBoolean suspended=new AtomicBoolean(false); @GuardedBy("lock") - protected boolean processing; + protected final AtomicBoolean processing=new AtomicBoolean(false); protected final Condition processing_done=lock.newCondition(); protected final GMS gms; protected Consumer> req_processor; @@ -53,6 +51,7 @@ public ViewHandler(GMS gms, Consumer> req_processor, BiPredicate reqProcessor(Consumer> p) {req_processor=p; return this;} public Consumer> reqProcessor() {return req_processor;} @@ -102,7 +101,7 @@ public void resume() { public void waitUntilComplete() { lock.lock(); try { - while(processing || count.get() > 0) { + while(processing.get()) { try { processing_done.await(); } @@ -125,7 +124,7 @@ public void waitUntilComplete(long timeout) { long now=0; lock.lock(); try { - while(processing || count.get() > 0) { + while(processing.get()) { long delay=timeout-now; if(delay <= 0) break; @@ -146,7 +145,9 @@ public void waitUntilComplete(long timeout) { public > T processing(boolean flag) { lock.lock(); try { - setProcessing(flag); + processing.set(flag); + if(flag == false) + processing_done.signalAll(); return (T)this; } finally { @@ -168,14 +169,6 @@ public String toString() { protected Log log() {return gms.getLog();} - @GuardedBy("lock") - protected boolean setProcessing(boolean flag) { - boolean do_signal=processing && !flag; - processing=flag; - if(do_signal) - processing_done.signalAll(); - return flag; - } protected boolean _add(R req) { if(req == null) @@ -185,14 +178,13 @@ protected boolean _add(R req) { return false; } String log=new Date() + ": " + req; - count.incrementAndGet(); lock.lock(); try { if(!requests.contains(req)) { // non-null check already performed (above) requests.add(req); history.add(log); } - return count.decrementAndGet() == 0 && !processing && setProcessing(true); + return processing.compareAndSet(false, true); } finally { lock.unlock(); @@ -203,24 +195,7 @@ protected boolean _add(R req) { protected boolean _add(R ... reqs) { if(reqs == null || reqs.length == 0) return false; - if(suspended.get()) { - log().trace("%s: queue is suspended; requests %s are discarded", gms.getAddress(), Arrays.toString(reqs)); - return false; - } - count.incrementAndGet(); - lock.lock(); - try { - for(R req: reqs) { - if(req != null && !requests.contains(req)) { - requests.add(req); - history.add(new Date() + ": " + req); - } - } - return count.decrementAndGet() == 0 && !processing && setProcessing(true); - } - finally { - lock.unlock(); - } + return _add(Arrays.asList(reqs)); } @@ -232,7 +207,6 @@ protected boolean _add(Collection reqs) { return false; } - count.incrementAndGet(); lock.lock(); try { for(R req: reqs) { @@ -241,7 +215,7 @@ protected boolean _add(Collection reqs) { history.add(new Date() + ": " + req); } } - return count.decrementAndGet() == 0 && !processing && setProcessing(true); + return processing.compareAndSet(false, true); } finally { lock.unlock(); @@ -255,18 +229,34 @@ protected void process() { Collection reqs=null; lock.lock(); try { - reqs=removeAndProcess(requests); // remove matching requests + reqs=remove(requests); // remove matching requests } finally { lock.unlock(); } - if(reqs != null && !reqs.isEmpty()) - req_processor.accept(reqs); // process outside of the lock scope + if(reqs != null && !reqs.isEmpty()) { + try { + req_processor.accept(reqs); // process outside of the lock scope + } + catch(Throwable t) { + log().error("%s: failed processsing requests: %s", gms.addr(), t); + lock.lock(); + try { + if(processing.compareAndSet(true, false)) + processing_done.signalAll(); + throw t; + } + finally { + lock.unlock(); + } + } + } } lock.lock(); try { if(requests.isEmpty()) { - setProcessing(false); + if(processing.compareAndSet(true, false)) + processing_done.signalAll(); return; } } @@ -281,7 +271,7 @@ protected void process() { * This method must catch all exceptions; or else process() might return without setting processing to true again! */ @GuardedBy("lock") - protected Collection removeAndProcess(Collection requests) { + protected Collection remove(Collection requests) { Collection removed=new ArrayList<>(); R first_req=null; Iterator it=requests.iterator(); diff --git a/tests/junit-functional/org/jgroups/tests/ViewHandlerTest.java b/tests/junit-functional/org/jgroups/tests/ViewHandlerTest.java index 90068134c2..0cbee38cd0 100644 --- a/tests/junit-functional/org/jgroups/tests/ViewHandlerTest.java +++ b/tests/junit-functional/org/jgroups/tests/ViewHandlerTest.java @@ -303,8 +303,7 @@ public void testWaitUntilComplete() throws Exception { System.out.printf("Joined %d in %d ms\n", adder.getId(), time); } System.out.println("view_handler = " + view_handler); - view_handler.waitUntilComplete(10000); - // view_handler.waitUntilComplete(); + view_handler.waitUntilComplete(); System.out.println("view_handler = " + view_handler); assert view_handler.size() == 0; } @@ -319,7 +318,7 @@ public void testCoordLeave() { ViewHandler handler=new ViewHandler<>(gms, req_processor, GmsImpl.Request::canBeProcessedTogether); handler.add(new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE), new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE)); - + handler.waitUntilComplete(); assert result.get(); } @@ -336,10 +335,28 @@ public void testCoordLeave2() { new GmsImpl.Request(GmsImpl.Request.JOIN, b), new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE), new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE)); - + handler.waitUntilComplete(); assert result.get(); } + /** Tests the case where ViewHandler.process() encounters an exception throw by the request processor. This must + * not prevent processing from being set to false when process() returns */ + public void testProcessWithException() { + Consumer> processor=l -> { + if(l.size() < 5) + System.out.printf("list: %s\n", l); + else throw new RuntimeException("boom"); + }; + view_handler.reqProcessor(processor).reqMatcher((a,b) -> true); + view_handler.add(Arrays.asList(1,2,3)); // OK + try { + view_handler.add(Arrays.asList(4, 5, 6, 7, 8, 9, 10)); // boom + } + catch(Throwable t) { + System.out.printf("received exception as expected: %s\n", t); + } + assert !view_handler.processing(); + } protected static void configureGMS(GMS gms) { Address local_addr=Util.createRandomAddress("A"); @@ -359,11 +376,11 @@ protected static void set(GMS gms, String field, Object value) { } protected static class Adder extends Thread { - protected final int from, to; - protected final ViewHandler vh; - protected final CountDownLatch latch; + protected final int from, to; + protected final ViewHandler vh; + protected final CountDownLatch latch; - public Adder(int from, int to, ViewHandler vh, CountDownLatch latch) { + public Adder(int from, int to, ViewHandler vh, CountDownLatch latch) { this.from=from; this.to=to; this.vh=vh; @@ -378,11 +395,9 @@ public void run() { } int len=to-from+1; - Object[] numbers=new Integer[len]; + Integer[] numbers=new Integer[len]; for(int i=0; i < numbers.length; i++) numbers[i]=from+i; - - // IntStream.rangeClosed(from, to).forEach(vh::add); vh.add(numbers); } }