Implemented batch caching in NAKACK2 and UNICAST3 (https://issues.red…
belaban committed Oct 16, 2024
1 parent cbb8897 commit 9cd0b3e
Showing 3 changed files with 64 additions and 11 deletions.
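
Both protocols gain a reuse_message_batches flag (default true) plus fluent accessors. As a quick orientation before the diff, here is a hypothetical usage sketch (my addition, not part of the commit): the class name and the printf are invented, while the reuseMessageBatches() setters and getters are the ones added below (the no-arg protocol constructors already exist in JGroups). In an XML stack configuration the flag would presumably be set as an attribute of the same name on the protocol element.

import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.NAKACK2;

public class ReuseBatchesConfig {
    public static void main(String[] args) {
        // Both protocols default to reuse_message_batches=true; the fluent setters
        // added in this commit toggle the per-sender batch cache per protocol instance
        UNICAST3 ucast=new UNICAST3().reuseMessageBatches(true);
        NAKACK2 nak=new NAKACK2().reuseMessageBatches(false); // opt out for this instance
        System.out.printf("UNICAST3 reuse=%b, NAKACK2 reuse=%b%n",
                          ucast.reuseMessageBatches(), nak.reuseMessageBatches());
    }
}
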
37 changes: 31 additions & 6 deletions src/org/jgroups/protocols/UNICAST3.java
@@ -22,6 +22,7 @@
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.jgroups.Message.Flag.*;
@@ -94,6 +95,10 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {
@Property(description="The max size of a message batch when delivering messages. 0 is unbounded")
protected int max_batch_size;

@Property(description="Reuses the same message batch for delivery of regular messages (only done by a single " +
"thread anyway). Not advisable for buffers that can grow infinitely (NAKACK3)")
protected boolean reuse_message_batches=true;

@Property(description="If true, a unicast message to self is looped back up on the same thread. Note that this may " +
"cause problems (e.g. deadlocks) in some applications, so make sure that your code can handle this. " +
"Issue: https://issues.redhat.com/browse/JGRP-2547")
@@ -138,6 +143,8 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler<Address> {

protected final Map<Address,SenderEntry> send_table=Util.createConcurrentMap();
protected final Map<Address,ReceiverEntry> recv_table=Util.createConcurrentMap();
/** To cache batches for sending messages up the stack (https://issues.redhat.com/browse/JGRP-2841) */
protected final Map<Address,MessageBatch> cached_batches=Util.createConcurrentMap();

protected final ReentrantLock recv_table_lock=new ReentrantLock();

@@ -242,6 +249,8 @@ public <T extends Protocol> T setLevel(String level) {
public UNICAST3 setSyncMinInterval(long s) {this.sync_min_interval=s; return this;}
public int getMaxXmitReqSize() {return max_xmit_req_size;}
public UNICAST3 setMaxXmitReqSize(int m) {this.max_xmit_req_size=m; return this;}
public boolean reuseMessageBatches() {return reuse_message_batches;}
public UNICAST3 reuseMessageBatches(boolean b) {this.reuse_message_batches=b; return this;}
public boolean sendsCanBlock() {return sends_can_block;}
public UNICAST3 sendsCanBlock(boolean s) {this.sends_can_block=s; return this;}
public boolean loopback() {return loopback;}
@@ -267,6 +276,18 @@ public String printConnections() {
return sb.toString();
}

@ManagedOperation(description="Prints the cached batches (if reuse_message_batches is true)")
public String printCachedBatches() {
return "\n" + cached_batches.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue()))
.collect(Collectors.joining("\n"));
}

@ManagedOperation(description="Clears the cached batches (if reuse_message_batches is true)")
public UNICAST3 clearCachedBatches() {
cached_batches.clear();
return this;
}

/** Don't remove! https://issues.redhat.com/browse/JGRP-2814 */
@ManagedAttribute(type=SCALAR) @Deprecated
public long getNumMessagesSent() {return num_msgs_sent.sum();}
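
The two @ManagedOperation methods above can also be invoked directly on the protocol objects. A hypothetical inspection snippet (not part of this commit) is shown below; the channel setup and cluster name are assumptions, while printCachedBatches() and clearCachedBatches() are the operations added in this hunk.

import org.jgroups.JChannel;
import org.jgroups.protocols.UNICAST3;
import org.jgroups.protocols.pbcast.NAKACK2;

public class CachedBatchInspector {
    public static void main(String[] args) throws Exception {
        try(JChannel ch=new JChannel()) {      // default stack, assumed to contain both protocols
            ch.connect("demo-cluster");        // hypothetical cluster name
            UNICAST3 ucast=ch.getProtocolStack().findProtocol(UNICAST3.class);
            NAKACK2 nak=ch.getProtocolStack().findProtocol(NAKACK2.class);
            System.out.println("UNICAST3 cached batches:" + ucast.printCachedBatches());
            System.out.println("NAKACK2 cached batches:" + nak.printCachedBatches());
            ucast.clearCachedBatches();        // drop the per-sender batches again
        }
    }
}
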
@@ -617,7 +638,7 @@ protected void handleBatchFromSelf(MessageBatch batch, Entry entry) {
}
deliverBatch(oob_batch);
}
- removeAndDeliver(win, batch.sender(), batch.capacity());
+ removeAndDeliver(win, batch.sender(), batch.clusterName(), batch.capacity());
}
if(!batch.isEmpty())
up_prot.up(batch);
@@ -860,7 +881,7 @@ protected void handleDataReceived(final Address sender, long seqno, short conn_i
addQueuedMessages(sender, entry, queued_msgs);
}
addMessage(entry, sender, seqno, msg);
- removeAndDeliver(entry.msgs, sender, 1);
+ removeAndDeliver(entry.msgs, sender, null, 1);
}

protected void addMessage(ReceiverEntry entry, Address sender, long seqno, Message msg) {
@@ -911,7 +932,7 @@ protected void handleDataReceivedFromSelf(final Address sender, long seqno, Mess
if(msg != null && msg.isFlagSet(OOB) && msg.setFlagIfAbsent(Message.TransientFlag.OOB_DELIVERED))
deliverMessage(msg, sender, seqno);
}
- removeAndDeliver(win, sender, 1); // there might be more messages to deliver
+ removeAndDeliver(win, sender, null, 1); // there might be more messages to deliver
}


@@ -940,7 +961,7 @@ protected void handleBatchReceived(final ReceiverEntry entry, Address sender, Li

deliverBatch(oob_batch);
}
- removeAndDeliver(win, sender, msgs.size());
+ removeAndDeliver(win, sender, null, msgs.size());
}


@@ -953,13 +974,17 @@ protected void handleBatchReceived(final ReceiverEntry entry, Address sender, Li
* delivery of P1, Q1, Q2, P2: FIFO (implemented by UNICAST) says messages need to be delivered in the
* order in which they were sent
*/
- protected void removeAndDeliver(Table<Message> win, Address sender, int min_size) {
+ protected void removeAndDeliver(Table<Message> win, Address sender, AsciiString cluster, int min_size) {
AtomicInteger adders=win.getAdders();
if(adders.getAndIncrement() != 0)
return;

+ AsciiString cl=cluster != null? cluster : getTransport().getClusterNameAscii();
int cap=Math.max(Math.max(Math.max(win.size(), max_batch_size), min_size), 2048);
- MessageBatch batch=new MessageBatch(cap).dest(local_addr).sender(sender).multicast(false);
+ MessageBatch batch=reuse_message_batches && cl != null?
+   cached_batches.computeIfAbsent(sender, __ -> new MessageBatch(cap).dest(local_addr).sender(sender).cluster(cl).mcast(false))
+   : new MessageBatch(cap).dest(local_addr).sender(sender).cluster(cl).multicast(false);
+ batch.array().increment(1024);
Supplier<MessageBatch> batch_creator=() -> batch;
MessageBatch mb=null;
do {
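
The reworked removeAndDeliver() above is the heart of the change: instead of allocating a fresh MessageBatch on every delivery round, it reuses a batch cached per sender in cached_batches. The standalone sketch below (my own simplification using plain JDK types, not JGroups code) illustrates the same per-key reuse pattern; note that, as the property description warns, a buffer that has grown large once stays large until the cache is cleared.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class PerSenderBufferCache<K,V> {
    private final ConcurrentMap<K,List<V>> cache=new ConcurrentHashMap<>();

    // Mirrors cached_batches.computeIfAbsent(sender, ...): the first call for a sender
    // allocates a buffer, later calls reuse it (its capacity is retained across rounds)
    List<V> bufferFor(K sender, int initial_capacity) {
        List<V> buf=cache.computeIfAbsent(sender, k -> new ArrayList<>(initial_capacity));
        buf.clear();    // empty the contents but keep the backing array
        return buf;
    }

    // Analogous to clearCachedBatches(): drops all cached buffers, e.g. to reclaim memory
    void clear() {cache.clear();}
}
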
34 changes: 31 additions & 3 deletions src/org/jgroups/protocols/pbcast/NAKACK2.java
@@ -1,7 +1,10 @@
package org.jgroups.protocols.pbcast;

import org.jgroups.*;
- import org.jgroups.annotations.*;
+ import org.jgroups.annotations.MBean;
+ import org.jgroups.annotations.ManagedAttribute;
+ import org.jgroups.annotations.ManagedOperation;
+ import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.protocols.RED;
import org.jgroups.protocols.TCP;
@@ -20,6 +23,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.jgroups.Message.Flag.NO_FC;
import static org.jgroups.Message.Flag.OOB;
@@ -112,6 +116,10 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
@Property(description="The max size of a message batch when delivering messages. 0 is unbounded")
protected int max_batch_size;

@Property(description="Reuses the same message batch for delivery of regular messages (only done by a single " +
"thread anyway). Not advisable for buffers that can grow infinitely (NAKACK3)")
protected boolean reuse_message_batches=true;

@Property(description="If enabled, multicasts the highest sent seqno every xmit_interval ms. This is skipped if " +
"a regular message has been multicast, and the task aquiesces if the highest sent seqno hasn't changed for " +
"resend_last_seqno_max_times times. Used to speed up retransmission of dropped last messages (JGRP-1904)")
@@ -123,6 +131,8 @@ public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
@ManagedAttribute(description="True if sending a message can block at the transport level")
protected boolean sends_can_block=true;

/** To cache batches for sending messages up the stack (https://issues.redhat.com/browse/JGRP-2841) */
protected final Map<Address,MessageBatch> cached_batches=Util.createConcurrentMap();
/* -------------------------------------------------- JMX ---------------------------------------------------------- */


@@ -286,6 +296,9 @@ public void setResendLastSeqno(boolean flag) {
public int getMaxXmitReqSize() {return max_xmit_req_size;}
public NAKACK2 setMaxXmitReqSize(int m) {this.max_xmit_req_size=m; return this;}

public boolean reuseMessageBatches() {return reuse_message_batches;}
public NAKACK2 reuseMessageBatches(boolean b) {this.reuse_message_batches=b; return this;}

public boolean sendsCanBlock() {return sends_can_block;}
public NAKACK2 sendsCanBlock(boolean s) {this.sends_can_block=s; return this;}

@@ -402,6 +415,18 @@ public String printMessages() {
return ret.toString();
}

@ManagedOperation(description="Prints the cached batches (if reuse_message_batches is true)")
public String printCachedBatches() {
return "\n" + cached_batches.entrySet().stream().map(e -> String.format("%s: %s", e.getKey(), e.getValue()))
.collect(Collectors.joining("\n"));
}

@ManagedOperation(description="Clears the cached batches (if reuse_message_batches is true)")
public NAKACK2 clearCachedBatches() {
cached_batches.clear();
return this;
}

@ManagedAttribute public long getCurrentSeqno() {return seqno.get();}

@ManagedOperation(description="Prints the stability messages received")
@@ -884,14 +909,17 @@ protected void handleMessageBatch(MessageBatch mb) {
* we return immediately and let the existing thread process our message (https://issues.redhat.com/browse/JGRP-829).
* Benefit: fewer threads blocked on the same lock, these threads can be returned to the thread pool
*/
- protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name,
+ protected void removeAndDeliver(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster,
int min_size) {
AtomicInteger adders=buf.getAdders();
if(adders.getAndIncrement() != 0)
return;
boolean remove_msgs=discard_delivered_msgs && !loopback;
+ AsciiString cl=cluster != null? cluster : getTransport().getClusterNameAscii();
int cap=Math.max(Math.max(Math.max(buf.size(), max_batch_size), min_size), 2048);
- MessageBatch batch=new MessageBatch(cap).dest(null).sender(sender).clusterName(cluster_name).multicast(true);
+ MessageBatch batch=reuse_message_batches && cl != null?
+   cached_batches.computeIfAbsent(sender, __ -> new MessageBatch(cap).dest(null).sender(sender).cluster(cl).mcast(true))
+   : new MessageBatch(cap).dest(null).sender(sender).cluster(cl).multicast(true);
+ batch.array().increment(1024);
Supplier<MessageBatch> batch_creator=() -> batch;
MessageBatch mb=null;
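
The javadoc above describes the "adders" hand-off both removeAndDeliver() implementations rely on: only the thread that bumps the counter from 0 drains the table; concurrent callers merely register their arrival and return, and the draining thread loops until the counter drops back to 0. A minimal sketch of that pattern, with a plain queue standing in for Table<Message> (my own illustration, not JGroups code):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class SingleDrainer<T> {
    private final Queue<T> queue=new ConcurrentLinkedQueue<>();
    private final AtomicInteger adders=new AtomicInteger();

    void addAndMaybeDeliver(T msg, Consumer<T> deliver) {
        queue.add(msg);
        if(adders.getAndIncrement() != 0)
            return;                               // another thread is already delivering
        do {
            for(T m; (m=queue.poll()) != null;)
                deliver.accept(m);                // a single thread delivers, in order
        }
        while(adders.decrementAndGet() != 0);     // loop if more work arrived meanwhile
    }
}
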
4 changes: 2 additions & 2 deletions src/org/jgroups/util/MaxOneThreadPerSender.java
@@ -268,8 +268,8 @@ protected void setRunning(boolean flag) {

// unsynchronized on batch but who cares
public String toString() {
return String.format("batch size=%d queued msgs=%d queued batches=%d submitted msgs=%d submitted batches=%d",
batch.size(), queued_msgs, queued_batches, submitted_msgs, submitted_batches);
return String.format("batch size=%,d cap=%,d queued msgs=%,d queued batches=%,d submitted msgs=%,d submitted batches=%,d",
batch.size(), batch.capacity(), queued_msgs, queued_batches, submitted_msgs, submitted_batches);
}
}

