diff --git a/src/org/jgroups/protocols/relay/RELAY2.java b/src/org/jgroups/protocols/relay/RELAY2.java
index f58d4ca719..dcb004aa11 100644
--- a/src/org/jgroups/protocols/relay/RELAY2.java
+++ b/src/org/jgroups/protocols/relay/RELAY2.java
@@ -27,7 +27,8 @@
import static org.jgroups.protocols.relay.Relay2Header.*;
// todo: check if copy is needed in route(), passUp() and deliver(); possibly pass a boolean as parameter (copy or not)
-
+// todo: use CompletableFutures in routeThen(); this could parallelize routing and delivery/passsing up
+// todo: check if a message can bypass RELAY2 completely when NO_RELAY is set (in up(),down())
/**
* Provides relaying of messages between autonomous sites.
* Design: ./doc/design/RELAY2.txt and at https://github.com/belaban/JGroups/blob/master/doc/design/RELAY2.txt.
@@ -41,7 +42,8 @@
@MBean(description="RELAY2 protocol")
public class RELAY2 extends Protocol {
// reserved flags
- public static final short can_become_site_master_flag = 1 << 1;
+ public static final short can_become_site_master_flag = 1 << 1;
+ protected static final String RELAY2_CL=RELAY2.class.getSimpleName();
/* ------------------------------------------ Properties ---------------------------------------------- */
@Property(description="Name of the site; must be defined in the configuration",writable=false)
@@ -430,6 +432,8 @@ public Object down(Event evt) {
}
public Object down(Message msg) {
+ //if(msg.isFlagSet(Flag.NO_RELAY))
+ // return down_prot.down(msg);
msg.src(local_addr);
return process(true, msg);
}
@@ -441,6 +445,8 @@ public Object up(Event evt) {
}
public Object up(Message msg) {
+ // if(msg.isFlagSet(Flag.NO_RELAY))
+ // return up_prot.up(msg);
Message copy=msg;
Relay2Header hdr=msg.getHeader(id);
if(hdr != null) {
@@ -458,6 +464,8 @@ public void up(MessageBatch batch) {
List unreachable_sites=null;
for(Iterator it=batch.iterator(); it.hasNext();) {
Message msg=it.next(), copy=msg;
+ // if(msg.isFlagSet(Flag.NO_RELAY))
+ // continue;
Relay2Header hdr=msg.getHeader(id);
it.remove();
if(hdr != null) {
@@ -546,6 +554,9 @@ public void handleView(View view) {
topo().adjust(this.site, view.getMembers());
}
+ public String toString() {
+ return String.format("%s%s", RELAY2_CL, local_addr != null? String.format(" (%s)", local_addr) : "");
+ }
/** Called to handle a message received from a different site (via a bridge channel) */
protected void handleRelayMessage(Relay2Header hdr, Message msg) {
@@ -605,21 +616,22 @@ protected Object process(boolean down, Message msg) {
switch(type) {
case ALL:
if(down)
- return routeThen(msg,null,() -> deliver(null, msg, true));
- return routeThen(msg,null,() -> passUp(msg));
+ return routeThen(msg, null,() -> deliver(null, msg, true));
+ return routeThen(msg, null, () -> passUp(msg));
case SM_ALL:
- return routeThen(msg,null,() -> passUp(msg));
+ return routeThen(msg, null, () -> passUp(msg));
case SM:
if(sameSite(dst))
return passUp(msg);
return route(msg, Arrays.asList(dst.getSite()));
case UNICAST:
if(sameSite(dst)) {
- if(down)
- return deliver(dst, msg, false);
if(local_addr.equals(dst))
return passUp(msg);
- return deliver(dst, msg, false);
+ if(down)
+ return deliver(dst, msg, false);
+ String s=String.format("a unicast requires dest (%s) == local_addr (%s)", dst, local_addr);
+ throw new IllegalStateException(s);
}
else
return route(msg, Arrays.asList(dst.getSite()));
@@ -638,7 +650,7 @@ protected Object process(boolean down, Message msg) {
throw new IllegalStateException(String.format("non site master received a sg with dst %s",dst));
case UNICAST:
if(down) {
- if(sameSite(dst))
+ if(sameSite(dst)) // todo: if same address -> passUp()
return deliver(dst, msg, false);
return sendToLocalSiteMaster(local_addr, msg);
}
diff --git a/src/org/jgroups/util/MyReceiver.java b/src/org/jgroups/util/MyReceiver.java
index 6c1e5735f6..c7701d90a4 100644
--- a/src/org/jgroups/util/MyReceiver.java
+++ b/src/org/jgroups/util/MyReceiver.java
@@ -25,9 +25,8 @@ public class MyReceiver implements Receiver, Closeable {
public void receive(Message msg) {
T obj=raw_msgs? (T)msg : (T)msg.getObject();
list.add(obj);
- if(verbose) {
+ if(verbose)
System.out.println((name() != null? name() + ":" : "") + " received message from " + msg.getSrc() + ": " + obj);
- }
}
@Override
diff --git a/tests/junit-functional/org/jgroups/tests/Relay2Test.java b/tests/junit-functional/org/jgroups/tests/Relay2Test.java
index e965913f0b..4c340f306a 100644
--- a/tests/junit-functional/org/jgroups/tests/Relay2Test.java
+++ b/tests/junit-functional/org/jgroups/tests/Relay2Test.java
@@ -13,12 +13,15 @@
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.jgroups.tests.RelayTests.Data.Type.REQ;
+
/**
* Various RELAY2-related tests
* @author Bela Ban
@@ -129,26 +132,6 @@ public void testMissingRouteAfterMerge() throws Exception {
}
- /**
- * Tests whether the bridge channel connects and disconnects ok.
- */
- public void testConnectAndReconnectOfBridgeStack() throws Exception {
- a=new JChannel(defaultStack()).setName("A");
- b=new JChannel(defaultStack()).setName("B");
-
- a.connect(BRIDGE_CLUSTER);
- b.connect(BRIDGE_CLUSTER);
- Util.waitUntilAllChannelsHaveSameView(10000, 500, a, b);
-
- b.disconnect();
- Util.waitUntilAllChannelsHaveSameView(10000, 500, a);
-
- b.connect(BRIDGE_CLUSTER);
- Util.waitUntilAllChannelsHaveSameView(10000, 500, a, b);
- }
-
-
-
/**
* Tests sites LON and SFO, with SFO disconnecting (bridge view on LON should be 1) and reconnecting (bridge view on
* LON and SFO should be 2)
@@ -216,12 +199,12 @@ public void testUnknownAndUpStateTransitions() throws Exception {
System.out.println("Disconnecting X");
x.disconnect();
- System.out.println("A: waiting for site SFO to be UNKNOWN");
+ System.out.println("A: waiting for site SFO to be DOWN");
waitUntilRoute(SFO, false, 20000, 500, a);
- System.out.println("Reconnecting X, waiting for 5 seconds to see if the route is marked as DOWN");
+ System.out.println("Reconnecting X");
x.connect(SFO);
- Util.sleep(5000);
+ waitUntilRoute(SFO, true, 5000, 100, a);
Route route=getRoute(a, SFO);
assert route != null : "route is " + route + " (expected to be UP)";
@@ -277,32 +260,6 @@ public void testSiteUnreachableMessageBreaksSiteUUID() throws Exception {
: "Expecting 100 site unreachable events on node A but got " + h2.getSiteUnreachableEvents();
}
- protected class MyUphandler implements UpHandler {
- protected final BlockingQueue received=new LinkedBlockingDeque<>();
- protected final AtomicInteger siteUnreachableEvents=new AtomicInteger(0);
-
- public BlockingQueue getReceived() {return received;}
- public int getSiteUnreachableEvents() {return siteUnreachableEvents.get();}
-
- @Override public UpHandler setLocalAddress(Address a) {return this;}
-
- @Override
- public Object up(Event evt) {
- if(evt.getType() == Event.SITE_UNREACHABLE) {
- log.debug("Site %s is unreachable", (Object) evt.getArg());
- siteUnreachableEvents.incrementAndGet();
- }
- return null;
- }
-
- @Override
- public Object up(Message msg) {
- log.debug("Received %s from %s\n", new String(msg.getArray(), StandardCharsets.UTF_8), msg.getSrc());
- received.add(msg);
- return null;
- }
- }
-
/**
@@ -311,8 +268,8 @@ public Object up(Message msg) {
* despite using multiple site masters. JIRA: https://issues.redhat.com/browse/JGRP-2112
*/
public void testSenderOrderWithMultipleSiteMasters() throws Exception {
- MyReceiver