Skip to content

Commit

Permalink
Use volatiles & var handles instead of atomics (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Feb 13, 2024
1 parent 3ef4be3 commit 260d06f
Show file tree
Hide file tree
Showing 4 changed files with 242 additions and 137 deletions.
146 changes: 88 additions & 58 deletions core/src/main/java/com/softwaremill/jox/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.Comparator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Function;
import java.util.function.Supplier;
Expand Down Expand Up @@ -88,29 +86,56 @@ operations on these (previous) segments, and we'll end up wanting to remove such
operations won't use them, so the relinking won't be useful.
*/

// immutable state

private final int capacity;
private final boolean isRendezvous;
private final boolean isUnlimited;

// mutable state

/**
* The total number of `send` operations ever invoked, and a flag indicating if the channel is closed.
* The flag is shifted by {@link Channel#SENDERS_AND_CLOSED_FLAG_SHIFT} bits.
* <p>
* Each {@link Channel#send} invocation gets a unique cell to process.
*/
private final AtomicLong sendersAndClosedFlag = new AtomicLong(0L);
private final AtomicLong receivers = new AtomicLong(0L);
private final AtomicLong bufferEnd;
private volatile long sendersAndClosedFlag = 0L;
private volatile long receivers = 0L;
private volatile long bufferEnd;

/**
* Segments holding cell states. State can be {@link CellState}, {@link Continuation}, {@link SelectInstance}, or a user-provided buffered value.
*/
private final AtomicReference<Segment> sendSegment;
private final AtomicReference<Segment> receiveSegment;
private final AtomicReference<Segment> bufferEndSegment;
private volatile Segment sendSegment;
private volatile Segment receiveSegment;
private volatile Segment bufferEndSegment;
private volatile ChannelClosed closedReason;

private final AtomicReference<ChannelClosed> closedReason;
// var handles

private final boolean isRendezvous;
private final boolean isUnlimited;
private static final VarHandle SENDERS_AND_CLOSE_FLAG;
private static final VarHandle RECEIVERS;
private static final VarHandle BUFFER_END;
private static final VarHandle SEND_SEGMENT;
private static final VarHandle RECEIVE_SEGMENT;
private static final VarHandle BUFFER_END_SEGMENT;
private static final VarHandle CLOSED_REASON;

static {
try {
MethodHandles.Lookup l = MethodHandles.privateLookupIn(Channel.class, MethodHandles.lookup());
SENDERS_AND_CLOSE_FLAG = l.findVarHandle(Channel.class, "sendersAndClosedFlag", long.class);
RECEIVERS = l.findVarHandle(Channel.class, "receivers", long.class);
BUFFER_END = l.findVarHandle(Channel.class, "bufferEnd", long.class);
SEND_SEGMENT = l.findVarHandle(Channel.class, "sendSegment", Segment.class);
RECEIVE_SEGMENT = l.findVarHandle(Channel.class, "receiveSegment", Segment.class);
BUFFER_END_SEGMENT = l.findVarHandle(Channel.class, "bufferEndSegment", Segment.class);
CLOSED_REASON = l.findVarHandle(Channel.class, "closedReason", ChannelClosed.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
}

/**
* Creates a rendezvous channel.
Expand All @@ -134,15 +159,13 @@ public Channel(int capacity) {

var firstSegment = new Segment(0, null, isRendezvousOrUnlimited ? 2 : 3, isRendezvousOrUnlimited);

sendSegment = new AtomicReference<>(firstSegment);
receiveSegment = new AtomicReference<>(firstSegment);
sendSegment = firstSegment;
receiveSegment = firstSegment;
// If the capacity is 0 or -1, buffer expansion never happens, so the buffer end segment points to a null segment,
// not the first one. This is also reflected in the pointer counter of firstSegment.
bufferEndSegment = new AtomicReference<>(isRendezvousOrUnlimited ? Segment.NULL_SEGMENT : firstSegment);

bufferEnd = new AtomicLong(capacity);
bufferEndSegment = isRendezvousOrUnlimited ? Segment.NULL_SEGMENT : firstSegment;

closedReason = new AtomicReference<>(null);
bufferEnd = capacity;
}

public static <T> Channel<T> newUnlimitedChannel() {
Expand Down Expand Up @@ -178,9 +201,9 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
}
while (true) {
// reading the segment before the counter increment - this is needed to find the required segment later
var segment = sendSegment.get();
var segment = sendSegment;
// reserving the next cell
var scf = sendersAndClosedFlag.getAndIncrement();
var scf = (long) SENDERS_AND_CLOSE_FLAG.getAndAdd(this, 1L);
var s = getSendersCounter(scf);

// calculating the segment id and the index within the segment
Expand All @@ -189,16 +212,16 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau

// check if `sendSegment` stores a previous segment, if so move the reference forward
if (segment.getId() != id) {
segment = findAndMoveForward(sendSegment, segment, id);
segment = findAndMoveForward(SEND_SEGMENT, this, segment, id);
if (segment == null) {
// the channel has been closed, `s` points to a segment which doesn't exist
return closedReason.get();
return closedReason;
}

// if we still have another segment, the segment must have been removed
if (segment.getId() != id) {
// skipping all interrupted cells, and trying with a new one
sendersAndClosedFlag.compareAndSet(s, segment.getId() * Segment.SEGMENT_SIZE);
SENDERS_AND_CLOSE_FLAG.compareAndSet(this, s, segment.getId() * Segment.SEGMENT_SIZE);
continue;
}
}
Expand All @@ -207,7 +230,7 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
// reference forward, so that segments which become eligible for removal can be GCed (after the channel
// is closed, e.g. when the channel is done and there are some values left to be received)
if (isClosed(scf)) {
return closedReason.get();
return closedReason;
}

var sendResult = updateCellSend(segment, i, s, value, select, selectClause);
Expand All @@ -233,7 +256,7 @@ private Object doSend(T value, SelectInstance select, SelectClause<?> selectClau
// trying again with a new cell
} else if (sendResult == SendResult.CLOSED) {
// not cleaning the previous segments - the close procedure might still need it
return closedReason.get();
return closedReason;
} else {
throw new IllegalStateException("Unexpected result: " + sendResult);
}
Expand All @@ -253,7 +276,7 @@ private Object updateCellSend(Segment segment, int i, long s, T value, SelectIns

if (state == null) {
// reading the buffer end & receiver's counter if needed
if (!isUnlimited && s >= (isRendezvous ? 0 : bufferEnd.get()) && s >= receivers.get()) {
if (!isUnlimited && s >= (isRendezvous ? 0 : bufferEnd) && s >= receivers) {
// cell is empty, and no receiver, not in buffer -> suspend
if (select != null) {
// cell is empty, no receiver, and we are in a select -> store the select instance
Expand Down Expand Up @@ -350,34 +373,34 @@ public Object receiveSafe() throws InterruptedException {
private Object doReceive(SelectInstance select, SelectClause<?> selectClause) throws InterruptedException {
while (true) {
// reading the segment before the counter increment - this is needed to find the required segment later
var segment = receiveSegment.get();
var segment = receiveSegment;
// reserving the next cell
var r = receivers.getAndIncrement();
var r = (long) RECEIVERS.getAndAdd(this, 1L);

// calculating the segment id and the index within the segment
var id = r / Segment.SEGMENT_SIZE;
var i = (int) (r % Segment.SEGMENT_SIZE);

// check if `receiveSegment` stores a previous segment, if so move the reference forward
if (segment.getId() != id) {
segment = findAndMoveForward(receiveSegment, segment, id);
segment = findAndMoveForward(RECEIVE_SEGMENT, this, segment, id);
if (segment == null) {
// the channel has been closed, r points to a segment which doesn't exist
return closedReason.get();
return closedReason;
}

// if we still have another segment, the segment must have been removed
if (segment.getId() != id) {
// skipping all interrupted cells, and trying with a new one
receivers.compareAndSet(r, segment.getId() * Segment.SEGMENT_SIZE);
RECEIVERS.compareAndSet(this, r, segment.getId() * Segment.SEGMENT_SIZE);
continue;
}
}

var result = updateCellReceive(segment, i, r, select, selectClause);
if (result == ReceiveResult.CLOSED) {
// not cleaning the previous segments - the close procedure might still need it
return closedReason.get();
return closedReason;
} else {
/*
After `updateCellReceive` completes and the channel isn't closed, we can be sure that S > r, unless
Expand Down Expand Up @@ -419,7 +442,7 @@ private Object updateCellReceive(Segment segment, int i, long r, SelectInstance
var state = segment.getCell(i); // reading the current state of the cell; we'll try to update it atomically

if (state == null || state == IN_BUFFER) {
if (r >= getSendersCounter(sendersAndClosedFlag.get())) { // reading the sender's counter
if (r >= getSendersCounter(sendersAndClosedFlag)) { // reading the sender's counter
if (select != null) {
// cell is empty, no sender, and we are in a select -> store the select instance
// and await externally
Expand Down Expand Up @@ -513,17 +536,17 @@ private void expandBuffer() {
if (isRendezvous || isUnlimited) return;
while (true) {
// reading the segment before the counter increment - this is needed to find the required segment later
var segment = bufferEndSegment.get();
var segment = bufferEndSegment;
// reserving the next cell
var b = bufferEnd.getAndIncrement();
var b = (long) BUFFER_END.getAndAdd(this, 1L);

// calculating the segment id and the index within the segment
var id = b / Segment.SEGMENT_SIZE;
var i = (int) (b % Segment.SEGMENT_SIZE);

// check if `bufferEndSegment` stores a previous segment, if so move the reference forward
if (segment.getId() != id) {
segment = findAndMoveForward(bufferEndSegment, segment, id);
segment = findAndMoveForward(BUFFER_END_SEGMENT, this, segment, id);
if (segment == null) {
// the channel has been closed, b points to a segment which doesn't exist, nowhere to expand
return;
Expand All @@ -535,7 +558,7 @@ private void expandBuffer() {
// senders must have already been processed by expandBuffer)
if (segment.getId() != id) {
// skipping all interrupted cells as an optimization
bufferEnd.compareAndSet(b, segment.getId() * Segment.SEGMENT_SIZE);
BUFFER_END.compareAndSet(this, b, segment.getId() * Segment.SEGMENT_SIZE);
// another buffer expansion already happened for this cell (in the removed segment)
return;
}
Expand Down Expand Up @@ -660,17 +683,24 @@ public Object errorSafe(Throwable reason) {
}

private Object closeSafe(ChannelClosed channelClosed) {
if (!closedReason.compareAndSet(null, channelClosed)) {
return closedReason.get(); // already closed
if (!CLOSED_REASON.compareAndSet(this, null, channelClosed)) {
return closedReason; // already closed
}

// after this completes, it's guaranteed than no sender with `s >= lastSender` will proceed with the usual
// sending algorithm, as `send()` will observe that the channel is closed
var scf = sendersAndClosedFlag.updateAndGet(this::setClosedFlag);
long scf;
var scfUpdated = false;
do {
var initialScf = sendersAndClosedFlag;
scf = setClosedFlag(initialScf);
scfUpdated = SENDERS_AND_CLOSE_FLAG.compareAndSet(this, initialScf, scf);
} while (!scfUpdated);

var lastSender = getSendersCounter(scf);

// closing the segment chain guarantees that no new segment beyond `lastSegment` will be created
var lastSegment = sendSegment.get().close();
var lastSegment = sendSegment.close();

if (channelClosed instanceof ChannelError) {
// closing all cells, as this is an error
Expand Down Expand Up @@ -741,7 +771,7 @@ private void updateCellClose(Segment segment, int i) {
return;
}
} else if (state instanceof StoredSelectClause ss) {
ss.getSelect().channelClosed(ss, closedReason.get());
ss.getSelect().channelClosed(ss, closedReason);
// not setting the state & updating counters, as each non-selected stored select cell will be
// cleaned up, setting an interrupted state (and informing the segment)
// until this happens, other (concurrent) invocations of `channelClosed` or `trySelect` will still
Expand Down Expand Up @@ -774,13 +804,13 @@ private void updateCellClose(Segment segment, int i) {

@Override
public ChannelClosed closedForSend() {
return isClosed(sendersAndClosedFlag.get()) ? closedReason.get() : null;
return isClosed(sendersAndClosedFlag) ? closedReason : null;
}

@Override
public ChannelClosed closedForReceive() {
if (isClosed(sendersAndClosedFlag.get())) {
var cr = closedReason.get(); // cannot be null
if (isClosed(sendersAndClosedFlag)) {
var cr = closedReason; // cannot be null
if (cr instanceof ChannelError) {
return cr;
} else {
Expand All @@ -795,10 +825,10 @@ public ChannelClosed closedForReceive() {
private boolean hasValuesToReceive() {
while (true) {
// reading the segment before the counter - this is needed to find the required segment later
var segment = receiveSegment.get();
var segment = receiveSegment;
// r is the cell which will be used by the next receive
var r = receivers.get();
var s = getSendersCounter(sendersAndClosedFlag.get());
var r = receivers;
var s = getSendersCounter(sendersAndClosedFlag);

if (s <= r) {
// for sure, nothing is buffered / no senders are waiting
Expand All @@ -811,7 +841,7 @@ private boolean hasValuesToReceive() {

// check if `receiveSegment` stores a previous segment, if so move the reference forward
if (segment.getId() != id) {
segment = findAndMoveForward(receiveSegment, segment, id);
segment = findAndMoveForward(RECEIVE_SEGMENT, this, segment, id);
if (segment == null) {
// the channel has been closed, r points to a segment which doesn't exist
return false;
Expand All @@ -820,7 +850,7 @@ private boolean hasValuesToReceive() {
// if we still have another segment, the segment must have been removed
if (segment.getId() != id) {
// skipping all interrupted cells, and trying with a new one
receivers.compareAndSet(r, segment.getId() * Segment.SEGMENT_SIZE);
RECEIVERS.compareAndSet(this, r, segment.getId() * Segment.SEGMENT_SIZE);
continue;
}
}
Expand All @@ -832,7 +862,7 @@ private boolean hasValuesToReceive() {
return true;
} else {
// nothing to receive, we can (try to, if not already done) bump the counter and try again
receivers.compareAndSet(r, r + 1);
RECEIVERS.compareAndSet(this, r, r + 1);
}
}
}
Expand Down Expand Up @@ -964,35 +994,35 @@ void cleanupStoredSelectClause(Segment segment, int i, boolean isSender) {
private static final int SENDERS_AND_CLOSED_FLAG_SHIFT = 60;
private static final long SENDERS_COUNTER_MASK = (1L << SENDERS_AND_CLOSED_FLAG_SHIFT) - 1;

private long getSendersCounter(long sendersAndClosedFlag) {
private static long getSendersCounter(long sendersAndClosedFlag) {
return sendersAndClosedFlag & SENDERS_COUNTER_MASK;
}

private boolean isClosed(long sendersAndClosedFlag) {
private static boolean isClosed(long sendersAndClosedFlag) {
return sendersAndClosedFlag >> SENDERS_AND_CLOSED_FLAG_SHIFT == 1;
}

private long setClosedFlag(long sendersAndClosedFlag) {
private static long setClosedFlag(long sendersAndClosedFlag) {
return sendersAndClosedFlag | (1L << SENDERS_AND_CLOSED_FLAG_SHIFT);
}

@Override
public String toString() {
//noinspection OptionalGetWithoutIsPresent
var smallestSegment = Stream.of(sendSegment.get(), receiveSegment.get(), bufferEndSegment.get())
var smallestSegment = Stream.of(sendSegment, receiveSegment, bufferEndSegment)
.filter(s -> s != Segment.NULL_SEGMENT)
.min(Comparator.comparingLong(Segment::getId)).get();

var scf = sendersAndClosedFlag.get();
var scf = sendersAndClosedFlag;
var sendersCounter = getSendersCounter(scf);
var isClosed = isClosed(scf);

var sb = new StringBuilder();
sb.append("Channel(capacity=").append(capacity)
.append(", closed=").append(isClosed)
.append(", sendSegment=").append(sendSegment.get().getId()).append(", sendCounter=").append(sendersCounter)
.append(", receiveSegment=").append(receiveSegment.get().getId()).append(", receiveCounter=").append(receivers.get())
.append(", bufferEndSegment=").append(bufferEndSegment.get().getId()).append(", bufferEndCounter=").append(bufferEnd.get())
.append(", sendSegment=").append(sendSegment.getId()).append(", sendCounter=").append(sendersCounter)
.append(", receiveSegment=").append(receiveSegment.getId()).append(", receiveCounter=").append(receivers)
.append(", bufferEndSegment=").append(bufferEndSegment.getId()).append(", bufferEndCounter=").append(bufferEnd)
.append("): \n");
var s = smallestSegment;
while (s != null) {
Expand Down
Loading

1 comment on commit 260d06f

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 1.20.

Benchmark suite Current: 260d06f Previous: c96e9b5 Ratio
com.softwaremill.jox.ParallelKotlinBenchmark.parallelChannels_defaultDispatcher ( {"capacity":"0","parallelism":"10000"} ) 80.92968521 ns/op 64.14075957 ns/op 1.26

This comment was automatically generated by workflow using github-action-benchmark.

CC: @adamw

Please sign in to comment.