From fcb8ce7850206604a9e8516d01e7f8f5c07d744f Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Fri, 11 Oct 2024 09:58:32 +0200 Subject: [PATCH] - Increased initial sizes and increment of FastArray --- src/org/jgroups/protocols/BaseBundler.java | 12 ++++++++++-- src/org/jgroups/protocols/TransferQueueBundler.java | 5 ++++- src/org/jgroups/util/MaxOneThreadPerSender.java | 4 ++-- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/src/org/jgroups/protocols/BaseBundler.java b/src/org/jgroups/protocols/BaseBundler.java index d850c814ba..81400fc6d7 100644 --- a/src/org/jgroups/protocols/BaseBundler.java +++ b/src/org/jgroups/protocols/BaseBundler.java @@ -6,6 +6,7 @@ import org.jgroups.View; import org.jgroups.annotations.GuardedBy; import org.jgroups.annotations.ManagedAttribute; +import org.jgroups.annotations.ManagedOperation; import org.jgroups.annotations.Property; import org.jgroups.conf.AttributeType; import org.jgroups.logging.Log; @@ -18,6 +19,7 @@ import java.util.Objects; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK; @@ -34,7 +36,7 @@ public abstract class BaseBundler implements Bundler { /** Keys are destinations, values are lists of Messages */ protected final Map> msgs=new HashMap<>(24); - protected final Function> FUNC=k -> new FastArray<>(16); + protected final Function> FUNC=k -> new FastArray(32).increment(64); protected TP transport; protected MessageProcessingPolicy msg_processing_policy; protected final ReentrantLock lock=new ReentrantLock(); @@ -58,6 +60,12 @@ public abstract class BaseBundler implements Bundler { @ManagedAttribute(description="Time (us) to send the bundled messages") protected final AverageMinMax avg_send_time=new AverageMinMax().unit(NANOSECONDS); + @ManagedOperation(description="Prints the capacity of the buffers") + public String printBuffers() { + return msgs.entrySet().stream() + .map(e -> String.format("%s: %d", e.getKey(), ((FastArray)e.getValue()).capacity())) + .collect(Collectors.joining("\n")); + } public int getCapacity() {return capacity;} public Bundler setCapacity(int c) {this.capacity=c; return this;} @@ -225,7 +233,7 @@ protected void sendSingleMessage(final Message msg, ByteArrayDataOutputStream ou } } - protected void sendMessageList(final Address dest, final Address src, final List list, ByteArrayDataOutputStream out) { + protected void sendMessageList(Address dest, Address src, List list, ByteArrayDataOutputStream out) { try { Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, out, dest == null); transport.doSend(out.buffer(), 0, out.position(), dest); diff --git a/src/org/jgroups/protocols/TransferQueueBundler.java b/src/org/jgroups/protocols/TransferQueueBundler.java index 08c837817b..62dc227cde 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler.java +++ b/src/org/jgroups/protocols/TransferQueueBundler.java @@ -19,7 +19,7 @@ */ public class TransferQueueBundler extends BaseBundler implements Runnable { protected BlockingQueue queue; - protected final List remove_queue=new FastArray<>(16); + protected final List remove_queue=new FastArray(128).increment(128); protected volatile Thread bundler_thread; @Property(description="When the queue is full, senders will drop a message rather than wait until space " + @@ -49,6 +49,9 @@ public TransferQueueBundler() { @ManagedAttribute(description="Size of the remove-queue") public int removeQueueSize() {return remove_queue.size();} + @ManagedAttribute(description="Capacity of the remove-queue") + public int removeQueueCapacity() {return ((FastArray)remove_queue).capacity();} + public boolean dropWhenFull() {return drop_when_full;} public T dropWhenFull(boolean d) {this.drop_when_full=d; return (T)this;} diff --git a/src/org/jgroups/util/MaxOneThreadPerSender.java b/src/org/jgroups/util/MaxOneThreadPerSender.java index a401587cbf..02893a3b2c 100644 --- a/src/org/jgroups/util/MaxOneThreadPerSender.java +++ b/src/org/jgroups/util/MaxOneThreadPerSender.java @@ -134,9 +134,9 @@ protected Entry(Address sender, boolean mcast, AsciiString cluster_name) { this.mcast=mcast; this.sender=sender; this.cluster_name=cluster_name; - int cap=max_buffer_size > 0? max_buffer_size : 512; // initial capacity + int cap=max_buffer_size > 0? max_buffer_size : 128; // initial capacity batch=new MessageBatch(cap).dest(tp.getAddress()).sender(sender).clusterName(cluster_name).multicast(mcast); - batch.array().increment(512); + batch.array().increment(128); }