diff --git a/src/org/jgroups/protocols/BaseBundler.java b/src/org/jgroups/protocols/BaseBundler.java index 81400fc6d7..fb9546fc1c 100644 --- a/src/org/jgroups/protocols/BaseBundler.java +++ b/src/org/jgroups/protocols/BaseBundler.java @@ -74,7 +74,7 @@ public String printBuffers() { public void init(TP transport) { this.transport=transport; - msg_processing_policy=transport.msgProcessingPolicy(); + msg_processing_policy=transport.getMsgProcessingPolicy(); msg_stats=transport.getMessageStats(); log=transport.getLog(); output=new ByteArrayDataOutputStream(max_size + MSG_OVERHEAD); diff --git a/src/org/jgroups/protocols/DAISYCHAIN.java b/src/org/jgroups/protocols/DAISYCHAIN.java index 7d0abdf302..16e4000484 100644 --- a/src/org/jgroups/protocols/DAISYCHAIN.java +++ b/src/org/jgroups/protocols/DAISYCHAIN.java @@ -76,7 +76,7 @@ public Object down(Message msg) { msg.setSrc(local_addr); if(log.isTraceEnabled()) log.trace("%s: looping back message %s", local_addr, msg); - transport.msgProcessingPolicy().loopback(msg, msg.isFlagSet(Message.Flag.OOB)); + transport.getMsgProcessingPolicy().loopback(msg, msg.isFlagSet(Message.Flag.OOB)); } // we need to copy the message, as we cannot do a msg.setSrc(next): the next retransmission diff --git a/src/org/jgroups/protocols/PerDestinationBundler.java b/src/org/jgroups/protocols/PerDestinationBundler.java index ff8bc1c2d4..f92eba5efd 100644 --- a/src/org/jgroups/protocols/PerDestinationBundler.java +++ b/src/org/jgroups/protocols/PerDestinationBundler.java @@ -96,7 +96,7 @@ public double avgBatchSize() { public void init(TP transport) { this.transport=Objects.requireNonNull(transport); - msg_processing_policy=transport.msgProcessingPolicy(); + msg_processing_policy=transport.getMsgProcessingPolicy(); msg_stats=transport.getMessageStats(); this.log=transport.getLog(); } diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 55ed5ef88e..71ac1c662b 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -207,6 +207,9 @@ public String getBundlerClass() { public long getTimeServiceInterval() {return time_service_interval;} public T setTimeServiceInterval(long t) {this.time_service_interval=t; return (T)this;} + public boolean isUseVirtualThreads() {return use_vthreads;} + public T useVirtualThreads(boolean b) {use_vthreads=b; return (T)this;} + public boolean logDiscardMsgs() {return log_discard_msgs;} public T logDiscardMsgs(boolean l) {this.log_discard_msgs=l; return (T)this;} @@ -249,10 +252,9 @@ public T setLevel(String level) { @ManagedOperation(description="Changes the message processing policy. The fully qualified name of a class " + "implementing MessageProcessingPolicy needs to be given") - public void setMessageProcessingPolicy(String policy) { + public T setMessageProcessingPolicy(String policy) { if(policy == null) - return; - + return (T)this; if(policy.startsWith("submit")) msg_processing_policy=new SubmitToThreadPool(); else if(policy.startsWith("max")) @@ -272,6 +274,7 @@ else if(policy.startsWith("unbatch")) catch(Exception e) { log.error("failed setting message_processing_policy", e); } + return (T)this; } public MessageProcessingPolicy getMessageProcessingPolicy() {return msg_processing_policy;} @@ -443,7 +446,10 @@ protected TP() { } public MsgStats getMessageStats() {return msg_stats;} - public MessageProcessingPolicy msgProcessingPolicy() {return msg_processing_policy;} + + public MessageProcessingPolicy getMsgProcessingPolicy() {return msg_processing_policy;} + public T msgProcessingPolicy(MessageProcessingPolicy p) {this.msg_processing_policy=p; return (T)this;} + public RTT getRTT() {return rtt;} @Override