Skip to content

Commit

Permalink
Simplified ViewHandler to avoid hanging (https://issues.redhat.com/br…
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 5, 2023
1 parent 0e8cfd4 commit 1605e69
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 52 deletions.
1 change: 1 addition & 0 deletions src/org/jgroups/protocols/pbcast/GMS.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ public void stop() {
leaver.reset();
if(prev_members != null)
prev_members.clear();
view_handler.processing(false);
}


Expand Down
72 changes: 31 additions & 41 deletions src/org/jgroups/protocols/pbcast/ViewHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -27,10 +26,9 @@
public class ViewHandler<R> {
protected final Collection<R> requests=new ConcurrentLinkedQueue<>();
protected final Lock lock=new ReentrantLock();
protected final AtomicInteger count=new AtomicInteger(); // #threads adding to (and removing from) queue
protected final AtomicBoolean suspended=new AtomicBoolean(false);
@GuardedBy("lock")
protected boolean processing;
protected final AtomicBoolean processing=new AtomicBoolean(false);
protected final Condition processing_done=lock.newCondition();
protected final GMS gms;
protected Consumer<Collection<R>> req_processor;
Expand All @@ -53,6 +51,7 @@ public ViewHandler(GMS gms, Consumer<Collection<R>> req_processor, BiPredicate<R
}

public boolean suspended() {return suspended.get();}
public boolean processing() {return processing.get();}
public int size() {return requests.size();}
public ViewHandler<R> reqProcessor(Consumer<Collection<R>> p) {req_processor=p; return this;}
public Consumer<Collection<R>> reqProcessor() {return req_processor;}
Expand Down Expand Up @@ -102,7 +101,7 @@ public void resume() {
public void waitUntilComplete() {
lock.lock();
try {
while(processing || count.get() > 0) {
while(processing.get()) {
try {
processing_done.await();
}
Expand All @@ -125,7 +124,7 @@ public void waitUntilComplete(long timeout) {
long now=0;
lock.lock();
try {
while(processing || count.get() > 0) {
while(processing.get()) {
long delay=timeout-now;
if(delay <= 0)
break;
Expand All @@ -146,7 +145,9 @@ public void waitUntilComplete(long timeout) {
public <T extends ViewHandler<R>> T processing(boolean flag) {
lock.lock();
try {
setProcessing(flag);
processing.set(flag);
if(flag == false)
processing_done.signalAll();
return (T)this;
}
finally {
Expand All @@ -168,14 +169,6 @@ public String toString() {

protected Log log() {return gms.getLog();}

@GuardedBy("lock")
protected boolean setProcessing(boolean flag) {
boolean do_signal=processing && !flag;
processing=flag;
if(do_signal)
processing_done.signalAll();
return flag;
}

protected boolean _add(R req) {
if(req == null)
Expand All @@ -185,14 +178,13 @@ protected boolean _add(R req) {
return false;
}
String log=new Date() + ": " + req;
count.incrementAndGet();
lock.lock();
try {
if(!requests.contains(req)) { // non-null check already performed (above)
requests.add(req);
history.add(log);
}
return count.decrementAndGet() == 0 && !processing && setProcessing(true);
return processing.compareAndSet(false, true);
}
finally {
lock.unlock();
Expand All @@ -203,24 +195,7 @@ protected boolean _add(R req) {
protected boolean _add(R ... reqs) {
if(reqs == null || reqs.length == 0)
return false;
if(suspended.get()) {
log().trace("%s: queue is suspended; requests %s are discarded", gms.getAddress(), Arrays.toString(reqs));
return false;
}
count.incrementAndGet();
lock.lock();
try {
for(R req: reqs) {
if(req != null && !requests.contains(req)) {
requests.add(req);
history.add(new Date() + ": " + req);
}
}
return count.decrementAndGet() == 0 && !processing && setProcessing(true);
}
finally {
lock.unlock();
}
return _add(Arrays.asList(reqs));
}


Expand All @@ -232,7 +207,6 @@ protected boolean _add(Collection<R> reqs) {
return false;
}

count.incrementAndGet();
lock.lock();
try {
for(R req: reqs) {
Expand All @@ -241,7 +215,7 @@ protected boolean _add(Collection<R> reqs) {
history.add(new Date() + ": " + req);
}
}
return count.decrementAndGet() == 0 && !processing && setProcessing(true);
return processing.compareAndSet(false, true);
}
finally {
lock.unlock();
Expand All @@ -255,18 +229,34 @@ protected void process() {
Collection<R> reqs=null;
lock.lock();
try {
reqs=removeAndProcess(requests); // remove matching requests
reqs=remove(requests); // remove matching requests
}
finally {
lock.unlock();
}
if(reqs != null && !reqs.isEmpty())
req_processor.accept(reqs); // process outside of the lock scope
if(reqs != null && !reqs.isEmpty()) {
try {
req_processor.accept(reqs); // process outside of the lock scope
}
catch(Throwable t) {
log().error("%s: failed processsing requests: %s", gms.addr(), t);
lock.lock();
try {
if(processing.compareAndSet(true, false))
processing_done.signalAll();
throw t;
}
finally {
lock.unlock();
}
}
}
}
lock.lock();
try {
if(requests.isEmpty()) {
setProcessing(false);
if(processing.compareAndSet(true, false))
processing_done.signalAll();
return;
}
}
Expand All @@ -281,7 +271,7 @@ protected void process() {
* This method must catch all exceptions; or else process() might return without setting processing to true again!
*/
@GuardedBy("lock")
protected Collection<R> removeAndProcess(Collection<R> requests) {
protected Collection<R> remove(Collection<R> requests) {
Collection<R> removed=new ArrayList<>();
R first_req=null;
Iterator<R> it=requests.iterator();
Expand Down
37 changes: 26 additions & 11 deletions tests/junit-functional/org/jgroups/tests/ViewHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,7 @@ public void testWaitUntilComplete() throws Exception {
System.out.printf("Joined %d in %d ms\n", adder.getId(), time);
}
System.out.println("view_handler = " + view_handler);
view_handler.waitUntilComplete(10000);
// view_handler.waitUntilComplete();
view_handler.waitUntilComplete();
System.out.println("view_handler = " + view_handler);
assert view_handler.size() == 0;
}
Expand All @@ -319,7 +318,7 @@ public void testCoordLeave() {
ViewHandler<GmsImpl.Request> handler=new ViewHandler<>(gms, req_processor, GmsImpl.Request::canBeProcessedTogether);
handler.add(new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE),
new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE));

handler.waitUntilComplete();
assert result.get();
}

Expand All @@ -336,10 +335,28 @@ public void testCoordLeave2() {
new GmsImpl.Request(GmsImpl.Request.JOIN, b),
new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE),
new GmsImpl.Request(GmsImpl.Request.COORD_LEAVE));

handler.waitUntilComplete();
assert result.get();
}

/** Tests the case where ViewHandler.process() encounters an exception throw by the request processor. This must
* not prevent processing from being set to false when process() returns */
public void testProcessWithException() {
Consumer<Collection<Integer>> processor=l -> {
if(l.size() < 5)
System.out.printf("list: %s\n", l);
else throw new RuntimeException("boom");
};
view_handler.reqProcessor(processor).reqMatcher((a,b) -> true);
view_handler.add(Arrays.asList(1,2,3)); // OK
try {
view_handler.add(Arrays.asList(4, 5, 6, 7, 8, 9, 10)); // boom
}
catch(Throwable t) {
System.out.printf("received exception as expected: %s\n", t);
}
assert !view_handler.processing();
}

protected static void configureGMS(GMS gms) {
Address local_addr=Util.createRandomAddress("A");
Expand All @@ -359,11 +376,11 @@ protected static void set(GMS gms, String field, Object value) {
}

protected static class Adder extends Thread {
protected final int from, to;
protected final ViewHandler vh;
protected final CountDownLatch latch;
protected final int from, to;
protected final ViewHandler<Integer> vh;
protected final CountDownLatch latch;

public Adder(int from, int to, ViewHandler vh, CountDownLatch latch) {
public Adder(int from, int to, ViewHandler<Integer> vh, CountDownLatch latch) {
this.from=from;
this.to=to;
this.vh=vh;
Expand All @@ -378,11 +395,9 @@ public void run() {
}

int len=to-from+1;
Object[] numbers=new Integer[len];
Integer[] numbers=new Integer[len];
for(int i=0; i < numbers.length; i++)
numbers[i]=from+i;

// IntStream.rangeClosed(from, to).forEach(vh::add);
vh.add(numbers);
}
}
Expand Down

0 comments on commit 1605e69

Please sign in to comment.