Skip to content

Commit

Permalink
- Increased initial sizes and increment of FastArray
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Oct 11, 2024
1 parent ded5ba3 commit fcb8ce7
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 5 deletions.
12 changes: 10 additions & 2 deletions src/org/jgroups/protocols/BaseBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.jgroups.View;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.ManagedOperation;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.logging.Log;
Expand All @@ -18,6 +19,7 @@
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK;
Expand All @@ -34,7 +36,7 @@
public abstract class BaseBundler implements Bundler {
/** Keys are destinations, values are lists of Messages */
protected final Map<Address,List<Message>> msgs=new HashMap<>(24);
protected final Function<Address,List<Message>> FUNC=k -> new FastArray<>(16);
protected final Function<Address,List<Message>> FUNC=k -> new FastArray<Message>(32).increment(64);
protected TP transport;
protected MessageProcessingPolicy msg_processing_policy;
protected final ReentrantLock lock=new ReentrantLock();
Expand All @@ -58,6 +60,12 @@ public abstract class BaseBundler implements Bundler {
@ManagedAttribute(description="Time (us) to send the bundled messages")
protected final AverageMinMax avg_send_time=new AverageMinMax().unit(NANOSECONDS);

@ManagedOperation(description="Prints the capacity of the buffers")
public String printBuffers() {
return msgs.entrySet().stream()
.map(e -> String.format("%s: %d", e.getKey(), ((FastArray<Message>)e.getValue()).capacity()))
.collect(Collectors.joining("\n"));
}

public int getCapacity() {return capacity;}
public Bundler setCapacity(int c) {this.capacity=c; return this;}
Expand Down Expand Up @@ -225,7 +233,7 @@ protected void sendSingleMessage(final Message msg, ByteArrayDataOutputStream ou
}
}

protected void sendMessageList(final Address dest, final Address src, final List<Message> list, ByteArrayDataOutputStream out) {
protected void sendMessageList(Address dest, Address src, List<Message> list, ByteArrayDataOutputStream out) {
try {
Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, out, dest == null);
transport.doSend(out.buffer(), 0, out.position(), dest);
Expand Down
5 changes: 4 additions & 1 deletion src/org/jgroups/protocols/TransferQueueBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*/
public class TransferQueueBundler extends BaseBundler implements Runnable {
protected BlockingQueue<Message> queue;
protected final List<Message> remove_queue=new FastArray<>(16);
protected final List<Message> remove_queue=new FastArray<Message>(128).increment(128);
protected volatile Thread bundler_thread;

@Property(description="When the queue is full, senders will drop a message rather than wait until space " +
Expand Down Expand Up @@ -49,6 +49,9 @@ public TransferQueueBundler() {
@ManagedAttribute(description="Size of the remove-queue")
public int removeQueueSize() {return remove_queue.size();}

@ManagedAttribute(description="Capacity of the remove-queue")
public int removeQueueCapacity() {return ((FastArray<Message>)remove_queue).capacity();}

public boolean dropWhenFull() {return drop_when_full;}
public <T extends Bundler> T dropWhenFull(boolean d) {this.drop_when_full=d; return (T)this;}

Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/util/MaxOneThreadPerSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,9 @@ protected Entry(Address sender, boolean mcast, AsciiString cluster_name) {
this.mcast=mcast;
this.sender=sender;
this.cluster_name=cluster_name;
int cap=max_buffer_size > 0? max_buffer_size : 512; // initial capacity
int cap=max_buffer_size > 0? max_buffer_size : 128; // initial capacity
batch=new MessageBatch(cap).dest(tp.getAddress()).sender(sender).clusterName(cluster_name).multicast(mcast);
batch.array().increment(512);
batch.array().increment(128);
}


Expand Down

0 comments on commit fcb8ce7

Please sign in to comment.